I/O pipeline, show arvados container logs.
[lightning.git] / arvados.go
index c1b54d98236661136a5112672950c17ed6605807..c9f2e7a3bac8b87c7dda29d8d43fef7ebd567225 100644 (file)
@@ -1,17 +1,22 @@
 package main
 
 import (
+       "encoding/json"
        "errors"
        "fmt"
        "io/ioutil"
        "log"
+       "net/url"
        "os"
        "regexp"
+       "strings"
+       "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        "golang.org/x/crypto/blake2b"
+       "golang.org/x/net/websocket"
 )
 
 type arvadosContainerRunner struct {
@@ -25,9 +30,9 @@ type arvadosContainerRunner struct {
        Mounts      map[string]map[string]interface{}
 }
 
-func (runner *arvadosContainerRunner) Run() error {
+func (runner *arvadosContainerRunner) Run() (string, error) {
        if runner.ProjectUUID == "" {
-               return errors.New("cannot run arvados container: ProjectUUID not provided")
+               return "", errors.New("cannot run arvados container: ProjectUUID not provided")
        }
 
        mounts := map[string]map[string]interface{}{
@@ -46,7 +51,7 @@ func (runner *arvadosContainerRunner) Run() error {
                prog = "/mnt/cmd/lightning"
                cmdUUID, err := runner.makeCommandCollection()
                if err != nil {
-                       return err
+                       return "", err
                }
                mounts["/mnt/cmd"] = map[string]interface{}{
                        "kind": "collection",
@@ -76,7 +81,65 @@ func (runner *arvadosContainerRunner) Run() error {
                },
        })
        log.Print(cr.UUID)
-       return err
+
+       var logch <-chan string
+       var logstream *logStream
+       defer func() {
+               if logstream != nil {
+                       logstream.Close()
+               }
+       }()
+
+       ticker := time.NewTicker(5 * time.Second)
+       defer ticker.Stop()
+
+       lastState := cr.State
+       subscribedUUID := ""
+       for cr.State != arvados.ContainerRequestState(arvados.ContainerRequestStateFinal) {
+               if logch == nil && cr.ContainerUUID != subscribedUUID {
+                       if logstream != nil {
+                               logstream.Close()
+                       }
+                       logstream = runner.logStream(cr.ContainerUUID)
+                       logch = logstream.C
+               }
+               select {
+               case msg, ok := <-logch:
+                       if !ok {
+                               logstream.Close()
+                               logstream = nil
+                               logch = nil
+                               break
+                       }
+                       if msg != "" {
+                               log.Print(msg)
+                               continue
+                       }
+                       // empty message indicates an "update" event
+                       // -- fall out of the select and get the
+                       // latest version now, instead of waiting for
+                       // the next timer tick.
+               case <-ticker.C:
+               }
+               err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
+               if err != nil {
+                       return "", err
+               }
+               if lastState != cr.State {
+                       log.Printf("container state: %s", cr.State)
+                       lastState = cr.State
+               }
+       }
+
+       var c arvados.Container
+       err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
+       if err != nil {
+               return "", err
+       }
+       if c.ExitCode != 0 {
+               return "", fmt.Errorf("container exited %d", c.ExitCode)
+       }
+       return cr.OutputUUID, err
 }
 
 var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
@@ -86,7 +149,7 @@ func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
                runner.Mounts = make(map[string]map[string]interface{})
        }
        for _, path := range paths {
-               if *path == "" {
+               if *path == "" || *path == "-" {
                        continue
                }
                m := collectionInPathRe.FindStringSubmatch(*path)
@@ -171,3 +234,73 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
        log.Printf("stored lightning binary in new collection %s", coll.UUID)
        return coll.UUID, nil
 }
+
+type logStream struct {
+       C     <-chan string
+       Close func() error
+}
+
+func (runner *arvadosContainerRunner) logStream(uuid string) *logStream {
+       ch := make(chan string)
+       done := make(chan struct{})
+       go func() {
+               defer close(ch)
+               var cluster arvados.Cluster
+               runner.Client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
+               wsURL := cluster.Services.Websocket.ExternalURL
+               wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
+               wsURL.Path = "/websocket"
+               wsURL.RawQuery = url.Values{"api_token": []string{runner.Client.AuthToken}}.Encode()
+               conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
+               if err != nil {
+                       ch <- fmt.Sprintf("websocket error: %s", err)
+                       return
+               }
+               w := json.NewEncoder(conn)
+               go w.Encode(map[string]interface{}{
+                       "method": "subscribe",
+                       "filters": [][]interface{}{
+                               {"object_uuid", "=", uuid},
+                               {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
+                       },
+               })
+               r := json.NewDecoder(conn)
+               for {
+                       var msg struct {
+                               Status     int
+                               ObjectUUID string `json:"object_uuid"`
+                               EventType  string `json:"event_type"`
+                               Properties struct {
+                                       Text string
+                               }
+                       }
+                       err := r.Decode(&msg)
+                       if err != nil {
+                               log.Printf("error decoding websocket message: %s", err)
+                               return
+                       }
+                       if msg.ObjectUUID == uuid {
+                               for _, line := range strings.Split(msg.Properties.Text, "\n") {
+                                       if line != "" {
+                                               ch <- line
+                                       }
+                               }
+                               if msg.EventType == "update" {
+                                       ch <- ""
+                               }
+                       }
+                       select {
+                       case <-done:
+                               return
+                       default:
+                       }
+               }
+       }()
+       return &logStream{
+               C: ch,
+               Close: func() error {
+                       close(done)
+                       return nil
+               },
+       }
+}