18 "git.arvados.org/arvados.git/lib/cmd"
19 "git.arvados.org/arvados.git/sdk/go/arvados"
20 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
21 "git.arvados.org/arvados.git/sdk/go/keepclient"
22 log "github.com/sirupsen/logrus"
23 "golang.org/x/crypto/blake2b"
24 "golang.org/x/net/websocket"
// eventMessage is one event decoded from the Arvados websocket stream.
// This excerpt shows only some of its fields; elsewhere the code also reads
// msg.Properties.Text (NOTE(review): that field's declaration is not
// visible here — confirm).
27 type eventMessage struct {
29 ObjectUUID string `json:"object_uuid"`
30 EventType string `json:"event_type"`
// arvadosClient wraps an Arvados API client (set via the Client field)
// with a websocket event-subscription manager. Some fields, such as the
// mutex used by its methods, are not visible in this excerpt.
36 type arvadosClient struct {
// notifying maps uuid -> subscribed channel -> subscription count.
// It is nil until the first Subscribe call, and Close resets it to nil.
38 notifying map[string]map[chan<- eventMessage]int
// wantClose is closed by Close to ask runNotifier to stop.
39 wantClose chan struct{}
// wsconn is the current websocket connection; it may be nil, so callers
// check it before writing.
40 wsconn *websocket.Conn
44 // Subscribe arranges for event messages concerning uuid to be delivered
45 // to ch, both when an event occurs and after connecting/reconnecting to
46 // the event stream. If a {ch, uuid} pair is subscribed twice, each update
47 // is delivered only once, but two Unsubscribe calls will be needed to
48 // stop delivery.
//
// The first call (and the first call after Close) creates the
// subscription table and wantClose, and starts runNotifier in a new
// goroutine.
49 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
51 defer client.mtx.Unlock()
52 if client.notifying == nil {
53 client.notifying = map[string]map[chan<- eventMessage]int{}
54 client.wantClose = make(chan struct{})
55 go client.runNotifier()
57 chmap := client.notifying[uuid]
59 chmap = map[chan<- eventMessage]int{}
60 client.notifying[uuid] = chmap
63 for _, nch := range chmap {
// When already connected, send the subscribe request right away. It is
// written from a separate goroutine, presumably so this method does not
// block on the network while client.mtx is held.
70 if needSub && client.wsconn != nil {
// NOTE(review): this event_type list omits "crunchstat", which the
// resubscribe request in runNotifier includes and RunContext consumes.
// A uuid subscribed while a connection is already up may therefore miss
// crunchstat events until the next reconnect. Confirm, and align the lists.
// NOTE(review): the Encode error is discarded, and this goroutine may
// write to wsconn concurrently with other writers. Confirm the websocket
// connection tolerates concurrent writes.
71 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
72 "method": "subscribe",
73 "filters": [][]interface{}{
74 {"object_uuid", "=", uuid},
75 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
// Unsubscribe undoes one earlier Subscribe(ch, uuid) call by decrementing
// the {ch, uuid} subscription count. When the count reaches zero, the uuid
// entry can be removed from the subscription table. If a websocket is
// connected, an unsubscribe request is then sent asynchronously. Some of
// the intervening lines are not visible in this excerpt.
81 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
83 defer client.mtx.Unlock()
84 chmap := client.notifying[uuid]
// A missing pair (or a nil map) reads as 0, so n is -1 for a pair that
// was never subscribed.
85 if n := chmap[ch] - 1; n == 0 {
88 delete(client.notifying, uuid)
90 if client.wsconn != nil {
// NOTE(review): these filters omit "crunchstat", but runNotifier
// resubscribes with it included. If the server matches unsubscribe
// filters exactly, this request may not cancel that subscription.
// Confirm. The Encode error is also discarded.
91 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
92 "method": "unsubscribe",
93 "filters": [][]interface{}{
94 {"object_uuid", "=", uuid},
95 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
// Close stops event delivery. It drops the subscription table and closes
// wantClose so runNotifier can exit. If notifying is already nil (before
// any Subscribe, or after a previous Close), it does nothing. A later
// Subscribe starts a fresh notifier.
104 func (client *arvadosClient) Close() {
106 defer client.mtx.Unlock()
107 if client.notifying != nil {
108 client.notifying = nil
109 close(client.wantClose)
// runNotifier runs in its own goroutine, started by Subscribe. It fetches
// the cluster config to find the websocket endpoint and connects. It then
// subscribes again to every uuid currently in client.notifying. Finally it
// decodes incoming event messages and, for each one, iterates over the
// channels subscribed to its ObjectUUID. A config-fetch or dial error is
// logged and followed by a 5-second sleep. It also watches
// client.wantClose, which Close closes.
113 func (client *arvadosClient) runNotifier() {
116 var cluster arvados.Cluster
117 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
119 log.Warnf("error getting cluster config: %s", err)
120 time.Sleep(5 * time.Second)
123 wsURL := cluster.Services.Websocket.ExternalURL
// Replacing only the first "http" maps http->ws and https->wss.
124 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
125 wsURL.Path = "/websocket"
// Keep a token-free copy of the URL for logging, before api_token is
// added to the query string.
126 wsURLNoToken := wsURL.String()
127 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
// The controller's external URL is passed as the websocket Origin.
128 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
130 log.Warnf("websocket connection error: %s", err)
131 time.Sleep(5 * time.Second)
134 log.Printf("connected to websocket at %s", wsURLNoToken)
// Collect the currently subscribed uuids so they can be subscribed
// again on this new connection.
138 resubscribe := make([]string, 0, len(client.notifying))
139 for uuid := range client.notifying {
140 resubscribe = append(resubscribe, uuid)
145 w := json.NewEncoder(conn)
146 for _, uuid := range resubscribe {
// NOTE(review): the Encode error is discarded. This event_type list
// includes "crunchstat", but the lists in Subscribe/Unsubscribe do not.
// Confirm which set is intended.
147 w.Encode(map[string]interface{}{
148 "method": "subscribe",
149 "filters": [][]interface{}{
150 {"object_uuid", "=", uuid},
151 {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}},
157 r := json.NewDecoder(conn)
160 err := r.Decode(&msg)
// Close() closes wantClose to request shutdown.
162 case <-client.wantClose:
166 log.Printf("error decoding websocket message: %s", err)
174 for ch := range client.notifying[msg.ObjectUUID] {
// refreshTicker ticks every 5 seconds. RunContext waits on it in its event
// loop, presumably to re-poll the container request. It is shared at
// package level, and no Stop call is visible in this excerpt.
// NOTE(review): each tick is received by only one goroutine, so concurrent
// RunContext calls split the ticks and each polls less often.
183 var refreshTicker = time.NewTicker(5 * time.Second)
// arvadosContainerRunner describes a container request to submit to an
// Arvados cluster and wait for (see RunContext). Some fields, such as
// ProjectUUID, Args, Priority, VCPUs and OutputName, are referenced
// elsewhere but not visible in this excerpt.
185 type arvadosContainerRunner struct {
// Client is used for the runner's API calls.
186 Client *arvados.Client
193 Prog string // if empty, run /proc/self/exe
// Mounts holds extra container mounts keyed by mount path. RunContext
// merges them into the request, and TranslatePaths adds collection
// mounts here.
195 Mounts map[string]map[string]interface{}
// RunContext sets KeepCacheRAM to 64 MiB x KeepCache (or its default) x VCPUs.
197 KeepCache int // cache buffers per VCPU (0 for default)
// Run calls RunContext with context.Background(): it submits the container
// request, waits for it to finish, and returns the output collection UUID.
200 func (runner *arvadosContainerRunner) Run() (string, error) {
201 return runner.RunContext(context.Background())
// RunContext submits a committed container request and waits until the
// request reaches the Final state. The request is owned by
// runner.ProjectUUID, uses image "lightning-runtime" with use_existing=true,
// and writes output at /mnt/output. While waiting, it prints container
// log events (and crunchstat memory figures) to stderr as websocket events
// arrive. It returns the request's output collection UUID when the
// container completed with exit code 0, and an error otherwise.
//
// An empty ProjectUUID is rejected before anything is submitted. A PATCH
// that attempts to cancel the request is issued from the wait loop
// (presumably when ctx is done; that case is not visible here). ctx.Err()
// is checked after the loop.
204 func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) {
205 if runner.ProjectUUID == "" {
206 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
209 mounts := map[string]map[string]interface{}{
211 "kind": "collection",
215 for path, mnt := range runner.Mounts {
// Presumably taken when Prog is empty (the condition is not visible here):
// upload the running executable and mount it at /mnt/cmd, so the container
// runs the same binary.
221 prog = "/mnt/cmd/lightning"
222 cmdUUID, err := runner.makeCommandCollection()
226 mounts["/mnt/cmd"] = map[string]interface{}{
227 "kind": "collection",
231 command := append([]string{prog}, runner.Args...)
233 priority := runner.Priority
237 keepCache := runner.KeepCache
241 rc := arvados.RuntimeConstraints{
242 API: &runner.APIAccess,
// 1<<26 bytes = 64 MiB per cache buffer, times buffers per VCPU, times VCPUs.
245 KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs),
247 outname := &runner.OutputName
251 var cr arvados.ContainerRequest
252 err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
253 "container_request": map[string]interface{}{
254 "owner_uuid": runner.ProjectUUID,
256 "container_image": "lightning-runtime",
259 "use_existing": true,
260 "output_path": "/mnt/output",
261 "output_name": outname,
262 "runtime_constraints": rc,
// NOTE(review): this sends runner.Priority, not the local `priority`
// computed from it above. If that local applies a default (those lines
// are not visible here), the default is ignored. Confirm.
263 "priority": runner.Priority,
264 "state": arvados.ContainerRequestStateCommitted,
270 log.Printf("container request UUID: %s", cr.UUID)
271 log.Printf("container UUID: %s", cr.ContainerUUID)
273 logch := make(chan eventMessage)
274 client := arvadosClient{Client: runner.Client}
278 if subscribedUUID != "" {
279 client.Unsubscribe(logch, subscribedUUID)
285 lastState := cr.State
// refreshCR re-fetches the container request and logs state changes. It
// also moves the log subscription to the current ContainerUUID. It assigns
// the enclosing function's err. neednewline is declared outside this
// excerpt; it is printed before log lines, presumably to terminate a
// "\r" status line.
286 refreshCR := func() {
287 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
289 fmt.Fprint(os.Stderr, neednewline)
290 log.Printf("error getting container request: %s", err)
293 if lastState != cr.State {
294 fmt.Fprint(os.Stderr, neednewline)
295 log.Printf("container request state: %s", cr.State)
298 if subscribedUUID != cr.ContainerUUID {
299 fmt.Fprint(os.Stderr, neednewline)
301 if subscribedUUID != "" {
302 client.Unsubscribe(logch, subscribedUUID)
304 client.Subscribe(logch, cr.ContainerUUID)
305 subscribedUUID = cr.ContainerUUID
// Extracts the "mem ... rss" portion of crunchstat log lines.
309 var reCrunchstat = regexp.MustCompile(`mem .* rss`)
311 for cr.State != arvados.ContainerRequestStateFinal {
314 err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
315 "container_request": map[string]interface{}{
320 log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err)
323 case <-refreshTicker.C:
326 switch msg.EventType {
330 for _, line := range strings.Split(msg.Properties.Text, "\n") {
332 fmt.Fprint(os.Stderr, neednewline)
338 for _, line := range strings.Split(msg.Properties.Text, "\n") {
339 mem := reCrunchstat.FindString(line)
// The trailing "\r" returns the cursor so the next status line
// overwrites this one.
341 fmt.Fprintf(os.Stderr, "%s \r", mem)
348 fmt.Fprint(os.Stderr, neednewline)
350 if err := ctx.Err(); err != nil {
354 var c arvados.Container
355 err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
358 } else if c.State != arvados.ContainerStateComplete {
359 return "", fmt.Errorf("container did not complete: %s", c.State)
360 } else if c.ExitCode != 0 {
361 return "", fmt.Errorf("container exited %d", c.ExitCode)
363 return cr.OutputUUID, err
// collectionInPathRe matches a path containing a collection identifier that
// starts a path component and is followed by "/" or end of string. The
// identifier is either a portable data hash (32 hex digits, "+", size) or
// an Arvados UUID (xxxxx-xxxxx-xxxxxxxxxxxxxxx). The greedy prefix selects
// the rightmost such identifier. Submatches:
//   [1] prefix ending in "/"
//   [2] the identifier
//   [3] remainder beginning with "/" (may be empty)
366 var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
// TranslatePaths rewrites, in place, each *path that refers to a collection
// (matched by collectionInPathRe) into the container path
// "/mnt/<id><rest>". It also records a collection mount for "/mnt/<id>" in
// runner.Mounts, judging by the lookup when one is not already present.
// "" and "-" are special-cased, presumably skipped. A path with no
// recognizable collection id yields an error; paths earlier in the list
// have already been rewritten by then.
368 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
369 if runner.Mounts == nil {
370 runner.Mounts = make(map[string]map[string]interface{})
372 for _, path := range paths {
373 if *path == "" || *path == "-" {
376 m := collectionInPathRe.FindStringSubmatch(*path)
378 return fmt.Errorf("cannot find uuid in path: %q", *path)
381 mnt, ok := runner.Mounts["/mnt/"+uuid]
383 mnt = map[string]interface{}{
384 "kind": "collection",
387 runner.Mounts["/mnt/"+uuid] = mnt
389 *path = "/mnt/" + uuid + m[3]
// mtxMakeCommandCollection serializes makeCommandCollection calls,
// presumably so concurrent runners in this process don't each upload the
// same binary.
394 var mtxMakeCommandCollection sync.Mutex
// makeCommandCollection returns the UUID of a collection in
// runner.ProjectUUID that holds the running executable as the file
// "lightning". The executable is read from /proc/self/exe, so this is
// Linux-only. It reuses the first existing collection whose name
// ("lightning <version>"), owner, and properties.blake2b (hex
// BLAKE2b-256 of the executable) all match. Otherwise it writes the
// executable through Keep and creates a new collection with that blake2b
// property.
396 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
397 mtxMakeCommandCollection.Lock()
398 defer mtxMakeCommandCollection.Unlock()
// The whole executable is held in memory for hashing and upload.
399 exe, err := ioutil.ReadFile("/proc/self/exe")
403 b2 := blake2b.Sum256(exe)
404 cname := "lightning " + cmd.Version.String() // must build with "make", not just "go install"
405 var existing arvados.CollectionList
406 err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
409 Filters: []arvados.Filter{
410 {Attr: "name", Operator: "=", Operand: cname},
411 {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
412 {Attr: "properties.blake2b", Operator: "=", Operand: fmt.Sprintf("%x", b2)},
// Only name, owner, and the stored hash property are compared; the
// collection's actual content is not re-verified.
418 if len(existing.Items) > 0 {
419 coll := existing.Items[0]
420 log.Printf("using lightning binary in existing collection %s (name is %q, hash is %q; did not verify whether content matches)", coll.UUID, cname, coll.Properties["blake2b"])
421 return coll.UUID, nil
423 log.Printf("writing lightning binary to new collection %q", cname)
424 ac, err := arvadosclient.New(runner.Client)
428 kc := keepclient.New(ac)
429 var coll arvados.Collection
430 fs, err := coll.FileSystem(runner.Client, kc)
434 f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
438 _, err = f.Write(exe)
446 mtxt, err := fs.MarshalManifest(".")
450 err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
451 "collection": map[string]interface{}{
452 "owner_uuid": runner.ProjectUUID,
453 "manifest_text": mtxt,
455 "properties": map[string]interface{}{
456 "blake2b": fmt.Sprintf("%x", b2),
463 log.Printf("stored lightning binary in new collection %s", coll.UUID)
464 return coll.UUID, nil
// arvadosClientFromEnv is built by arvados.NewClientFromEnv(); open uses it
// to set up siteFS.
468 arvadosClientFromEnv = arvados.NewClientFromEnv()
// siteFS is assigned inside open() while siteFSMtx is held.
469 siteFS arvados.CustomFileSystem
// open opens fnm for reading. When ARVADOS_API_HOST is unset it takes a
// branch not visible in this excerpt, presumably plain local file access.
// Otherwise fnm is expected to look like "/mnt/<collection id>/<file>",
// the layout TranslatePaths produces. The file is then read from the
// Arvados site filesystem at "by_id/<id>/<file>". The site filesystem and
// its Keep client are set up using arvadosClientFromEnv while siteFSMtx
// is held.
473 func open(fnm string) (io.ReadCloser, error) {
474 if os.Getenv("ARVADOS_API_HOST") == "" {
477 m := collectionInPathRe.FindStringSubmatch(fnm)
482 mnt := "/mnt/" + uuid + "/"
// fnm must begin with "/mnt/<id>/"; how other paths are handled is not
// visible here.
483 if !strings.HasPrefix(fnm, mnt) {
488 defer siteFSMtx.Unlock()
490 log.Info("setting up Arvados client")
491 ac, err := arvadosclient.New(arvadosClientFromEnv)
495 ac.Client = arvados.DefaultSecureClient
496 kc := keepclient.New(ac)
497 // Don't use keepclient's default short timeouts.
498 kc.HTTPClient = arvados.DefaultSecureClient
499 // Guess max concurrent readers, hope to avoid cache
501 kc.BlockCache = &keepclient.BlockCache{MaxBlocks: runtime.NumCPU() * 3}
502 siteFS = arvadosClientFromEnv.SiteFileSystem(kc)
// NOTE(review): if the deferred siteFSMtx.Unlock above belongs to open
// itself (not to a nested closure outside this excerpt), the mutex is
// still held here, so concurrent open calls are serialized through
// siteFS.Open. Confirm this is intended.
505 log.Infof("reading %q from %s using Arvados client", fnm[len(mnt):], uuid)
506 return siteFS.Open("by_id/" + uuid + "/" + fnm[len(mnt):])