"state": arvados.ContainerRequestStateCommitted,
},
})
- log.Print(cr.UUID)
+ log.Printf("container request UUID: %s", cr.UUID)
+ log.Printf("container UUID: %s", cr.ContainerUUID)
- var logch <-chan string
+ var logch <-chan eventMessage
var logstream *logStream
defer func() {
if logstream != nil {
defer ticker.Stop()
lastState := cr.State
+ refreshCR := func() {
+ err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
+ if err != nil {
+ log.Printf("error getting container request: %s", err)
+ return
+ }
+ if lastState != cr.State {
+ log.Printf("container state: %s", cr.State)
+ lastState = cr.State
+ }
+ }
+
subscribedUUID := ""
for cr.State != arvados.ContainerRequestState(arvados.ContainerRequestStateFinal) {
if logch == nil && cr.ContainerUUID != subscribedUUID {
logch = nil
break
}
- if msg != "" {
- log.Print(msg)
- continue
+ switch msg.EventType {
+ case "update":
+ refreshCR()
+ default:
+ for _, line := range strings.Split(msg.Properties.Text, "\n") {
+ if line != "" {
+ log.Print(line)
+ }
+ }
}
- // empty message indicates an "update" event
- // -- fall out of the select and get the
- // latest version now, instead of waiting for
- // the next timer tick.
case <-ticker.C:
- }
- err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
- if err != nil {
- return "", err
- }
- if lastState != cr.State {
- log.Printf("container state: %s", cr.State)
- lastState = cr.State
+ refreshCR()
}
}
return coll.UUID, nil
}
+type eventMessage struct {
+ Status int
+ ObjectUUID string `json:"object_uuid"`
+ EventType string `json:"event_type"`
+ Properties struct {
+ Text string
+ }
+}
+
type logStream struct {
- C <-chan string
+ C <-chan eventMessage
Close func() error
}
func (runner *arvadosContainerRunner) logStream(uuid string) *logStream {
- ch := make(chan string)
+ ch := make(chan eventMessage)
done := make(chan struct{})
go func() {
defer close(ch)
wsURL.RawQuery = url.Values{"api_token": []string{runner.Client.AuthToken}}.Encode()
conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
if err != nil {
- ch <- fmt.Sprintf("websocket error: %s", err)
+ log.Printf("websocket error: %s", err)
return
}
w := json.NewEncoder(conn)
})
r := json.NewDecoder(conn)
for {
- var msg struct {
- Status int
- ObjectUUID string `json:"object_uuid"`
- EventType string `json:"event_type"`
- Properties struct {
- Text string
- }
- }
+ var msg eventMessage
err := r.Decode(&msg)
if err != nil {
log.Printf("error decoding websocket message: %s", err)
return
}
- if msg.ObjectUUID == uuid {
- for _, line := range strings.Split(msg.Properties.Text, "\n") {
- if line != "" {
- ch <- line
- }
- }
- if msg.EventType == "update" {
- ch <- ""
- }
- }
+ ch <- msg
select {
case <-done:
return