+
+type logStream struct {
+ C <-chan string
+ Close func() error
+}
+
+func (runner *arvadosContainerRunner) logStream(uuid string) *logStream {
+ ch := make(chan string)
+ done := make(chan struct{})
+ go func() {
+ defer close(ch)
+ var cluster arvados.Cluster
+ runner.Client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
+ wsURL := cluster.Services.Websocket.ExternalURL
+ wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
+ wsURL.Path = "/websocket"
+ wsURL.RawQuery = url.Values{"api_token": []string{runner.Client.AuthToken}}.Encode()
+ conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
+ if err != nil {
+ ch <- fmt.Sprintf("websocket error: %s", err)
+ return
+ }
+ w := json.NewEncoder(conn)
+ go w.Encode(map[string]interface{}{
+ "method": "subscribe",
+ "filters": [][]interface{}{
+ {"object_uuid", "=", uuid},
+ {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
+ },
+ })
+ r := json.NewDecoder(conn)
+ for {
+ var msg struct {
+ Status int
+ ObjectUUID string `json:"object_uuid"`
+ EventType string `json:"event_type"`
+ Properties struct {
+ Text string
+ }
+ }
+ err := r.Decode(&msg)
+ if err != nil {
+ log.Printf("error decoding websocket message: %s", err)
+ return
+ }
+ if msg.ObjectUUID == uuid {
+ for _, line := range strings.Split(msg.Properties.Text, "\n") {
+ if line != "" {
+ ch <- line
+ }
+ }
+ if msg.EventType == "update" {
+ ch <- ""
+ }
+ }
+ select {
+ case <-done:
+ return
+ default:
+ }
+ }
+ }()
+ return &logStream{
+ C: ch,
+ Close: func() error {
+ close(done)
+ return nil
+ },
+ }
+}