1 // Copyright (C) The Lightning Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
23 "git.arvados.org/arvados.git/lib/cmd"
24 "git.arvados.org/arvados.git/sdk/go/arvados"
25 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
26 "git.arvados.org/arvados.git/sdk/go/keepclient"
27 "github.com/klauspost/pgzip"
28 log "github.com/sirupsen/logrus"
29 "golang.org/x/crypto/blake2b"
30 "golang.org/x/net/websocket"
// eventMessage is one event decoded from the websocket JSON stream by
// runNotifier and delivered to subscriber channels.
33 type eventMessage struct {
// ObjectUUID identifies the object the event concerns; runNotifier
// uses it to look up the subscribed channels.
35 ObjectUUID string `json:"object_uuid"`
// EventType is the kind of event; RunContext switches on it to
// decide how to handle a message.
36 EventType string `json:"event_type"`
// arvadosClient manages websocket event subscriptions (see Subscribe,
// Unsubscribe and Close).
42 type arvadosClient struct {
// notifying maps object UUID -> subscriber channel -> subscription
// count. It is nil until the first Subscribe call, and is set back
// to nil by Close.
44 notifying map[string]map[chan<- eventMessage]int
// wantClose is created by the first Subscribe call and closed by
// Close; runNotifier checks it after each decode.
45 wantClose chan struct{}
// wsconn is the current websocket connection. Subscribe and
// Unsubscribe only send requests to the server when it is non-nil.
46 wsconn *websocket.Conn
50 // Listen for events concerning the given uuids. When an event occurs
51 // (and after connecting/reconnecting to the event stream), send each
52 // uuid to ch. If a {ch, uuid} pair is subscribed twice, the uuid will
53 // be sent only once for each update, but two Unsubscribe calls will
54 // be needed to stop sending them.
//
// The values delivered on ch are the decoded eventMessage values
// forwarded by runNotifier.
55 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
57 defer client.mtx.Unlock()
// First subscription (or first one after Close): create the
// subscription table and the close signal, then start the
// background notifier goroutine.
58 if client.notifying == nil {
59 client.notifying = map[string]map[chan<- eventMessage]int{}
60 client.wantClose = make(chan struct{})
61 go client.runNotifier()
// chmap holds the subscriber channels (with counts) for this uuid.
63 chmap := client.notifying[uuid]
65 chmap = map[chan<- eventMessage]int{}
66 client.notifying[uuid] = chmap
69 for _, nch := range chmap {
// If a websocket connection is already up, send the subscribe
// request now. It is sent from a separate goroutine and the Encode
// error is not checked.
// NOTE(review): this filter omits "crunchstat", which the
// resubscribe filter in runNotifier includes -- confirm whether the
// difference is intended.
76 if needSub && client.wsconn != nil {
77 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
78 "method": "subscribe",
79 "filters": [][]interface{}{
80 {"object_uuid", "=", uuid},
81 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
// Unsubscribe undoes one Subscribe call for the {ch, uuid} pair. When
// the pair's subscription count drops to zero, the uuid entry can be
// removed from client.notifying and, if a websocket connection is up,
// an unsubscribe request is sent to the server.
87 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
89 defer client.mtx.Unlock()
90 chmap := client.notifying[uuid]
// n is the pair's subscription count after removing this one.
91 if n := chmap[ch] - 1; n == 0 {
94 delete(client.notifying, uuid)
// The request is sent from a separate goroutine and the Encode
// error is not checked.
96 if client.wsconn != nil {
97 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
98 "method": "unsubscribe",
99 "filters": [][]interface{}{
100 {"object_uuid", "=", uuid},
101 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
// Close discards all subscriptions and closes wantClose, which
// runNotifier checks after each decode. If notifying is already nil
// (nothing was ever subscribed, or Close was already called) it does
// nothing, so wantClose is closed at most once per Subscribe cycle.
110 func (client *arvadosClient) Close() {
112 defer client.mtx.Unlock()
113 if client.notifying != nil {
114 client.notifying = nil
115 close(client.wantClose)
// runNotifier is started in its own goroutine by Subscribe. It
// connects to the cluster's websocket service, subscribes to every
// UUID currently in client.notifying, and forwards each decoded
// message to the channels subscribed to the message's ObjectUUID.
119 func (client *arvadosClient) runNotifier() {
// Fetch the cluster config to learn the websocket service URL.
122 var cluster arvados.Cluster
123 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
125 log.Warnf("error getting cluster config: %s", err)
126 time.Sleep(5 * time.Second)
// Derive the websocket URL: "http" in the scheme becomes "ws" (so
// https becomes wss) and the path is fixed.
129 wsURL := cluster.Services.Websocket.ExternalURL
130 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
131 wsURL.Path = "/websocket"
// Keep a copy without the API token so the token is not logged.
132 wsURLNoToken := wsURL.String()
133 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
134 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
136 log.Warnf("websocket connection error: %s", err)
137 time.Sleep(5 * time.Second)
140 log.Printf("connected to websocket at %s", wsURLNoToken)
// Snapshot the subscribed UUIDs so they can be subscribed on this
// new connection.
144 resubscribe := make([]string, 0, len(client.notifying))
145 for uuid := range client.notifying {
146 resubscribe = append(resubscribe, uuid)
151 w := json.NewEncoder(conn)
152 for _, uuid := range resubscribe {
// Encode errors are not checked here.
153 w.Encode(map[string]interface{}{
154 "method": "subscribe",
155 "filters": [][]interface{}{
156 {"object_uuid", "=", uuid},
157 {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}},
// Read and dispatch messages from the connection.
163 r := json.NewDecoder(conn)
166 err := r.Decode(&msg)
168 case <-client.wantClose:
172 log.Printf("error decoding websocket message: %s", err)
// Each delivery runs in its own goroutine, which blocks until its
// send completes.
// NOTE(review): the closure captures the range variable ch; before
// Go 1.22 that variable is shared across iterations, so a goroutine
// may send on a different channel than intended -- confirm the
// module's Go version or pass ch as an argument.
180 for ch := range client.notifying[msg.ObjectUUID] {
181 go func() { ch <- msg }()
// refreshTicker fires every 5 seconds; RunContext receives from
// refreshTicker.C in its wait loop.
// NOTE(review): a single ticker appears to be shared by all
// RunContext calls, and each tick is received by only one of them --
// confirm this is acceptable when runs are concurrent.
189 var refreshTicker = time.NewTicker(5 * time.Second)
// arvadosContainerRunner holds the settings RunContext uses to submit
// a container request and wait for its result.
191 type arvadosContainerRunner struct {
// Client is the API client used for the runner's requests.
192 Client *arvados.Client
199 Prog string // if empty, run /proc/self/exe
// Mounts holds container mounts keyed by mount path; TranslatePaths
// adds "/mnt/<collection id>" entries to it.
201 Mounts map[string]map[string]interface{}
203 KeepCache int // cache buffers per VCPU (0 for default)
// Run is RunContext with context.Background(), i.e. without caller
// cancellation.
207 func (runner *arvadosContainerRunner) Run() (string, error) {
208 return runner.RunContext(context.Background())
// RunContext submits a container request built from the runner's
// settings, follows its progress (periodic refresh plus websocket
// events) until the request reaches its Final state, and returns the
// request's OutputUUID.
//
// It returns an error if ProjectUUID is empty, if the container's
// state is not Complete, or if the container's exit code is non-zero.
211 func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) {
212 if runner.ProjectUUID == "" {
213 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
216 mounts := map[string]map[string]interface{}{
218 "kind": "collection",
// Caller-supplied mounts (see TranslatePaths).
222 for path, mnt := range runner.Mounts {
// Run the lightning binary from a collection mounted at /mnt/cmd
// (presumably only when runner.Prog is empty -- see the Prog field
// comment).
228 prog = "/mnt/cmd/lightning"
229 cmdUUID, err := runner.makeCommandCollection()
233 mounts["/mnt/cmd"] = map[string]interface{}{
234 "kind": "collection",
238 command := append([]string{prog}, runner.Args...)
// NOTE(review): the request below sends runner.Priority rather than
// this local -- confirm which one is intended.
240 priority := runner.Priority
244 keepCache := runner.KeepCache
248 rc := arvados.RuntimeConstraints{
249 API: runner.APIAccess,
// 1<<26 bytes (64 MiB) per cache buffer, keepCache buffers per VCPU.
252 KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs),
254 outname := &runner.OutputName
258 var cr arvados.ContainerRequest
// Create the container request, already in the Committed state.
259 err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
260 "container_request": map[string]interface{}{
261 "owner_uuid": runner.ProjectUUID,
263 "container_image": "lightning-runtime",
266 "use_existing": true,
267 "output_path": "/mnt/output",
268 "output_name": outname,
269 "runtime_constraints": rc,
270 "priority": runner.Priority,
271 "state": arvados.ContainerRequestStateCommitted,
272 "scheduling_parameters": arvados.SchedulingParameters{
273 Preemptible: runner.Preemptible,
274 Partitions: []string{},
276 "environment": map[string]string{
// Match the Go scheduler's parallelism to the requested VCPUs.
277 "GOMAXPROCS": fmt.Sprintf("%d", rc.VCPUs),
279 "container_count_max": 1,
285 log.Printf("container request UUID: %s", cr.UUID)
286 log.Printf("container UUID: %s", cr.ContainerUUID)
// logch receives websocket events for the container currently
// subscribed to.
288 logch := make(chan eventMessage)
289 client := arvadosClient{Client: runner.Client}
293 if subscribedUUID != "" {
294 log.Printf("unsubscribe container UUID: %s", subscribedUUID)
295 client.Unsubscribe(logch, subscribedUUID)
301 lastState := cr.State
// refreshCR re-fetches the container request with a one-minute
// deadline, logs state changes, and moves the event subscription
// when the request's ContainerUUID changes.
// neednewline is written to stderr before each log message;
// presumably it terminates a pending "\r" progress line -- confirm.
302 refreshCR := func() {
303 ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Minute))
305 err = runner.Client.RequestAndDecodeContext(ctx, &cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
307 fmt.Fprint(os.Stderr, neednewline)
309 log.Printf("error getting container request: %s", err)
312 if lastState != cr.State {
313 fmt.Fprint(os.Stderr, neednewline)
315 log.Printf("container request state: %s", cr.State)
318 if subscribedUUID != cr.ContainerUUID {
319 fmt.Fprint(os.Stderr, neednewline)
321 if subscribedUUID != "" {
322 log.Printf("unsubscribe container UUID: %s", subscribedUUID)
323 client.Unsubscribe(logch, subscribedUUID)
325 log.Printf("subscribe container UUID: %s", cr.ContainerUUID)
326 client.Subscribe(logch, cr.ContainerUUID)
327 subscribedUUID = cr.ContainerUUID
// Matches "mem ... <digits> rss" in crunchstat text, capturing the
// digits.
331 var reCrunchstat = regexp.MustCompile(`mem .* (\d+) rss`)
// Wait until the container request reaches its Final state.
333 for cr.State != arvados.ContainerRequestStateFinal {
// Attempt to cancel the container request; a failure here is only
// logged.
336 err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
337 "container_request": map[string]interface{}{
342 log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err)
345 case <-refreshTicker.C:
348 switch msg.EventType {
352 for _, line := range strings.Split(msg.Properties.Text, "\n") {
354 fmt.Fprint(os.Stderr, neednewline)
360 for _, line := range strings.Split(msg.Properties.Text, "\n") {
361 m := reCrunchstat.FindStringSubmatch(line)
// The ParseInt error is ignored.
363 rss, _ := strconv.ParseInt(m[1], 10, 64)
// The progress line ends with "\r" so the next write starts at the
// beginning of the same terminal line.
364 fmt.Fprintf(os.Stderr, "%s rss %.3f GB \r", cr.UUID, float64(rss)/1e9)
371 fmt.Fprint(os.Stderr, neednewline)
373 if err := ctx.Err(); err != nil {
// Check the container's final state and exit code.
377 var c arvados.Container
378 err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
381 } else if c.State != arvados.ContainerStateComplete {
382 return "", fmt.Errorf("container did not complete: %s", c.State)
383 } else if c.ExitCode != 0 {
384 return "", fmt.Errorf("container exited %d", c.ExitCode)
386 return cr.OutputUUID, err
// collectionInPathRe finds a collection identifier inside a path.
// Submatches: [1] optional prefix ending in "/", [2] the identifier
// (either 32 hex digits + "+" + digits, or a 5-5-15 character
// lowercase alphanumeric UUID), [3] optional remainder starting with
// "/".
389 var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
// TranslatePaths rewrites, in place, each *path that contains a
// collection identifier so that it points under "/mnt/<id>", and
// makes sure runner.Mounts has a collection mount for "/mnt/<id>".
// It returns an error for a path in which no identifier is found.
391 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
// Writing to a nil map would panic, so create it on first use.
392 if runner.Mounts == nil {
393 runner.Mounts = make(map[string]map[string]interface{})
395 for _, path := range paths {
// Empty paths and "-" are special-cased (presumably left unchanged
// -- confirm).
396 if *path == "" || *path == "-" {
399 m := collectionInPathRe.FindStringSubmatch(*path)
401 return fmt.Errorf("cannot find uuid in path: %q", *path)
// Reuse an existing mount for this collection if there is one.
404 mnt, ok := runner.Mounts["/mnt/"+collID]
406 mnt = map[string]interface{}{
407 "kind": "collection",
// 27 characters is the length of the 5-5-15 UUID form matched by
// collectionInPathRe; the other form is a portable data hash.
409 if len(collID) == 27 {
412 mnt["portable_data_hash"] = collID
414 runner.Mounts["/mnt/"+collID] = mnt
// m[3] is the part of the original path after the identifier.
416 *path = "/mnt/" + collID + m[3]
// mtxMakeCommandCollection is held for the duration of
// makeCommandCollection, so calls run one at a time.
421 var mtxMakeCommandCollection sync.Mutex
// makeCommandCollection makes the running executable available as a
// file named "lightning" in a collection owned by runner.ProjectUUID,
// and returns that collection's UUID. An existing collection with the
// same name, owner and "blake2b" property is reused without comparing
// its content.
423 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
424 mtxMakeCommandCollection.Lock()
425 defer mtxMakeCommandCollection.Unlock()
// Read the whole running binary into memory.
426 exe, err := ioutil.ReadFile("/proc/self/exe")
// The BLAKE2b-256 digest is stored as the collection's "blake2b"
// property and used to find an earlier upload of the same binary.
430 b2 := blake2b.Sum256(exe)
431 cname := "lightning " + cmd.Version.String() // must build with "make", not just "go install"
432 var existing arvados.CollectionList
433 err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
436 Filters: []arvados.Filter{
437 {Attr: "name", Operator: "=", Operand: cname},
438 {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
439 {Attr: "properties.blake2b", Operator: "=", Operand: fmt.Sprintf("%x", b2)},
// Reuse the first matching collection, if any.
445 if len(existing.Items) > 0 {
446 coll := existing.Items[0]
447 log.Printf("using lightning binary in existing collection %s (name is %q, hash is %q; did not verify whether content matches)", coll.UUID, cname, coll.Properties["blake2b"])
448 return coll.UUID, nil
// No match: write the binary into a new collection.
450 log.Printf("writing lightning binary to new collection %q", cname)
451 ac, err := arvadosclient.New(runner.Client)
455 kc := keepclient.New(ac)
456 var coll arvados.Collection
457 fs, err := coll.FileSystem(runner.Client, kc)
461 f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
465 _, err = f.Write(exe)
// The manifest text describes the written file and is what gets
// saved in the new collection record.
473 mtxt, err := fs.MarshalManifest(".")
477 err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
478 "collection": map[string]interface{}{
479 "owner_uuid": runner.ProjectUUID,
480 "manifest_text": mtxt,
482 "properties": map[string]interface{}{
483 "blake2b": fmt.Sprintf("%x", b2),
490 log.Printf("stored lightning binary in new collection %s", coll.UUID)
491 return coll.UUID, nil
494 // zopen returns a reader for the given file, using the arvados API
495 // instead of arv-mount/fuse where applicable, and transparently
496 // decompressing the input if fnm ends with ".gz".
497 func zopen(fnm string) (io.ReadCloser, error) {
// No decompression when opening failed or fnm lacks the ".gz"
// suffix (presumably the file and error are returned as-is --
// confirm).
499 if err != nil || !strings.HasSuffix(fnm, ".gz") {
// Decompress through a 4 MiB read buffer.
502 rdr, err := pgzip.NewReader(bufio.NewReaderSize(f, 4*1024*1024))
// gzipr bundles the gzip reader with the underlying file so that
// one Close call closes both.
507 return gzipr{rdr, f}, nil
510 // gzipr wraps a ReadCloser and a Closer, presenting a single Close()
511 // method that closes both wrapped objects.
// Close closes the wrapped ReadCloser and then the wrapped Closer.
// The second Close is called even if the first returns an error.
517 func (gr gzipr) Close() error {
518 e1 := gr.ReadCloser.Close()
519 e2 := gr.Closer.Close()
// arvadosClientFromEnv is the API client built by
// arvados.NewClientFromEnv; open uses it to set up keepClient and
// siteFS.
527 arvadosClientFromEnv = arvados.NewClientFromEnv()
// keepClient is created by open when it sets up the Arvados client.
528 keepClient *keepclient.KeepClient
// siteFS is the filesystem through which open reads
// "by_id/<collection id>/..." paths; also created by open.
529 siteFS arvados.CustomFileSystem
// file is the interface returned by open.
533 type file interface {
// Readdir has the same signature as (*os.File).Readdir.
536 Readdir(n int) ([]os.FileInfo, error)
// open opens fnm for reading. When fnm contains a collection
// identifier (see collectionInPathRe), the file is read through the
// Arvados client as "by_id/<id><path>" on siteFS, and the returned
// file is wrapped in reduceCacheOnClose.
539 func open(fnm string) (file, error) {
// An empty ARVADOS_API_HOST is special-cased (presumably falling back
// to the local filesystem -- confirm).
540 if os.Getenv("ARVADOS_API_HOST") == "" {
543 m := collectionInPathRe.FindStringSubmatch(fnm)
// m[2] is the collection identifier, m[3] the path inside it.
547 collectionUUID := m[2]
548 collectionPath := m[3]
// siteFSMtx guards the shared keepClient/siteFS setup below.
551 defer siteFSMtx.Unlock()
553 log.Info("setting up Arvados client")
554 ac, err := arvadosclient.New(arvadosClientFromEnv)
558 ac.Client = arvados.DefaultSecureClient
559 keepClient = keepclient.New(ac)
560 // Don't use keepclient's default short timeouts.
561 keepClient.HTTPClient = arvados.DefaultSecureClient
562 keepClient.BlockCache = &keepclient.BlockCache{MaxBlocks: 4}
563 siteFS = arvadosClientFromEnv.SiteFileSystem(keepClient)
// Raise the block cache limit by 2; reduceCacheOnClose.Close lowers
// it by 2 when the returned file is closed.
// NOTE(review): this write happens under siteFSMtx, but the matching
// decrement in reduceCacheOnClose.Close does not take the lock --
// confirm whether that can race.
565 keepClient.BlockCache.MaxBlocks += 2
568 log.Infof("reading %q from %s using Arvados client", collectionPath, collectionUUID)
569 f, err := siteFS.Open("by_id/" + collectionUUID + collectionPath)
573 return &reduceCacheOnClose{file: f}, nil
// reduceCacheOnClose wraps a file returned by open so that closing
// it also lowers the keep block cache limit (see its Close method).
576 type reduceCacheOnClose struct {
// Close lowers keepClient's block cache limit by 2 (guarded by
// rc.once, so repeated Close calls lower it only once) and then
// closes the wrapped file.
// NOTE(review): MaxBlocks is modified here without holding
// siteFSMtx, which open holds when it changes MaxBlocks -- confirm
// whether concurrent open/Close calls can race.
581 func (rc *reduceCacheOnClose) Close() error {
582 rc.once.Do(func() { keepClient.BlockCache.MaxBlocks -= 2 })
583 return rc.file.Close()