17 "git.arvados.org/arvados.git/sdk/go/arvados"
18 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
19 "git.arvados.org/arvados.git/sdk/go/keepclient"
20 log "github.com/sirupsen/logrus"
21 "golang.org/x/crypto/blake2b"
22 "golang.org/x/net/websocket"
// eventMessage holds one message from the Arvados websocket event
// stream; runNotifier routes it to subscribed channels by ObjectUUID.
25 type eventMessage struct {
// ObjectUUID identifies the object the event concerns; it is the key
// used to look up subscribers in arvadosClient.notifying.
27 ObjectUUID string `json:"object_uuid"`
// EventType is the kind of event, e.g. "stderr", "crunch-run",
// "crunchstat" or "update" (the types requested by the subscribe filters).
28 EventType string `json:"event_type"`
// arvadosClient adds websocket event subscriptions (Subscribe,
// Unsubscribe, Close) to an Arvados API client. Subscribe, Unsubscribe
// and Close hold client.mtx (released via defer) while using the fields
// below.
34 type arvadosClient struct {
// notifying maps object UUID -> subscribed channel -> number of
// outstanding Subscribe calls for that {channel, uuid} pair. It is nil
// until the first Subscribe (which also starts runNotifier) and is set
// back to nil by Close.
36 notifying map[string]map[chan<- eventMessage]int
// wantClose is created together with notifying and closed by Close to
// tell runNotifier to stop.
37 wantClose chan struct{}
// wsconn is the current websocket connection; Subscribe and Unsubscribe
// only send requests when it is non-nil.
38 wsconn *websocket.Conn
42 // Subscribe listens for events concerning the given uuid. When an
43 // event occurs (and after connecting/reconnecting to the event stream),
44 // a message for that uuid is sent to ch. If a {ch, uuid} pair is
45 // subscribed twice, each update is sent only once, but two Unsubscribe
46 // calls are needed to stop sending them.
//
// The first call on a client starts the runNotifier goroutine; Close
// stops it.
47 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
49 defer client.mtx.Unlock()
// Lazily initialize subscription state and start the notifier on first
// use (also after a Close, which resets notifying to nil).
50 if client.notifying == nil {
51 client.notifying = map[string]map[chan<- eventMessage]int{}
52 client.wantClose = make(chan struct{})
53 go client.runNotifier()
55 chmap := client.notifying[uuid]
57 chmap = map[chan<- eventMessage]int{}
58 client.notifying[uuid] = chmap
61 for _, nch := range chmap {
// Send the subscribe request from a separate goroutine so Subscribe does
// not block on the websocket while holding client.mtx. The Encode error
// is discarded.
//
// NOTE(review): this event_type list omits "crunchstat". runNotifier
// includes "crunchstat" when it (re)subscribes after connecting, and
// RunContext displays crunchstat events. A uuid subscribed while already
// connected therefore gets no crunchstat events until the next
// reconnect. Consider one shared filter definition for Subscribe,
// Unsubscribe and runNotifier.
68 if needSub && client.wsconn != nil {
69 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
70 "method": "subscribe",
71 "filters": [][]interface{}{
72 {"object_uuid", "=", uuid},
73 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
// Unsubscribe undoes one earlier Subscribe(ch, uuid) call. When uuid is
// dropped from client.notifying, an unsubscribe request is sent
// (asynchronously, error discarded) if a websocket connection is open.
79 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
81 defer client.mtx.Unlock()
82 chmap := client.notifying[uuid]
// NOTE(review): if {ch, uuid} was never subscribed, chmap[ch] is 0 and
// n is -1, so this branch is skipped. Confirm that the (unseen) other
// branch does not store a negative count in that case.
83 if n := chmap[ch] - 1; n == 0 {
86 delete(client.notifying, uuid)
88 if client.wsconn != nil {
// NOTE(review): like Subscribe, this filter omits "crunchstat", unlike
// the filter runNotifier uses when (re)subscribing.
89 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
90 "method": "unsubscribe",
91 "filters": [][]interface{}{
92 {"object_uuid", "=", uuid},
93 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
// Close drops all subscriptions and closes wantClose, which runNotifier
// selects on, to stop the notifier goroutine. It does nothing if nothing
// is subscribed or Close already ran. A later Subscribe starts a fresh
// notifier.
102 func (client *arvadosClient) Close() {
104 defer client.mtx.Unlock()
105 if client.notifying != nil {
106 client.notifying = nil
107 close(client.wantClose)
// runNotifier is started by the first Subscribe call and runs in its own
// goroutine. It does the following:
//   - looks up the websocket URL in the cluster config and connects to it,
//     logging and sleeping 5 seconds after an error (presumably before
//     retrying);
//   - subscribes to every uuid currently in client.notifying;
//   - decodes incoming messages and hands each one to the channels
//     subscribed to its ObjectUUID.
// It also watches client.wantClose, which Close closes to stop it.
111 func (client *arvadosClient) runNotifier() {
114 var cluster arvados.Cluster
115 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
117 log.Warnf("error getting cluster config: %s", err)
118 time.Sleep(5 * time.Second)
// Derive the websocket URL from the configured external URL
// (http -> ws, https -> wss).
121 wsURL := cluster.Services.Websocket.ExternalURL
122 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
123 wsURL.Path = "/websocket"
// Keep a token-free copy of the URL for logging before adding the API
// token as a query parameter.
124 wsURLNoToken := wsURL.String()
125 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
// The controller's external URL is sent as the websocket Origin.
126 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
128 log.Warnf("websocket connection error: %s", err)
129 time.Sleep(5 * time.Second)
132 log.Printf("connected to websocket at %s", wsURLNoToken)
// Snapshot the subscribed uuids so the subscribe requests below can be
// written without iterating client.notifying (presumably so client.mtx
// need not be held during the writes).
136 resubscribe := make([]string, 0, len(client.notifying))
137 for uuid := range client.notifying {
138 resubscribe = append(resubscribe, uuid)
// NOTE(review): errors from w.Encode are ignored. This filter also
// includes "crunchstat", while the ones in Subscribe/Unsubscribe do not.
143 w := json.NewEncoder(conn)
144 for _, uuid := range resubscribe {
145 w.Encode(map[string]interface{}{
146 "method": "subscribe",
147 "filters": [][]interface{}{
148 {"object_uuid", "=", uuid},
149 {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}},
155 r := json.NewDecoder(conn)
158 err := r.Decode(&msg)
160 case <-client.wantClose:
164 log.Printf("error decoding websocket message: %s", err)
// Presumably delivers msg to every channel subscribed to its object UUID.
172 for ch := range client.notifying[msg.ObjectUUID] {
// refreshTicker fires every 5 seconds. RunContext waits on it in its
// main loop, presumably to re-fetch the container request.
// NOTE(review): the ticker is package-level and never stopped. If
// several RunContext calls run concurrently they share it, and each
// tick is received by only one of them.
181 var refreshTicker = time.NewTicker(5 * time.Second)
// arvadosContainerRunner holds the parameters of an Arvados container
// request (program, mounts, resources, ...) and the client used to
// submit it via Run/RunContext.
183 type arvadosContainerRunner struct {
184 Client *arvados.Client
// Prog is the command to run in the container. When it is empty,
// RunContext uploads the running binary (makeCommandCollection), mounts
// it at /mnt/cmd and runs /mnt/cmd/lightning.
191 Prog string // if empty, run /proc/self/exe
// Mounts, keyed by container mount path, are merged into the container
// request's mounts. TranslatePaths adds collection mounts under /mnt/.
193 Mounts map[string]map[string]interface{}
// RunContext requests KeepCacheRAM = 64 MiB * KeepCache * VCPUs.
195 KeepCache int // cache buffers per VCPU (0 for default)
// Run is RunContext with context.Background(). It submits the container
// request, waits for it to finish, and returns the output collection UUID.
198 func (runner *arvadosContainerRunner) Run() (string, error) {
199 return runner.RunContext(context.Background())
// RunContext submits a committed container request owned by
// runner.ProjectUUID. Because use_existing is set, a matching existing
// container may be reused.
//
// It then follows the request until it reaches the Final state:
//   - log text and crunchstat memory lines from websocket events are
//     written to os.Stderr;
//   - the request is re-checked whenever refreshTicker fires.
//
// It returns the request's output collection UUID if the container
// completed with exit code 0. It returns an error if ProjectUUID is
// empty, an API call fails, ctx is done, the container did not reach
// Complete, or it exited nonzero.
// NOTE(review): when ctx is done it appears to try to cancel the
// container request (the PATCH below); confirm in the full source.
202 func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) {
203 if runner.ProjectUUID == "" {
204 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
207 mounts := map[string]map[string]interface{}{
209 "kind": "collection",
213 for path, mnt := range runner.Mounts {
// No Prog given: upload this binary and run it from a collection mounted
// at /mnt/cmd.
219 prog = "/mnt/cmd/lightning"
220 cmdUUID, err := runner.makeCommandCollection()
224 mounts["/mnt/cmd"] = map[string]interface{}{
225 "kind": "collection",
229 command := append([]string{prog}, runner.Args...)
// NOTE(review): this local is never used in the request body below,
// which sends "priority": runner.Priority. Any default or adjustment
// applied to priority here therefore has no effect. The request should
// most likely send priority.
231 priority := runner.Priority
235 keepCache := runner.KeepCache
239 rc := arvados.RuntimeConstraints{
240 API: &runner.APIAccess,
// 64 MiB (1<<26) per cache buffer, per VCPU.
243 KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs),
245 outname := &runner.OutputName
249 var cr arvados.ContainerRequest
250 err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
251 "container_request": map[string]interface{}{
252 "owner_uuid": runner.ProjectUUID,
254 "container_image": "lightning-runtime",
257 "use_existing": true,
258 "output_path": "/mnt/output",
259 "output_name": outname,
260 "runtime_constraints": rc,
261 "priority": runner.Priority,
262 "state": arvados.ContainerRequestStateCommitted,
268 log.Printf("container request UUID: %s", cr.UUID)
269 log.Printf("container UUID: %s", cr.ContainerUUID)
// Log and crunchstat events for the current container arrive on logch
// through a private websocket notifier.
271 logch := make(chan eventMessage)
272 client := arvadosClient{Client: runner.Client}
276 if subscribedUUID != "" {
277 client.Unsubscribe(logch, subscribedUUID)
283 lastState := cr.State
// refreshCR re-fetches cr, logs state changes, and moves the logch
// subscription to cr.ContainerUUID whenever that changes. Note that it
// assigns to the enclosing function's err.
284 refreshCR := func() {
285 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
287 fmt.Fprint(os.Stderr, neednewline)
288 log.Printf("error getting container request: %s", err)
291 if lastState != cr.State {
292 fmt.Fprint(os.Stderr, neednewline)
293 log.Printf("container request state: %s", cr.State)
296 if subscribedUUID != cr.ContainerUUID {
297 fmt.Fprint(os.Stderr, neednewline)
299 if subscribedUUID != "" {
300 client.Unsubscribe(logch, subscribedUUID)
302 client.Subscribe(logch, cr.ContainerUUID)
303 subscribedUUID = cr.ContainerUUID
// Extracts the "mem ... rss" summary from crunchstat text.
// NOTE(review): compiled on every RunContext call; it could be a
// package-level var like collectionInPathRe.
307 var reCrunchstat = regexp.MustCompile(`mem .* rss`)
309 for cr.State != arvados.ContainerRequestStateFinal {
// Try to cancel the container request (presumably on ctx.Done()). A
// failure is only logged.
312 err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
313 "container_request": map[string]interface{}{
318 log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err)
321 case <-refreshTicker.C:
324 switch msg.EventType {
328 for _, line := range strings.Split(msg.Properties.Text, "\n") {
330 fmt.Fprint(os.Stderr, neednewline)
336 for _, line := range strings.Split(msg.Properties.Text, "\n") {
337 mem := reCrunchstat.FindString(line)
// The trailing "\r" lets the next memory figure overwrite this one on
// the terminal.
339 fmt.Fprintf(os.Stderr, "%s \r", mem)
346 fmt.Fprint(os.Stderr, neednewline)
349 if err := ctx.Err(); err != nil {
// The request is final; check how the container itself ended.
352 var c arvados.Container
353 err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
356 } else if c.State != arvados.ContainerStateComplete {
357 return "", fmt.Errorf("container did not complete: %s", c.State)
358 } else if c.ExitCode != 0 {
359 return "", fmt.Errorf("container exited %d", c.ExitCode)
361 return cr.OutputUUID, err
// collectionInPathRe matches a path that contains a collection
// identifier as a whole path component. The identifier is either a
// portable data hash (32 hex digits, "+", size) or an Arvados UUID (three
// dash-separated groups of 5, 5 and 15 lowercase alphanumerics).
// Submatches:
//   - 1: leading directories, including the trailing "/";
//   - 2: the identifier;
//   - 3: the remainder, starting with "/".
364 var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
// TranslatePaths rewrites each *path in place into a path inside the
// container. For a path containing a collection identifier (see
// collectionInPathRe):
//   - the leading directories are dropped and the rest is placed under
//     /mnt, e.g. "/some/dir/<uuid>/x/y" -> "/mnt/<uuid>/x/y";
//   - a collection mount at "/mnt/<uuid>" is added to runner.Mounts if
//     not already present.
// Empty paths and "-" are presumably left unchanged. For the first other
// path with no identifier it returns an error; earlier paths will
// already have been rewritten.
366 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
367 if runner.Mounts == nil {
368 runner.Mounts = make(map[string]map[string]interface{})
370 for _, path := range paths {
371 if *path == "" || *path == "-" {
374 m := collectionInPathRe.FindStringSubmatch(*path)
376 return fmt.Errorf("cannot find uuid in path: %q", *path)
// Reuse an existing mount for this identifier if one is already present.
379 mnt, ok := runner.Mounts["/mnt/"+uuid]
381 mnt = map[string]interface{}{
382 "kind": "collection",
385 runner.Mounts["/mnt/"+uuid] = mnt
// m[3] is the part of the path after the identifier (may be empty).
387 *path = "/mnt/" + uuid + m[3]
// mtxMakeCommandCollection serializes makeCommandCollection calls,
// presumably so concurrent runners do not each upload the binary and
// create duplicate collections.
392 var mtxMakeCommandCollection sync.Mutex
// makeCommandCollection returns the UUID of a collection, owned by
// runner.ProjectUUID, that holds the running executable (/proc/self/exe)
// as a file named "lightning".
//
// Collections are named "lightning-<hex BLAKE2b-256 of the binary>". If
// one with that name already exists in the project it is reused without
// checking its content. Otherwise the binary is written through Keep and
// a new collection is created.
394 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
395 mtxMakeCommandCollection.Lock()
396 defer mtxMakeCommandCollection.Unlock()
// NOTE(review): ioutil.ReadFile is deprecated since Go 1.16; os.ReadFile
// is the drop-in replacement.
397 exe, err := ioutil.ReadFile("/proc/self/exe")
// Naming the collection after a content hash means an unchanged binary
// is uploaded only once per project.
401 b2 := blake2b.Sum256(exe)
402 cname := fmt.Sprintf("lightning-%x", b2)
403 var existing arvados.CollectionList
404 err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
407 Filters: []arvados.Filter{
408 {Attr: "name", Operator: "=", Operand: cname},
409 {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
415 if len(existing.Items) > 0 {
416 uuid := existing.Items[0].UUID
417 log.Printf("using lightning binary in existing collection %s (name is %q; did not verify whether content matches)", uuid, cname)
420 log.Printf("writing lightning binary to new collection %q", cname)
421 ac, err := arvadosclient.New(runner.Client)
425 kc := keepclient.New(ac)
// Build the collection in an in-memory collection filesystem backed by
// Keep, then save its manifest as a new collection.
426 var coll arvados.Collection
427 fs, err := coll.FileSystem(runner.Client, kc)
// Mode 0777, presumably so the file can be executed from the mount.
431 f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
435 _, err = f.Write(exe)
443 mtxt, err := fs.MarshalManifest(".")
447 err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
448 "collection": map[string]interface{}{
449 "owner_uuid": runner.ProjectUUID,
450 "manifest_text": mtxt,
457 log.Printf("stored lightning binary in new collection %s", coll.UUID)
458 return coll.UUID, nil
// arvadosClientFromEnv is the API client used by open. It is built by
// arvados.NewClientFromEnv when the package is initialized.
461 var arvadosClientFromEnv = arvados.NewClientFromEnv()
// open opens fnm for reading.
//   - If ARVADOS_API_HOST is unset, or fnm is not under "/mnt/<uuid>/"
//     (the form TranslatePaths produces), it presumably falls back to a
//     plain local open; those branches are not fully visible here.
//   - Otherwise the file is read directly from the collection identified
//     by <uuid> (a UUID or portable data hash) through the Arvados and
//     Keep client libraries.
463 func open(fnm string) (io.ReadCloser, error) {
464 if os.Getenv("ARVADOS_API_HOST") == "" {
467 m := collectionInPathRe.FindStringSubmatch(fnm)
472 mnt := "/mnt/" + uuid + "/"
473 if !strings.HasPrefix(fnm, mnt) {
477 log.Infof("reading %q from %s using Arvados client library", fnm[len(mnt):], uuid)
478 ac, err := arvadosclient.New(arvadosClientFromEnv)
// Use arvados.DefaultSecureClient for API requests as well (presumably
// for the same timeout reason as kc.HTTPClient below).
482 ac.Client = arvados.DefaultSecureClient
483 kc := keepclient.New(ac)
484 // Don't use keepclient's default short timeouts.
485 kc.HTTPClient = arvados.DefaultSecureClient
486 // Don't cache more than one block for this file.
487 kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 1}
// Fetch only the fields needed to build the collection filesystem.
489 var coll arvados.Collection
490 err = arvadosClientFromEnv.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+uuid, nil, arvados.GetOptions{Select: []string{"uuid", "manifest_text"}})
494 fs, err := coll.FileSystem(arvadosClientFromEnv, kc)
// Open the path relative to the collection root.
498 return fs.Open(fnm[len(mnt):])