X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/148039cae7d9c4260b290a97414b2f22495a8759..8a9c807d4b80f61a8485c0fc07abfe23f9d50202:/arvados.go diff --git a/arvados.go b/arvados.go index 9cef74a21c..cc41ba2c9a 100644 --- a/arvados.go +++ b/arvados.go @@ -9,6 +9,7 @@ import ( "os" "regexp" "strings" + "sync" "time" "git.arvados.org/arvados.git/sdk/go/arvados" @@ -19,6 +20,162 @@ import ( "golang.org/x/net/websocket" ) +type eventMessage struct { + Status int + ObjectUUID string `json:"object_uuid"` + EventType string `json:"event_type"` + Properties struct { + Text string + } +} + +type arvadosClient struct { + *arvados.Client + notifying map[string]map[chan<- eventMessage]int + wantClose chan struct{} + wsconn *websocket.Conn + mtx sync.Mutex +} + +// Listen for events concerning the given uuids. When an event occurs +// (and after connecting/reconnecting to the event stream), send each +// uuid to ch. If a {ch, uuid} pair is subscribed twice, the uuid will +// be sent only once for each update, but two Unsubscribe calls will +// be needed to stop sending them. +func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) { + client.mtx.Lock() + defer client.mtx.Unlock() + if client.notifying == nil { + client.notifying = map[string]map[chan<- eventMessage]int{} + client.wantClose = make(chan struct{}) + go client.runNotifier() + } + chmap := client.notifying[uuid] + if chmap == nil { + chmap = map[chan<- eventMessage]int{} + client.notifying[uuid] = chmap + } + needSub := true + for _, nch := range chmap { + if nch > 0 { + needSub = false + break + } + } + chmap[ch]++ + if needSub && client.wsconn != nil { + go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{ + "method": "subscribe", + "filters": [][]interface{}{ + {"object_uuid", "=", uuid}, + {"event_type", "in", []string{"stderr", "crunch-run", "update"}}, + }, + }) + } +} + +func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) { + client.mtx.Lock() + defer client.mtx.Unlock() + chmap := client.notifying[uuid] + if n := chmap[ch] - 1; n == 0 { + delete(chmap, ch) + if len(chmap) == 0 { + delete(client.notifying, uuid) + } + if client.wsconn != nil { + go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{ + "method": "unsubscribe", + "filters": [][]interface{}{ + {"object_uuid", "=", uuid}, + {"event_type", "in", []string{"stderr", "crunch-run", "update"}}, + }, + }) + } + } else if n > 0 { + chmap[ch] = n + } +} + +func (client *arvadosClient) Close() { + client.mtx.Lock() + defer client.mtx.Unlock() + if client.notifying != nil { + client.notifying = nil + close(client.wantClose) + } +} + +func (client *arvadosClient) runNotifier() { +reconnect: + for { + var cluster arvados.Cluster + err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil) + if err != nil { + log.Warnf("error getting cluster config: %s", err) + time.Sleep(5 * time.Second) + continue reconnect + } + wsURL := cluster.Services.Websocket.ExternalURL + wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1) + wsURL.Path = "/websocket" + wsURLNoToken := wsURL.String() + wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode() + conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String()) + if err != nil { + log.Warnf("websocket connection error: %s", err) + time.Sleep(5 * time.Second) + continue reconnect + } + log.Printf("connected to websocket at %s", wsURLNoToken) + + client.mtx.Lock() + client.wsconn = conn + resubscribe := make([]string, 0, len(client.notifying)) + for uuid := range client.notifying { + resubscribe = append(resubscribe, uuid) + } + client.mtx.Unlock() + + go func() { + w := json.NewEncoder(conn) + for _, uuid := range resubscribe { + w.Encode(map[string]interface{}{ + "method": "subscribe", + "filters": [][]interface{}{ + {"object_uuid", "=", uuid}, + {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}}, + }, + }) + } + }() + + r := json.NewDecoder(conn) + for { + var msg eventMessage + err := r.Decode(&msg) + select { + case <-client.wantClose: + return + default: + if err != nil { + log.Printf("error decoding websocket message: %s", err) + client.mtx.Lock() + client.wsconn = nil + client.mtx.Unlock() + go conn.Close() + continue reconnect + } + client.mtx.Lock() + for ch := range client.notifying[msg.ObjectUUID] { + ch <- msg + } + client.mtx.Unlock() + } + } + } +} + type arvadosContainerRunner struct { Client *arvados.Client Name string @@ -28,6 +185,7 @@ type arvadosContainerRunner struct { Prog string // if empty, run /proc/self/exe Args []string Mounts map[string]map[string]interface{} + Priority int } func (runner *arvadosContainerRunner) Run() (string, error) { @@ -37,9 +195,8 @@ func (runner *arvadosContainerRunner) Run() (string, error) { mounts := map[string]map[string]interface{}{ "/mnt/output": { - "kind": "tmp", + "kind": "collection", "writable": true, - "capacity": 100000000000, }, } for path, mnt := range runner.Mounts { @@ -60,6 +217,10 @@ func (runner *arvadosContainerRunner) Run() (string, error) { } command := append([]string{prog}, runner.Args...) + priority := runner.Priority + if priority < 1 { + priority = 500 + } rc := arvados.RuntimeConstraints{ VCPUs: runner.VCPUs, RAM: runner.RAM, @@ -76,75 +237,92 @@ func (runner *arvadosContainerRunner) Run() (string, error) { "use_existing": true, "output_path": "/mnt/output", "runtime_constraints": rc, - "priority": 1, + "priority": runner.Priority, "state": arvados.ContainerRequestStateCommitted, }, }) + if err != nil { + return "", err + } log.Printf("container request UUID: %s", cr.UUID) log.Printf("container UUID: %s", cr.ContainerUUID) - var logch <-chan eventMessage - var logstream *logStream + logch := make(chan eventMessage) + client := arvadosClient{Client: runner.Client} + defer client.Close() + subscribedUUID := "" defer func() { - if logstream != nil { - logstream.Close() + if subscribedUUID != "" { + client.Unsubscribe(logch, subscribedUUID) } }() ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() + neednewline := "" + lastState := cr.State refreshCR := func() { err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil) if err != nil { + fmt.Fprint(os.Stderr, neednewline) log.Printf("error getting container request: %s", err) return } if lastState != cr.State { - log.Printf("container state: %s", cr.State) + fmt.Fprint(os.Stderr, neednewline) + log.Printf("container request state: %s", cr.State) lastState = cr.State } + if subscribedUUID != cr.ContainerUUID { + fmt.Fprint(os.Stderr, neednewline) + neednewline = "" + if subscribedUUID != "" { + client.Unsubscribe(logch, subscribedUUID) + } + client.Subscribe(logch, cr.ContainerUUID) + subscribedUUID = cr.ContainerUUID + } } - subscribedUUID := "" + var reCrunchstat = regexp.MustCompile(`mem .* rss`) for cr.State != arvados.ContainerRequestStateFinal { - if logch == nil && cr.ContainerUUID != subscribedUUID { - if logstream != nil { - logstream.Close() - } - logstream = runner.logStream(cr.ContainerUUID) - logch = logstream.C - } select { - case msg, ok := <-logch: - if !ok { - logstream.Close() - logstream = nil - logch = nil - break - } + case <-ticker.C: + refreshCR() + case msg := <-logch: switch msg.EventType { case "update": refreshCR() - default: + case "stderr": for _, line := range strings.Split(msg.Properties.Text, "\n") { if line != "" { + fmt.Fprint(os.Stderr, neednewline) + neednewline = "" log.Print(line) } } + case "crunchstat": + for _, line := range strings.Split(msg.Properties.Text, "\n") { + mem := reCrunchstat.FindString(line) + if mem != "" { + fmt.Fprintf(os.Stderr, "%s \r", mem) + neednewline = "\n" + } + } } - case <-ticker.C: - refreshCR() } } + fmt.Fprint(os.Stderr, neednewline) var c arvados.Container err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil) if err != nil { return "", err - } - if c.ExitCode != 0 { + } else if c.State != arvados.ContainerStateComplete { + return "", fmt.Errorf("container did not complete: %s", c.State) + } else if c.ExitCode != 0 { return "", fmt.Errorf("container exited %d", c.ExitCode) } return cr.OutputUUID, err @@ -242,66 +420,3 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) { log.Printf("stored lightning binary in new collection %s", coll.UUID) return coll.UUID, nil } - -type eventMessage struct { - Status int - ObjectUUID string `json:"object_uuid"` - EventType string `json:"event_type"` - Properties struct { - Text string - } -} - -type logStream struct { - C <-chan eventMessage - Close func() error -} - -func (runner *arvadosContainerRunner) logStream(uuid string) *logStream { - ch := make(chan eventMessage) - done := make(chan struct{}) - go func() { - defer close(ch) - var cluster arvados.Cluster - runner.Client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil) - wsURL := cluster.Services.Websocket.ExternalURL - wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1) - wsURL.Path = "/websocket" - wsURL.RawQuery = url.Values{"api_token": []string{runner.Client.AuthToken}}.Encode() - conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String()) - if err != nil { - log.Printf("websocket error: %s", err) - return - } - w := json.NewEncoder(conn) - go w.Encode(map[string]interface{}{ - "method": "subscribe", - "filters": [][]interface{}{ - {"object_uuid", "=", uuid}, - {"event_type", "in", []string{"stderr", "crunch-run", "update"}}, - }, - }) - r := json.NewDecoder(conn) - for { - var msg eventMessage - err := r.Decode(&msg) - if err != nil { - log.Printf("error decoding websocket message: %s", err) - return - } - ch <- msg - select { - case <-done: - return - default: - } - } - }() - return &logStream{ - C: ch, - Close: func() error { - close(done) - return nil - }, - } -}