Fix up log/event handling.
authorTom Clegg <tom@tomclegg.ca>
Thu, 5 Mar 2020 15:19:14 +0000 (10:19 -0500)
committerTom Clegg <tom@tomclegg.ca>
Thu, 5 Mar 2020 15:19:14 +0000 (10:19 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

arvados.go

index c9f2e7a3bac8b87c7dda29d8d43fef7ebd567225..bb0b2d1ed79632761f7ceff27328055a079e09f3 100644 (file)
@@ -80,9 +80,10 @@ func (runner *arvadosContainerRunner) Run() (string, error) {
                        "state":               arvados.ContainerRequestStateCommitted,
                },
        })
-       log.Print(cr.UUID)
+       log.Printf("container request UUID: %s", cr.UUID)
+       log.Printf("container UUID: %s", cr.ContainerUUID)
 
-       var logch <-chan string
+       var logch <-chan eventMessage
        var logstream *logStream
        defer func() {
                if logstream != nil {
@@ -94,6 +95,18 @@ func (runner *arvadosContainerRunner) Run() (string, error) {
        defer ticker.Stop()
 
        lastState := cr.State
+       refreshCR := func() {
+               err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
+               if err != nil {
+                       log.Printf("error getting container request: %s", err)
+                       return
+               }
+               if lastState != cr.State {
+                       log.Printf("container state: %s", cr.State)
+                       lastState = cr.State
+               }
+       }
+
        subscribedUUID := ""
        for cr.State != arvados.ContainerRequestState(arvados.ContainerRequestStateFinal) {
                if logch == nil && cr.ContainerUUID != subscribedUUID {
@@ -111,23 +124,18 @@ func (runner *arvadosContainerRunner) Run() (string, error) {
                                logch = nil
                                break
                        }
-                       if msg != "" {
-                               log.Print(msg)
-                               continue
+                       switch msg.EventType {
+                       case "update":
+                               refreshCR()
+                       default:
+                               for _, line := range strings.Split(msg.Properties.Text, "\n") {
+                                       if line != "" {
+                                               log.Print(line)
+                                       }
+                               }
                        }
-                       // empty message indicates an "update" event
-                       // -- fall out of the select and get the
-                       // latest version now, instead of waiting for
-                       // the next timer tick.
                case <-ticker.C:
-               }
-               err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
-               if err != nil {
-                       return "", err
-               }
-               if lastState != cr.State {
-                       log.Printf("container state: %s", cr.State)
-                       lastState = cr.State
+                       refreshCR()
                }
        }
 
@@ -235,13 +243,22 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
        return coll.UUID, nil
 }
 
+type eventMessage struct {
+       Status     int
+       ObjectUUID string `json:"object_uuid"`
+       EventType  string `json:"event_type"`
+       Properties struct {
+               Text string
+       }
+}
+
 type logStream struct {
-       C     <-chan string
+       C     <-chan eventMessage
        Close func() error
 }
 
 func (runner *arvadosContainerRunner) logStream(uuid string) *logStream {
-       ch := make(chan string)
+       ch := make(chan eventMessage)
        done := make(chan struct{})
        go func() {
                defer close(ch)
@@ -253,7 +270,7 @@ func (runner *arvadosContainerRunner) logStream(uuid string) *logStream {
                wsURL.RawQuery = url.Values{"api_token": []string{runner.Client.AuthToken}}.Encode()
                conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
                if err != nil {
-                       ch <- fmt.Sprintf("websocket error: %s", err)
+                       log.Printf("websocket error: %s", err)
                        return
                }
                w := json.NewEncoder(conn)
@@ -266,29 +283,13 @@ func (runner *arvadosContainerRunner) logStream(uuid string) *logStream {
                })
                r := json.NewDecoder(conn)
                for {
-                       var msg struct {
-                               Status     int
-                               ObjectUUID string `json:"object_uuid"`
-                               EventType  string `json:"event_type"`
-                               Properties struct {
-                                       Text string
-                               }
-                       }
+                       var msg eventMessage
                        err := r.Decode(&msg)
                        if err != nil {
                                log.Printf("error decoding websocket message: %s", err)
                                return
                        }
-                       if msg.ObjectUUID == uuid {
-                               for _, line := range strings.Split(msg.Properties.Text, "\n") {
-                                       if line != "" {
-                                               ch <- line
-                                       }
-                               }
-                               if msg.EventType == "update" {
-                                       ch <- ""
-                               }
-                       }
+                       ch <- msg
                        select {
                        case <-done:
                                return