16 "git.arvados.org/arvados.git/sdk/go/arvados"
17 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
18 "git.arvados.org/arvados.git/sdk/go/keepclient"
19 log "github.com/sirupsen/logrus"
20 "golang.org/x/crypto/blake2b"
21 "golang.org/x/net/websocket"
// eventMessage carries the fields decoded from one websocket event.
// NOTE(review): only two of the struct's fields are visible in this
// excerpt; RunContext also appears to read msg.Properties.Text, which
// would be declared on a line not shown here.
24 type eventMessage struct {
// ObjectUUID is decoded from the "object_uuid" JSON key; runNotifier
// uses it to look up the subscriber channels for an event.
26 ObjectUUID string `json:"object_uuid"`
// EventType is decoded from the "event_type" JSON key; RunContext
// switches on it to decide how to handle a message.
27 EventType string `json:"event_type"`
// arvadosClient adds websocket event subscriptions (Subscribe,
// Unsubscribe, Close) on top of an Arvados API client.
// NOTE(review): further fields are declared on lines not visible in
// this excerpt -- RunContext sets a Client field, and the methods use
// mtx, RequestAndDecode and AuthToken.
33 type arvadosClient struct {
// notifying maps an object UUID to its subscriber channels; the int is
// a per-channel subscription count (Unsubscribe decrements it). It is
// nil until Subscribe initializes it, and Close resets it to nil.
35 notifying map[string]map[chan<- eventMessage]int
// wantClose is created by Subscribe, closed by Close, and selected on
// by runNotifier.
36 wantClose chan struct{}
// wsconn is the connection that Subscribe and Unsubscribe write their
// requests to; both skip sending while it is nil.
37 wsconn *websocket.Conn
41 // Listen for events concerning the given uuids. When an event occurs
42 // (and after connecting/reconnecting to the event stream), send each
43 // uuid to ch. If a {ch, uuid} pair is subscribed twice, the uuid will
44 // be sent only once for each update, but two Unsubscribe calls will
45 // be needed to stop sending them.
46 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
// The deferred unlock means client.mtx is held for the rest of the call.
// NOTE(review): the matching Lock() is not visible in this excerpt.
48 defer client.mtx.Unlock()
// First use (or first use after Close reset notifying to nil): set up
// the subscription table and close channel, then start the background
// notifier goroutine.
49 if client.notifying == nil {
50 client.notifying = map[string]map[chan<- eventMessage]int{}
51 client.wantClose = make(chan struct{})
52 go client.runNotifier()
// chmap is the set of channels already subscribed to this uuid.
54 chmap := client.notifying[uuid]
// Create and register a fresh channel map for this uuid (presumably
// only when chmap is nil -- the guarding condition is not shown).
56 chmap = map[chan<- eventMessage]int{}
57 client.notifying[uuid] = chmap
// Inspect the existing subscription counts for this uuid; presumably
// this decides needSub (the loop body is not visible here).
60 for _, nch := range chmap {
// Only talk to the server when a subscribe request is needed and a
// websocket connection currently exists. The request is written from a
// separate goroutine and the Encode error is discarded.
67 if needSub && client.wsconn != nil {
68 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
69 "method": "subscribe",
70 "filters": [][]interface{}{
71 {"object_uuid", "=", uuid},
// NOTE(review): this event_type list omits "crunchstat", which the
// resubscribe request in runNotifier includes -- confirm whether the
// difference is intentional.
72 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
// Unsubscribe releases one subscription of ch to events for uuid. When
// the channel's count reaches zero, an "unsubscribe" request is sent to
// the server if a websocket connection exists.
78 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
// The deferred unlock means client.mtx is held for the rest of the call.
// NOTE(review): the matching Lock() is not visible in this excerpt.
80 defer client.mtx.Unlock()
81 chmap := client.notifying[uuid]
// n is the count remaining after this call; indexing a nil or missing
// map entry yields 0, so n is -1 for a pair that was never subscribed
// and this branch is skipped.
82 if n := chmap[ch] - 1; n == 0 {
// Drop the uuid's entry from the table (presumably only once no
// channels remain for it -- the guarding condition is not shown).
85 delete(client.notifying, uuid)
// Tell the server to stop sending events for this uuid. The request is
// written from a separate goroutine and the Encode error is discarded.
87 if client.wsconn != nil {
88 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
89 "method": "unsubscribe",
90 "filters": [][]interface{}{
91 {"object_uuid", "=", uuid},
// NOTE(review): this matches the filter Subscribe sends, but not the
// resubscribe filter in runNotifier, which also lists "crunchstat".
92 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
// Close discards all subscription state and closes wantClose, the
// channel runNotifier selects on. It does nothing when notifying is
// already nil, so calling Close again without an intervening Subscribe
// does not close the channel a second time.
101 func (client *arvadosClient) Close() {
// The deferred unlock means client.mtx is held for the rest of the call.
// NOTE(review): the matching Lock() is not visible in this excerpt.
103 defer client.mtx.Unlock()
104 if client.notifying != nil {
// Resetting notifying to nil lets a later Subscribe start over with a
// new table, close channel and notifier goroutine.
105 client.notifying = nil
106 close(client.wantClose)
// runNotifier is started as a goroutine by Subscribe. It fetches the
// cluster config, dials the cluster's websocket endpoint, sends a
// subscribe request for every uuid in client.notifying, and then
// decodes incoming messages and looks up the subscriber channels for
// each message's ObjectUUID.
// NOTE(review): the enclosing loops, the locking, and the
// reconnect/exit logic are on lines not visible in this excerpt.
110 func (client *arvadosClient) runNotifier() {
// Fetch the cluster configuration to learn the websocket service URL.
113 var cluster arvados.Cluster
114 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
// On failure: log, then pause 5 seconds (presumably before retrying).
116 log.Warnf("error getting cluster config: %s", err)
117 time.Sleep(5 * time.Second)
// Derive the websocket URL: replace the first "http" in the scheme
// with "ws" (so http -> ws and https -> wss) and use the fixed
// "/websocket" path.
120 wsURL := cluster.Services.Websocket.ExternalURL
121 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
122 wsURL.Path = "/websocket"
// Keep a copy of the URL without the token so the log line below does
// not reveal the API token.
123 wsURLNoToken := wsURL.String()
124 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
// Dial with an empty protocol string and the controller's external URL
// as the origin.
125 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
// On failure: log, then pause 5 seconds (presumably before retrying).
127 log.Warnf("websocket connection error: %s", err)
128 time.Sleep(5 * time.Second)
131 log.Printf("connected to websocket at %s", wsURLNoToken)
// Collect the currently subscribed uuids into a slice so the subscribe
// requests below can be sent from that copy.
135 resubscribe := make([]string, 0, len(client.notifying))
136 for uuid := range client.notifying {
137 resubscribe = append(resubscribe, uuid)
// Send one subscribe request per uuid on the new connection; the
// Encode error is not checked.
142 w := json.NewEncoder(conn)
143 for _, uuid := range resubscribe {
144 w.Encode(map[string]interface{}{
145 "method": "subscribe",
146 "filters": [][]interface{}{
147 {"object_uuid", "=", uuid},
// NOTE(review): this list includes "crunchstat", unlike the filters
// sent by Subscribe and Unsubscribe -- confirm which list is intended.
148 {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}},
// Read and decode messages from the connection.
154 r := json.NewDecoder(conn)
157 err := r.Decode(&msg)
// wantClose is closed by Close; presumably a decode error seen after
// that point is treated as a normal shutdown rather than logged.
159 case <-client.wantClose:
163 log.Printf("error decoding websocket message: %s", err)
// Visit every channel subscribed to the message's object UUID
// (presumably to deliver msg -- the loop body is not shown).
171 for ch := range client.notifying[msg.ObjectUUID] {
// refreshTicker fires every 5 seconds; RunContext receives from its
// channel in the wait loop. It is a single package-level ticker, so
// all RunContext calls share it.
// NOTE(review): no Stop call is visible in this excerpt.
180 var refreshTicker = time.NewTicker(5 * time.Second)
// arvadosContainerRunner holds the settings RunContext uses to submit
// a container request.
// NOTE(review): only some fields are visible in this excerpt; the
// methods also read ProjectUUID, Args, Priority, VCPUs and OutputName.
182 type arvadosContainerRunner struct {
// Client is the API client used for all requests made by the runner.
183 Client *arvados.Client
189 Prog string // if empty, run /proc/self/exe
// Mounts holds extra mount definitions keyed by mount path;
// TranslatePaths adds "/mnt/<uuid>" entries and RunContext iterates
// over the map.
191 Mounts map[string]map[string]interface{}
// Run calls RunContext with context.Background() and returns its
// results unchanged.
195 func (runner *arvadosContainerRunner) Run() (string, error) {
196 return runner.RunContext(context.Background())
// RunContext submits a committed container request in the runner's
// project, follows its progress until the request reaches the final
// state, and returns the request's output UUID.
//
// It returns an error when ProjectUUID is empty, when the container's
// final state is not "complete", or when the container's exit code is
// non-zero.
// NOTE(review): many lines of this function (error checks, the select
// statement, closing braces) are not visible in this excerpt, so the
// comments below describe only what the visible lines establish.
199 func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) {
200 if runner.ProjectUUID == "" {
201 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
// Start from a base mount table, then visit the caller-supplied mounts
// (presumably copying them in -- the loop body is not shown).
204 mounts := map[string]map[string]interface{}{
206 "kind": "collection",
210 for path, mnt := range runner.Mounts {
// Run the lightning binary from a collection mounted at /mnt/cmd
// (presumably only when runner.Prog is empty -- the condition is not
// shown).
216 prog = "/mnt/cmd/lightning"
217 cmdUUID, err := runner.makeCommandCollection()
221 mounts["/mnt/cmd"] = map[string]interface{}{
222 "kind": "collection",
// The container command is the program followed by the runner's Args.
226 command := append([]string{prog}, runner.Args...)
228 priority := runner.Priority
232 rc := arvados.RuntimeConstraints{
// (1 << 26) * 2 bytes = 128 MiB of keep cache per VCPU.
235 KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs),
// outname is a pointer to the output name (presumably so that it can
// be replaced with nil when no name is set -- not visible here).
237 outname := &runner.OutputName
// Create the container request in the committed state.
241 var cr arvados.ContainerRequest
242 err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
243 "container_request": map[string]interface{}{
244 "owner_uuid": runner.ProjectUUID,
246 "container_image": "lightning-runtime",
249 "use_existing": true,
250 "output_path": "/mnt/output",
251 "output_name": outname,
252 "runtime_constraints": rc,
// NOTE(review): this sends runner.Priority, not the local priority
// declared above -- confirm whether any adjustment made to the local
// (on lines not shown) was meant to be sent.
253 "priority": runner.Priority,
254 "state": arvados.ContainerRequestStateCommitted,
260 log.Printf("container request UUID: %s", cr.UUID)
261 log.Printf("container UUID: %s", cr.ContainerUUID)
// logch receives websocket events for the container; client is a
// private event client built on the runner's API client.
263 logch := make(chan eventMessage)
264 client := arvadosClient{Client: runner.Client}
// Drop the current subscription, if any (presumably in deferred
// cleanup -- the surrounding lines are not shown).
268 if subscribedUUID != "" {
269 client.Unsubscribe(logch, subscribedUUID)
// refreshCR re-reads the container request, logs state changes, and
// moves the event subscription when the request's container UUID
// changes.
275 lastState := cr.State
276 refreshCR := func() {
277 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
// neednewline is written to stderr before each log line (presumably to
// end a pending "\r" status line -- its value is set on lines not
// shown).
279 fmt.Fprint(os.Stderr, neednewline)
280 log.Printf("error getting container request: %s", err)
283 if lastState != cr.State {
284 fmt.Fprint(os.Stderr, neednewline)
285 log.Printf("container request state: %s", cr.State)
// The request now points at a different container: unsubscribe from
// the old one (if any) and subscribe to the new one.
288 if subscribedUUID != cr.ContainerUUID {
289 fmt.Fprint(os.Stderr, neednewline)
291 if subscribedUUID != "" {
292 client.Unsubscribe(logch, subscribedUUID)
294 client.Subscribe(logch, cr.ContainerUUID)
295 subscribedUUID = cr.ContainerUUID
// reCrunchstat extracts the "mem ... rss" portion of a log line.
299 var reCrunchstat = regexp.MustCompile(`mem .* rss`)
// Wait until the container request reaches the final state.
301 for cr.State != arvados.ContainerRequestStateFinal {
// PATCH the container request to cancel it, judging by the error
// message below (presumably when ctx is done -- the select case and
// the request fields are not shown).
304 err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
305 "container_request": map[string]interface{}{
310 log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err)
// Periodic tick (presumably triggers refreshCR -- the case body is not
// shown).
313 case <-refreshTicker.C:
// Handle a received event according to its type.
316 switch msg.EventType {
// Process the event text line by line.
320 for _, line := range strings.Split(msg.Properties.Text, "\n") {
322 fmt.Fprint(os.Stderr, neednewline)
// Process the event text line by line, writing the memory summary
// matched by reCrunchstat to stderr followed by "\r" (presumably so
// that successive summaries overwrite each other on the same line).
328 for _, line := range strings.Split(msg.Properties.Text, "\n") {
329 mem := reCrunchstat.FindString(line)
331 fmt.Fprintf(os.Stderr, "%s \r", mem)
338 fmt.Fprint(os.Stderr, neednewline)
// Check whether ctx was cancelled or timed out while waiting.
340 if err := ctx.Err(); err != nil {
// Fetch the container itself to check its final state and exit code.
344 var c arvados.Container
345 err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
348 } else if c.State != arvados.ContainerStateComplete {
349 return "", fmt.Errorf("container did not complete: %s", c.State)
350 } else if c.ExitCode != 0 {
351 return "", fmt.Errorf("container exited %d", c.ExitCode)
353 return cr.OutputUUID, err
// collectionInPathRe splits a path into three groups: an optional
// prefix ending in "/", a collection identifier, and an optional
// suffix starting with "/". The identifier is either 32 hex digits
// followed by "+" and digits (presumably a portable data hash) or a
// 5-5-15 group of lowercase alphanumerics (presumably a UUID).
356 var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)

// TranslatePaths rewrites each given path in place so that it refers
// to "/mnt/<identifier><suffix>", and registers a collection mount for
// that identifier in runner.Mounts. It returns an error for a path in
// which no collection identifier can be found.
358 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
// Allocate the mount table on first use so the writes below are safe.
359 if runner.Mounts == nil {
360 runner.Mounts = make(map[string]map[string]interface{})
362 for _, path := range paths {
// Empty paths and "-" get special treatment (presumably skipped
// unchanged -- the branch body is not shown).
363 if *path == "" || *path == "-" {
366 m := collectionInPathRe.FindStringSubmatch(*path)
368 return fmt.Errorf("cannot find uuid in path: %q", *path)
// Look up the mount for this identifier and install a "collection"
// mount (presumably only when none exists yet; uuid is presumably m[2]
// -- its assignment is not shown).
371 mnt, ok := runner.Mounts["/mnt/"+uuid]
373 mnt = map[string]interface{}{
374 "kind": "collection",
377 runner.Mounts["/mnt/"+uuid] = mnt
// Replace the caller's path with the in-container location, keeping
// the part of the path that followed the identifier.
379 *path = "/mnt/" + uuid + m[3]
// mtxMakeCommandCollection is held for the whole of each
// makeCommandCollection call, so concurrent calls run one at a time.
384 var mtxMakeCommandCollection sync.Mutex

// makeCommandCollection makes the currently running executable
// available as a file named "lightning" in a collection owned by the
// runner's project, and returns that collection's UUID. The collection
// is named "lightning-<hex BLAKE2b-256 of the executable>"; a lookup
// is made for an existing collection with that name in the project
// before a new one is written.
386 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
387 mtxMakeCommandCollection.Lock()
388 defer mtxMakeCommandCollection.Unlock()
// Read the whole running binary into memory.
389 exe, err := ioutil.ReadFile("/proc/self/exe")
// Derive the collection name from the binary's content hash.
393 b2 := blake2b.Sum256(exe)
394 cname := fmt.Sprintf("lightning-%x", b2)
// Look for a collection with this name owned by the project.
395 var existing arvados.CollectionList
396 err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
399 Filters: []arvados.Filter{
400 {Attr: "name", Operator: "=", Operand: cname},
401 {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
// A match is used on the strength of its name alone; as the log
// message says, the content is not checked. (Presumably its UUID is
// returned here -- the return statement is not shown.)
407 if len(existing.Items) > 0 {
408 uuid := existing.Items[0].UUID
409 log.Printf("using lightning binary in existing collection %s (name is %q; did not verify whether content matches)", uuid, cname)
// No match: write the binary into a new collection.
412 log.Printf("writing lightning binary to new collection %q", cname)
413 ac, err := arvadosclient.New(runner.Client)
417 kc := keepclient.New(ac)
418 var coll arvados.Collection
419 fs, err := coll.FileSystem(runner.Client, kc)
// Create the file "lightning" with mode 0777 and write the binary to it.
423 f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
427 _, err = f.Write(exe)
// Marshal the filesystem's manifest and create the collection record
// with it.
435 mtxt, err := fs.MarshalManifest(".")
439 err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
440 "collection": map[string]interface{}{
441 "owner_uuid": runner.ProjectUUID,
442 "manifest_text": mtxt,
449 log.Printf("stored lightning binary in new collection %s", coll.UUID)
450 return coll.UUID, nil