X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/9fa407809201ed8dae34f62c983d997c161a547a..8a9c807d4b80f61a8485c0fc07abfe23f9d50202:/arvados.go diff --git a/arvados.go b/arvados.go index bfeedc2679..cc41ba2c9a 100644 --- a/arvados.go +++ b/arvados.go @@ -119,6 +119,7 @@ reconnect: wsURL := cluster.Services.Websocket.ExternalURL wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1) wsURL.Path = "/websocket" + wsURLNoToken := wsURL.String() wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode() conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String()) if err != nil { @@ -126,20 +127,28 @@ reconnect: time.Sleep(5 * time.Second) continue reconnect } + log.Printf("connected to websocket at %s", wsURLNoToken) + client.mtx.Lock() client.wsconn = conn - client.mtx.Unlock() - - w := json.NewEncoder(conn) + resubscribe := make([]string, 0, len(client.notifying)) for uuid := range client.notifying { - w.Encode(map[string]interface{}{ - "method": "subscribe", - "filters": [][]interface{}{ - {"object_uuid", "=", uuid}, - {"event_type", "in", []string{"stderr", "crunch-run", "update"}}, - }, - }) + resubscribe = append(resubscribe, uuid) } + client.mtx.Unlock() + + go func() { + w := json.NewEncoder(conn) + for _, uuid := range resubscribe { + w.Encode(map[string]interface{}{ + "method": "subscribe", + "filters": [][]interface{}{ + {"object_uuid", "=", uuid}, + {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}}, + }, + }) + } + }() r := json.NewDecoder(conn) for { @@ -157,9 +166,11 @@ reconnect: go conn.Close() continue reconnect } + client.mtx.Lock() for ch := range client.notifying[msg.ObjectUUID] { ch <- msg } + client.mtx.Unlock() } } } @@ -184,9 +195,8 @@ func (runner *arvadosContainerRunner) Run() (string, error) { mounts := map[string]map[string]interface{}{ "/mnt/output": { - "kind": "tmp", + "kind": "collection", "writable": true, - "capacity": 100000000000, }, } for path, mnt := range runner.Mounts { @@ -250,18 +260,24 @@ func (runner *arvadosContainerRunner) Run() (string, error) { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() + neednewline := "" + lastState := cr.State refreshCR := func() { err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil) if err != nil { + fmt.Fprint(os.Stderr, neednewline) log.Printf("error getting container request: %s", err) return } if lastState != cr.State { - log.Printf("container state: %s", cr.State) + fmt.Fprint(os.Stderr, neednewline) + log.Printf("container request state: %s", cr.State) lastState = cr.State } if subscribedUUID != cr.ContainerUUID { + fmt.Fprint(os.Stderr, neednewline) + neednewline = "" if subscribedUUID != "" { client.Unsubscribe(logch, subscribedUUID) } @@ -270,6 +286,7 @@ func (runner *arvadosContainerRunner) Run() (string, error) { } } + var reCrunchstat = regexp.MustCompile(`mem .* rss`) for cr.State != arvados.ContainerRequestStateFinal { select { case <-ticker.C: @@ -278,22 +295,34 @@ func (runner *arvadosContainerRunner) Run() (string, error) { switch msg.EventType { case "update": refreshCR() - default: + case "stderr": for _, line := range strings.Split(msg.Properties.Text, "\n") { if line != "" { + fmt.Fprint(os.Stderr, neednewline) + neednewline = "" log.Print(line) } } + case "crunchstat": + for _, line := range strings.Split(msg.Properties.Text, "\n") { + mem := reCrunchstat.FindString(line) + if mem != "" { + fmt.Fprintf(os.Stderr, "%s \r", mem) + neednewline = "\n" + } + } } } } + fmt.Fprint(os.Stderr, neednewline) var c arvados.Container err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil) if err != nil { return "", err - } - if c.ExitCode != 0 { + } else if c.State != arvados.ContainerStateComplete { + return "", fmt.Errorf("container did not complete: %s", c.State) + } else if c.ExitCode != 0 { return "", fmt.Errorf("container exited %d", c.ExitCode) } return cr.OutputUUID, err