Container request priority flag.
[lightning.git] / arvados.go
index c9f2e7a3bac8b87c7dda29d8d43fef7ebd567225..55d96bde0109500e43efc95f46c0af096912d55a 100644 (file)
@@ -5,7 +5,6 @@ import (
        "errors"
        "fmt"
        "io/ioutil"
-       "log"
        "net/url"
        "os"
        "regexp"
@@ -15,6 +14,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
+       log "github.com/sirupsen/logrus"
        "golang.org/x/crypto/blake2b"
        "golang.org/x/net/websocket"
 )
@@ -28,6 +28,7 @@ type arvadosContainerRunner struct {
        Prog        string // if empty, run /proc/self/exe
        Args        []string
        Mounts      map[string]map[string]interface{}
+       Priority    int
 }
 
 func (runner *arvadosContainerRunner) Run() (string, error) {
@@ -60,6 +61,10 @@ func (runner *arvadosContainerRunner) Run() (string, error) {
        }
        command := append([]string{prog}, runner.Args...)
 
+       priority := runner.Priority
+       if priority < 1 {
+               priority = 500
+       }
        rc := arvados.RuntimeConstraints{
                VCPUs:        runner.VCPUs,
                RAM:          runner.RAM,
@@ -76,13 +81,14 @@ func (runner *arvadosContainerRunner) Run() (string, error) {
                        "use_existing":        true,
                        "output_path":         "/mnt/output",
                        "runtime_constraints": rc,
-                       "priority":            1,
+                       "priority":            runner.Priority,
                        "state":               arvados.ContainerRequestStateCommitted,
                },
        })
-       log.Print(cr.UUID)
+       log.Printf("container request UUID: %s", cr.UUID)
+       log.Printf("container UUID: %s", cr.ContainerUUID)
 
-       var logch <-chan string
+       var logch <-chan eventMessage
        var logstream *logStream
        defer func() {
                if logstream != nil {
@@ -94,8 +100,20 @@ func (runner *arvadosContainerRunner) Run() (string, error) {
        defer ticker.Stop()
 
        lastState := cr.State
+       refreshCR := func() {
+               err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
+               if err != nil {
+                       log.Printf("error getting container request: %s", err)
+                       return
+               }
+               if lastState != cr.State {
+                       log.Printf("container state: %s", cr.State)
+                       lastState = cr.State
+               }
+       }
+
        subscribedUUID := ""
-       for cr.State != arvados.ContainerRequestState(arvados.ContainerRequestStateFinal) {
+       for cr.State != arvados.ContainerRequestStateFinal {
                if logch == nil && cr.ContainerUUID != subscribedUUID {
                        if logstream != nil {
                                logstream.Close()
@@ -111,23 +129,18 @@ func (runner *arvadosContainerRunner) Run() (string, error) {
                                logch = nil
                                break
                        }
-                       if msg != "" {
-                               log.Print(msg)
-                               continue
+                       switch msg.EventType {
+                       case "update":
+                               refreshCR()
+                       default:
+                               for _, line := range strings.Split(msg.Properties.Text, "\n") {
+                                       if line != "" {
+                                               log.Print(line)
+                                       }
+                               }
                        }
-                       // empty message indicates an "update" event
-                       // -- fall out of the select and get the
-                       // latest version now, instead of waiting for
-                       // the next timer tick.
                case <-ticker.C:
-               }
-               err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
-               if err != nil {
-                       return "", err
-               }
-               if lastState != cr.State {
-                       log.Printf("container state: %s", cr.State)
-                       lastState = cr.State
+                       refreshCR()
                }
        }
 
@@ -235,13 +248,22 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
        return coll.UUID, nil
 }
 
+type eventMessage struct {
+       Status     int
+       ObjectUUID string `json:"object_uuid"`
+       EventType  string `json:"event_type"`
+       Properties struct {
+               Text string
+       }
+}
+
 type logStream struct {
-       C     <-chan string
+       C     <-chan eventMessage
        Close func() error
 }
 
 func (runner *arvadosContainerRunner) logStream(uuid string) *logStream {
-       ch := make(chan string)
+       ch := make(chan eventMessage)
        done := make(chan struct{})
        go func() {
                defer close(ch)
@@ -253,7 +275,7 @@ func (runner *arvadosContainerRunner) logStream(uuid string) *logStream {
                wsURL.RawQuery = url.Values{"api_token": []string{runner.Client.AuthToken}}.Encode()
                conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
                if err != nil {
-                       ch <- fmt.Sprintf("websocket error: %s", err)
+                       log.Printf("websocket error: %s", err)
                        return
                }
                w := json.NewEncoder(conn)
@@ -266,28 +288,14 @@ func (runner *arvadosContainerRunner) logStream(uuid string) *logStream {
                })
                r := json.NewDecoder(conn)
                for {
-                       var msg struct {
-                               Status     int
-                               ObjectUUID string `json:"object_uuid"`
-                               EventType  string `json:"event_type"`
-                               Properties struct {
-                                       Text string
-                               }
-                       }
+                       var msg eventMessage
                        err := r.Decode(&msg)
                        if err != nil {
                                log.Printf("error decoding websocket message: %s", err)
                                return
                        }
                        if msg.ObjectUUID == uuid {
-                               for _, line := range strings.Split(msg.Properties.Text, "\n") {
-                                       if line != "" {
-                                               ch <- line
-                                       }
-                               }
-                               if msg.EventType == "update" {
-                                       ch <- ""
-                               }
+                               ch <- msg
                        }
                        select {
                        case <-done: