X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/57c0644d4fdc9c0cb67bba753c966a5e4d80d311..8a9c807d4b80f61a8485c0fc07abfe23f9d50202:/arvados.go diff --git a/arvados.go b/arvados.go index bd4f58a5e5..cc41ba2c9a 100644 --- a/arvados.go +++ b/arvados.go @@ -119,6 +119,7 @@ reconnect: wsURL := cluster.Services.Websocket.ExternalURL wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1) wsURL.Path = "/websocket" + wsURLNoToken := wsURL.String() wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode() conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String()) if err != nil { @@ -126,7 +127,7 @@ reconnect: time.Sleep(5 * time.Second) continue reconnect } - log.Printf("connected to websocket at %s", wsURL) + log.Printf("connected to websocket at %s", wsURLNoToken) client.mtx.Lock() client.wsconn = conn @@ -143,7 +144,7 @@ reconnect: "method": "subscribe", "filters": [][]interface{}{ {"object_uuid", "=", uuid}, - {"event_type", "in", []string{"stderr", "crunch-run", "update"}}, + {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}}, }, }) } @@ -259,18 +260,24 @@ func (runner *arvadosContainerRunner) Run() (string, error) { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() + neednewline := "" + lastState := cr.State refreshCR := func() { err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil) if err != nil { + fmt.Fprint(os.Stderr, neednewline) log.Printf("error getting container request: %s", err) return } if lastState != cr.State { + fmt.Fprint(os.Stderr, neednewline) log.Printf("container request state: %s", cr.State) lastState = cr.State } if subscribedUUID != cr.ContainerUUID { + fmt.Fprint(os.Stderr, neednewline) + neednewline = "" if subscribedUUID != "" { client.Unsubscribe(logch, subscribedUUID) } @@ -279,6 +286,7 @@ func (runner *arvadosContainerRunner) Run() (string, error) { } } + var reCrunchstat = regexp.MustCompile(`mem .* rss`) for cr.State != arvados.ContainerRequestStateFinal { select { case <-ticker.C: @@ -287,15 +295,26 @@ func (runner *arvadosContainerRunner) Run() (string, error) { switch msg.EventType { case "update": refreshCR() - default: + case "stderr": for _, line := range strings.Split(msg.Properties.Text, "\n") { if line != "" { + fmt.Fprint(os.Stderr, neednewline) + neednewline = "" log.Print(line) } } + case "crunchstat": + for _, line := range strings.Split(msg.Properties.Text, "\n") { + mem := reCrunchstat.FindString(line) + if mem != "" { + fmt.Fprintf(os.Stderr, "%s \r", mem) + neednewline = "\n" + } + } } } } + fmt.Fprint(os.Stderr, neednewline) var c arvados.Container err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)