1 // Copyright (C) The Lightning Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
22 "git.arvados.org/arvados.git/lib/cmd"
23 "git.arvados.org/arvados.git/sdk/go/arvados"
24 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
25 "git.arvados.org/arvados.git/sdk/go/keepclient"
26 "github.com/klauspost/pgzip"
27 log "github.com/sirupsen/logrus"
28 "golang.org/x/crypto/blake2b"
29 "golang.org/x/net/websocket"
// eventMessage holds event fields decoded from JSON (see the struct
// tags). Only some of its fields are visible in this excerpt.
32 type eventMessage struct {
// ObjectUUID is the key used to look up subscriber channels in
// arvadosClient.notifying when an event is dispatched.
34 ObjectUUID string `json:"object_uuid"`
// EventType is the event's "event_type" value; RunContext switches on
// it when handling a received message.
35 EventType string `json:"event_type"`
// arvadosClient adds event subscription (Subscribe / Unsubscribe /
// Close) on top of an API client. Some fields, including the mutex and
// the embedded client, are not visible in this excerpt.
41 type arvadosClient struct {
// notifying maps an object UUID to its subscriber channels, each with a
// subscription count. It is nil until the first Subscribe call and is
// reset to nil by Close.
43 notifying map[string]map[chan<- eventMessage]int
// wantClose is closed by Close; runNotifier selects on it.
44 wantClose chan struct{}
// wsconn is the websocket connection used to send subscribe and
// unsubscribe requests; callers check it for nil before writing.
45 wsconn *websocket.Conn
49 // Listen for events concerning the given uuids. When an event occurs
50 // (and after connecting/reconnecting to the event stream), send each
51 // uuid to ch. If a {ch, uuid} pair is subscribed twice, the uuid will
52 // be sent only once for each update, but two Unsubscribe calls will
53 // be needed to stop sending them.
//
// NOTE(review): ch is declared as chan<- eventMessage, so subscribers
// receive event messages rather than bare uuid strings; the wording
// above looks stale -- confirm against the sending code.
54 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
56 defer client.mtx.Unlock()
// First subscription on this client: create the subscription table and
// the close-signal channel, then start the background notifier.
57 if client.notifying == nil {
58 client.notifying = map[string]map[chan<- eventMessage]int{}
59 client.wantClose = make(chan struct{})
60 go client.runNotifier()
// Find the channel set for this uuid, creating it when needed
// (presumably only when the lookup yields nil -- guard not visible).
62 chmap := client.notifying[uuid]
64 chmap = map[chan<- eventMessage]int{}
65 client.notifying[uuid] = chmap
// Inspect the existing per-channel counts for this uuid; presumably
// this is what decides needSub -- loop body not visible, confirm.
68 for _, nch := range chmap {
// If a websocket connection is already up, send the subscribe request
// now, from a separate goroutine.
// NOTE(review): Encode's error is discarded, and this write does not
// appear to be synchronized with other writers to wsconn -- confirm.
75 if needSub && client.wsconn != nil {
76 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
77 "method": "subscribe",
78 "filters": [][]interface{}{
79 {"object_uuid", "=", uuid},
// NOTE(review): this event_type list omits "crunchstat", which the
// resubscribe request in runNotifier includes -- confirm which is
// intended.
80 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
// Unsubscribe undoes one Subscribe call for the given {ch, uuid} pair.
// (Parts of the body are not visible in this excerpt.)
86 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
88 defer client.mtx.Unlock()
89 chmap := client.notifying[uuid]
// Decrement this channel's subscription count; the n == 0 branch
// handles the last remaining subscription for the pair.
90 if n := chmap[ch] - 1; n == 0 {
// Drop the uuid's entry from the table (presumably only once no
// channels remain for it -- guard not visible, confirm).
93 delete(client.notifying, uuid)
// If connected, tell the server to stop sending events for this uuid.
// NOTE(review): Encode's error is discarded, and this write does not
// appear to be synchronized with other writers to wsconn -- confirm.
95 if client.wsconn != nil {
96 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
97 "method": "unsubscribe",
98 "filters": [][]interface{}{
99 {"object_uuid", "=", uuid},
// Same event_type list as the subscribe request sent by Subscribe.
100 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
// Close discards all subscriptions and signals the notifier goroutine
// to stop by closing wantClose.
109 func (client *arvadosClient) Close() {
111 defer client.mtx.Unlock()
// notifying is non-nil only after Subscribe has set things up. Setting
// it to nil here means a repeated Close skips this branch and so does
// not close wantClose twice.
112 if client.notifying != nil {
113 client.notifying = nil
114 close(client.wantClose)
// runNotifier is started as a goroutine by Subscribe. It connects to
// the cluster's websocket service, subscribes to the uuids currently in
// client.notifying, and dispatches decoded events to subscriber
// channels. (Parts of the body, including loop structure and locking,
// are not visible in this excerpt.)
118 func (client *arvadosClient) runNotifier() {
// Fetch the cluster configuration to learn the websocket URL.
121 var cluster arvados.Cluster
122 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
// On failure, log and wait 5 seconds (presumably before retrying --
// surrounding control flow not visible).
124 log.Warnf("error getting cluster config: %s", err)
125 time.Sleep(5 * time.Second)
// Build the websocket URL: replace the first "http" in the scheme with
// "ws" (so "https" becomes "wss") and use the /websocket path.
128 wsURL := cluster.Services.Websocket.ExternalURL
129 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
130 wsURL.Path = "/websocket"
// Capture the URL before the token is added, so the log line below
// does not include the API token.
131 wsURLNoToken := wsURL.String()
132 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
// Dial with an empty protocol and the controller's external URL as the
// origin.
133 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
// On failure, log and wait 5 seconds (presumably before retrying).
135 log.Warnf("websocket connection error: %s", err)
136 time.Sleep(5 * time.Second)
139 log.Printf("connected to websocket at %s", wsURLNoToken)
// Snapshot the uuids that currently have subscribers (presumably while
// holding the client mutex -- locking not visible, confirm).
143 resubscribe := make([]string, 0, len(client.notifying))
144 for uuid := range client.notifying {
145 resubscribe = append(resubscribe, uuid)
// Send a subscribe request for each uuid on the new connection.
// NOTE(review): the Encode result appears to be discarded -- confirm.
150 w := json.NewEncoder(conn)
151 for _, uuid := range resubscribe {
152 w.Encode(map[string]interface{}{
153 "method": "subscribe",
154 "filters": [][]interface{}{
155 {"object_uuid", "=", uuid},
// NOTE(review): this list includes "crunchstat", unlike the requests
// sent by Subscribe and Unsubscribe -- confirm which is intended.
156 {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}},
// Read events from the connection.
162 r := json.NewDecoder(conn)
165 err := r.Decode(&msg)
// wantClose is closed by Close(); presumably this case distinguishes a
// requested shutdown from a real decode error -- confirm.
167 case <-client.wantClose:
171 log.Printf("error decoding websocket message: %s", err)
// Visit every channel subscribed to the event's object UUID.
179 for ch := range client.notifying[msg.ObjectUUID] {
// refreshTicker fires every 5 seconds; RunContext receives from it in
// its wait loop.
// NOTE(review): being package-level, the ticker is shared by all
// concurrent RunContext calls, and each tick is delivered to only one
// receiver -- confirm this is intended.
188 var refreshTicker = time.NewTicker(5 * time.Second)
// arvadosContainerRunner describes a command to run as an Arvados
// container request (see Run and RunContext). Only some of its fields
// are visible in this excerpt.
190 type arvadosContainerRunner struct {
// Client is the API client used for all requests made by the runner.
191 Client *arvados.Client
198 Prog string // if empty, run /proc/self/exe
// Mounts maps a container path to a mount description; RunContext
// merges these into the request's mounts, and TranslatePaths adds
// collection mounts here.
200 Mounts map[string]map[string]interface{}
202 KeepCache int // cache buffers per VCPU (0 for default)
// Run is RunContext with context.Background(): it returns whatever
// RunContext returns and cannot be cancelled by the caller.
205 func (runner *arvadosContainerRunner) Run() (string, error) {
206 return runner.RunContext(context.Background())
// RunContext submits a container request built from the runner's
// fields, follows it until it reaches the Final state, and returns the
// request's OutputUUID. It returns an error if ProjectUUID is empty, if
// the container did not complete, or if it exited non-zero. (Parts of
// the body are not visible in this excerpt; the comments below describe
// only the visible statements.)
209 func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) {
// A project is required: it is sent as the request's owner_uuid.
210 if runner.ProjectUUID == "" {
211 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
// Start from the built-in mounts, then visit the caller-supplied ones
// (presumably copying each into mounts -- loop body not visible).
214 mounts := map[string]map[string]interface{}{
216 "kind": "collection",
220 for path, mnt := range runner.Mounts {
// Run the lightning binary from a collection mounted at /mnt/cmd.
// Presumably this path is taken only when runner.Prog is empty -- the
// guarding condition is not visible here, confirm.
226 prog = "/mnt/cmd/lightning"
227 cmdUUID, err := runner.makeCommandCollection()
231 mounts["/mnt/cmd"] = map[string]interface{}{
232 "kind": "collection",
// The container command is the program followed by the runner's args.
236 command := append([]string{prog}, runner.Args...)
238 priority := runner.Priority
242 keepCache := runner.KeepCache
246 rc := arvados.RuntimeConstraints{
247 API: &runner.APIAccess,
// 1<<26 bytes (64 MiB) per cache buffer, keepCache buffers per VCPU.
250 KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs),
252 outname := &runner.OutputName
// Create the container request in the Committed state.
256 var cr arvados.ContainerRequest
257 err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
258 "container_request": map[string]interface{}{
259 "owner_uuid": runner.ProjectUUID,
261 "container_image": "lightning-runtime",
264 "use_existing": true,
265 "output_path": "/mnt/output",
266 "output_name": outname,
267 "runtime_constraints": rc,
// NOTE(review): this sends runner.Priority, not the local `priority`
// derived above, so any adjustment made to the local is not applied to
// the request -- confirm intent.
268 "priority": runner.Priority,
269 "state": arvados.ContainerRequestStateCommitted,
270 "scheduling_parameters": arvados.SchedulingParameters{
272 Partitions: []string{},
274 "environment": map[string]string{
// Tell the Go runtime inside the container how many CPUs were requested.
275 "GOMAXPROCS": fmt.Sprintf("%d", rc.VCPUs),
282 log.Printf("container request UUID: %s", cr.UUID)
283 log.Printf("container UUID: %s", cr.ContainerUUID)
// Events for the subscribed container arrive on logch (unbuffered) via
// a notifier client that wraps the runner's API client.
285 logch := make(chan eventMessage)
286 client := arvadosClient{Client: runner.Client}
// Drop any active subscription (presumably in deferred cleanup -- the
// enclosing statement is not visible, confirm).
290 if subscribedUUID != "" {
291 log.Printf("unsubscribe container UUID: %s", subscribedUUID)
292 client.Unsubscribe(logch, subscribedUUID)
// refreshCR re-fetches the container request, logs state changes, and
// moves the event subscription when the assigned container UUID
// changes. neednewline is written to stderr before each log line
// (presumably to terminate a pending "\r" status line -- confirm).
298 lastState := cr.State
299 refreshCR := func() {
300 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
302 fmt.Fprint(os.Stderr, neednewline)
303 log.Printf("error getting container request: %s", err)
306 if lastState != cr.State {
307 fmt.Fprint(os.Stderr, neednewline)
308 log.Printf("container request state: %s", cr.State)
311 if subscribedUUID != cr.ContainerUUID {
312 fmt.Fprint(os.Stderr, neednewline)
314 if subscribedUUID != "" {
315 log.Printf("unsubscribe container UUID: %s", subscribedUUID)
316 client.Unsubscribe(logch, subscribedUUID)
318 log.Printf("subscribe container UUID: %s", cr.ContainerUUID)
319 client.Subscribe(logch, cr.ContainerUUID)
320 subscribedUUID = cr.ContainerUUID
// Extracts the "mem ... rss" portion of a line; used below on event
// text to print a memory status.
324 var reCrunchstat = regexp.MustCompile(`mem .* rss`)
// Wait loop: runs until the container request reaches the Final state.
326 for cr.State != arvados.ContainerRequestStateFinal {
// Update the container request to cancel it (per the error message
// below; presumably triggered by ctx being done -- the select case and
// the request fields are not visible, confirm).
329 err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
330 "container_request": map[string]interface{}{
335 log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err)
// Periodic wake-up (presumably to call refreshCR -- case body not visible).
338 case <-refreshTicker.C:
// Handle a received event according to its type.
341 switch msg.EventType {
345 for _, line := range strings.Split(msg.Properties.Text, "\n") {
347 fmt.Fprint(os.Stderr, neednewline)
// For each line, print any memory stat followed by "\r" rather than a
// newline, so the next write starts at the beginning of the same line.
353 for _, line := range strings.Split(msg.Properties.Text, "\n") {
354 mem := reCrunchstat.FindString(line)
356 fmt.Fprintf(os.Stderr, "%s \r", mem)
363 fmt.Fprint(os.Stderr, neednewline)
// Check whether the context was cancelled or timed out.
365 if err := ctx.Err(); err != nil {
// Fetch the container itself to check its final state and exit code.
369 var c arvados.Container
370 err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
373 } else if c.State != arvados.ContainerStateComplete {
374 return "", fmt.Errorf("container did not complete: %s", c.State)
375 } else if c.ExitCode != 0 {
376 return "", fmt.Errorf("container exited %d", c.ExitCode)
378 return cr.OutputUUID, err
// collectionInPathRe finds a collection identifier inside a path.
// Submatch 1 is an optional prefix ending in "/". Submatch 2 is the
// identifier: either 32 hex digits, "+", and decimal digits, or three
// dash-separated groups of 5, 5, and 15 characters from [0-9a-z].
// Submatch 3 is an optional suffix starting with "/".
381 var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
// TranslatePaths rewrites each given path in place so that it points
// under /mnt/<identifier>, adding a collection mount for that
// identifier to runner.Mounts when one is not already present. It
// returns an error for a path in which collectionInPathRe finds no
// identifier. (Parts of the body are not visible in this excerpt.)
383 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
// Make sure the mounts map can be written to.
384 if runner.Mounts == nil {
385 runner.Mounts = make(map[string]map[string]interface{})
387 for _, path := range paths {
// Empty paths and "-" get special handling (presumably skipped --
// branch body not visible, confirm).
388 if *path == "" || *path == "-" {
391 m := collectionInPathRe.FindStringSubmatch(*path)
393 return fmt.Errorf("cannot find uuid in path: %q", *path)
// Look up the mount for this identifier, creating a collection mount
// when needed (uuid is presumably m[2] -- assignment not visible).
396 mnt, ok := runner.Mounts["/mnt/"+uuid]
398 mnt = map[string]interface{}{
399 "kind": "collection",
402 runner.Mounts["/mnt/"+uuid] = mnt
// Rewrite the caller's path to the in-container location, keeping the
// part of the path that followed the identifier.
404 *path = "/mnt/" + uuid + m[3]
// mtxMakeCommandCollection is held for the duration of each
// makeCommandCollection call, so those calls run one at a time.
409 var mtxMakeCommandCollection sync.Mutex
// makeCommandCollection returns the UUID of a collection in the
// runner's project that holds the currently running executable. It
// reuses an existing collection whose name, owner, and blake2b property
// match, and otherwise creates a new one. (Error-handling lines are not
// visible in this excerpt.)
411 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
// Serialize calls through the package-level mutex.
412 mtxMakeCommandCollection.Lock()
413 defer mtxMakeCommandCollection.Unlock()
// Read the running binary's contents into memory.
414 exe, err := ioutil.ReadFile("/proc/self/exe")
// The hex digest of the binary is used both as a search filter and as
// the "blake2b" property on a newly created collection.
418 b2 := blake2b.Sum256(exe)
419 cname := "lightning " + cmd.Version.String() // must build with "make", not just "go install"
// Search for an existing collection with this name, owner, and hash.
420 var existing arvados.CollectionList
421 err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
424 Filters: []arvados.Filter{
425 {Attr: "name", Operator: "=", Operand: cname},
426 {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
427 {Attr: "properties.blake2b", Operator: "=", Operand: fmt.Sprintf("%x", b2)},
// Reuse the first match. As the log message says, the collection's
// content is not compared against the binary.
433 if len(existing.Items) > 0 {
434 coll := existing.Items[0]
435 log.Printf("using lightning binary in existing collection %s (name is %q, hash is %q; did not verify whether content matches)", coll.UUID, cname, coll.Properties["blake2b"])
436 return coll.UUID, nil
// No match: write the binary into a new collection.
438 log.Printf("writing lightning binary to new collection %q", cname)
439 ac, err := arvadosclient.New(runner.Client)
443 kc := keepclient.New(ac)
444 var coll arvados.Collection
445 fs, err := coll.FileSystem(runner.Client, kc)
// Store the binary as a file named "lightning", created with mode 0777.
449 f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
453 _, err = f.Write(exe)
// Serialize the filesystem to manifest text and create the collection
// with it, recording the hash as a property.
461 mtxt, err := fs.MarshalManifest(".")
465 err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
466 "collection": map[string]interface{}{
467 "owner_uuid": runner.ProjectUUID,
468 "manifest_text": mtxt,
470 "properties": map[string]interface{}{
471 "blake2b": fmt.Sprintf("%x", b2),
478 log.Printf("stored lightning binary in new collection %s", coll.UUID)
479 return coll.UUID, nil
482 // zopen returns a reader for the given file, using the arvados API
483 // instead of arv-mount/fuse where applicable, and transparently
484 // decompressing the input if fnm ends with ".gz".
//
// Closing the returned reader for a ".gz" file closes both the gzip
// reader and the underlying file (see gzipr.Close).
485 func zopen(fnm string) (io.ReadCloser, error) {
// Either opening failed or the name does not end in ".gz"; in both
// cases no decompression is set up (presumably f and err are returned
// as they are -- branch body not visible, confirm).
487 if err != nil || !strings.HasSuffix(fnm, ".gz") {
// Decompress with pgzip, reading the file through a 4 MiB buffer.
490 rdr, err := pgzip.NewReader(bufio.NewReaderSize(f, 4*1024*1024))
495 return gzipr{rdr, f}, nil
498 // gzipr wraps a ReadCloser and a Closer, presenting a single Close()
499 // method that closes both wrapped objects.
// Close closes the wrapped ReadCloser first and then the wrapped
// Closer. Both are closed whatever the first call returns. (How e1 and
// e2 are combined into the result is not visible in this excerpt.)
505 func (gr gzipr) Close() error {
506 e1 := gr.ReadCloser.Close()
507 e2 := gr.Closer.Close()
// Package-level Arvados client state used by open(). (The enclosing
// declaration and some of its members are not visible in this excerpt.)
//
// arvadosClientFromEnv is built by arvados.NewClientFromEnv() when the
// package is initialized.
515 arvadosClientFromEnv = arvados.NewClientFromEnv()
// keepClient and siteFS are assigned inside open().
516 keepClient *keepclient.KeepClient
517 siteFS arvados.CustomFileSystem
// file is the return type of open(). Only part of its method set is
// visible in this excerpt.
521 type file interface {
// Readdir has the same signature as (*os.File).Readdir.
523 Readdir(n int) ([]os.FileInfo, error)
// open returns a file for fnm. When fnm lies at or under /mnt/<id>,
// where <id> is a collection identifier matched by collectionInPathRe,
// the file is read through the Arvados site filesystem. (Several
// branches are not visible in this excerpt; the comments below describe
// only the visible statements.)
526 func open(fnm string) (file, error) {
// No API host is configured in the environment (presumably falls back
// to a plain local open -- branch body not visible, confirm).
527 if os.Getenv("ARVADOS_API_HOST") == "" {
// Look for a collection identifier in the path.
530 m := collectionInPathRe.FindStringSubmatch(fnm)
// Paths that are neither the mount point itself nor below it get
// separate handling (branch body not visible).
535 mnt := "/mnt/" + uuid
536 if fnm != mnt && !strings.HasPrefix(fnm, mnt+"/") {
// The shared client state below is accessed while holding siteFSMtx.
541 defer siteFSMtx.Unlock()
// Client setup (presumably performed only on first use -- the guard is
// not visible, confirm).
543 log.Info("setting up Arvados client")
544 ac, err := arvadosclient.New(arvadosClientFromEnv)
548 ac.Client = arvados.DefaultSecureClient
549 keepClient = keepclient.New(ac)
550 // Don't use keepclient's default short timeouts.
551 keepClient.HTTPClient = arvados.DefaultSecureClient
// Start with a block cache of 4 blocks.
552 keepClient.BlockCache = &keepclient.BlockCache{MaxBlocks: 4}
553 siteFS = arvadosClientFromEnv.SiteFileSystem(keepClient)
// Grow the block cache by one block (presumably once per file opened
// this way -- enclosing condition not visible, confirm).
555 keepClient.BlockCache.MaxBlocks++
// Open the part of the path after the mount point, inside the
// collection addressed as by_id/<id> in the site filesystem.
558 log.Infof("reading %q from %s using Arvados client", fnm[len(mnt):], uuid)
559 return siteFS.Open("by_id/" + uuid + fnm[len(mnt):])