Merge branch '9857-cwl-acceptlist-re' refs #9857
[arvados.git] / sdk / go / dispatch / dispatch.go
index fb7b5fb799c6b3d30b9c0975a6318be1626d1289..4987c01055203e262b2534e2f001b200c7de21eb 100644 (file)
@@ -4,6 +4,7 @@
 package dispatch
 
 import (
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "log"
        "os"
@@ -13,43 +14,18 @@ import (
        "time"
 )
 
-// Constants for container states
 const (
-       Queued    = "Queued"
-       Locked    = "Locked"
-       Running   = "Running"
-       Complete  = "Complete"
-       Cancelled = "Cancelled"
+       Queued    = arvados.ContainerStateQueued
+       Locked    = arvados.ContainerStateLocked
+       Running   = arvados.ContainerStateRunning
+       Complete  = arvados.ContainerStateComplete
+       Cancelled = arvados.ContainerStateCancelled
 )
 
-type apiClientAuthorization struct {
-       UUID     string `json:"uuid"`
-       APIToken string `json:"api_token"`
-}
-
-type apiClientAuthorizationList struct {
-       Items []apiClientAuthorization `json:"items"`
-}
-
-// Represents an Arvados container record
-type Container struct {
-       UUID               string           `json:"uuid"`
-       State              string           `json:"state"`
-       Priority           int              `json:"priority"`
-       RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
-       LockedByUUID       string           `json:"locked_by_uuid"`
-}
-
-// ContainerList is a list of the containers from api
-type ContainerList struct {
-       Items          []Container `json:"items"`
-       ItemsAvailable int         `json:"items_available"`
-}
-
 // Dispatcher holds the state of the dispatcher
 type Dispatcher struct {
        // The Arvados client
-       Arv arvadosclient.ArvadosClient
+       Arv *arvadosclient.ArvadosClient
 
        // When a new queued container appears and is either already owned by
        // this dispatcher or is successfully locked, the dispatcher will call
@@ -63,7 +39,7 @@ type Dispatcher struct {
        // handled by this dispatcher and the goroutine should terminate.  The
        // goroutine is responsible for draining the 'status' channel, failure
        // to do so may deadlock the dispatcher.
-       RunContainer func(*Dispatcher, Container, chan Container)
+       RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
 
        // Amount of time to wait between polling for updates.
        PollInterval time.Duration
@@ -72,22 +48,22 @@ type Dispatcher struct {
        DoneProcessing chan struct{}
 
        mineMutex  sync.Mutex
-       mineMap    map[string]chan Container
-       Auth       apiClientAuthorization
-       containers chan Container
+       mineMap    map[string]chan arvados.Container
+       Auth       arvados.APIClientAuthorization
+       containers chan arvados.Container
 }
 
 // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
 // for which this process is actively starting/monitoring.  Returns channel to
 // be used to send container status updates.
-func (dispatcher *Dispatcher) setMine(uuid string) chan Container {
+func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
        dispatcher.mineMutex.Lock()
        defer dispatcher.mineMutex.Unlock()
        if ch, ok := dispatcher.mineMap[uuid]; ok {
                return ch
        }
 
-       ch := make(chan Container)
+       ch := make(chan arvados.Container)
        dispatcher.mineMap[uuid] = ch
        return ch
 }
@@ -102,22 +78,24 @@ func (dispatcher *Dispatcher) notMine(uuid string) {
        }
 }
 
-// Check if there is a channel for updates associated with this container.  If
-// so send the container record on the channel and return true, if not return
-// false.
-func (dispatcher *Dispatcher) updateMine(c Container) bool {
+// checkMine returns true if there is a channel for updates associated
+// with container c.  If update is true, also send the container record on
+// the channel.
+func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
        dispatcher.mineMutex.Lock()
        defer dispatcher.mineMutex.Unlock()
        ch, ok := dispatcher.mineMap[c.UUID]
        if ok {
-               ch <- c
+               if update {
+                       ch <- c
+               }
                return true
        }
        return false
 }
 
 func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
-       var containers ContainerList
+       var containers arvados.ContainerList
        err := dispatcher.Arv.List("containers", params, &containers)
        if err != nil {
                log.Printf("Error getting list of containers: %q", err)
@@ -173,7 +151,15 @@ func (dispatcher *Dispatcher) pollContainers() {
        }
 }
 
-func (dispatcher *Dispatcher) handleUpdate(container Container) {
+func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
+       if container.State == Queued && dispatcher.checkMine(container, false) {
+               // If we previously started the job, something failed, and it
+               // was re-queued, this dispatcher might still be monitoring it.
+               // Stop the existing monitor, then try to lock and run it
+               // again.
+               dispatcher.notMine(container.UUID)
+       }
+
        if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
                // If container is Complete, Cancelled, or Queued, LockedByUUID
                // will be nil.  If the container was formerly Locked, moved
@@ -185,14 +171,14 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) {
                return
        }
 
-       if dispatcher.updateMine(container) {
+       if dispatcher.checkMine(container, true) {
                // Already monitored, sent status update
                return
        }
 
-       if container.State == Queued {
+       if container.State == Queued && container.Priority > 0 {
                // Try to take the lock
-               if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
+               if err := dispatcher.Lock(container.UUID); err != nil {
                        return
                }
                container.State = Locked
@@ -206,7 +192,7 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) {
 }
 
 // UpdateState makes an API call to change the state of a container.
-func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
+func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
        err := dispatcher.Arv.Update("containers", uuid,
                arvadosclient.Dict{
                        "container": arvadosclient.Dict{"state": newState}},
@@ -217,6 +203,24 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
        return err
 }
 
+// Lock makes the lock API call which updates the state of a container to Locked.
+func (dispatcher *Dispatcher) Lock(uuid string) error {
+       err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
+       if err != nil {
+               log.Printf("Error locking container %s: %q", uuid, err)
+       }
+       return err
+}
+
+// Unlock makes the unlock API call which updates the state of a container to Queued.
+func (dispatcher *Dispatcher) Unlock(uuid string) error {
+       err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
+       if err != nil {
+               log.Printf("Error unlocking container %s: %q", uuid, err)
+       }
+       return err
+}
+
 // RunDispatcher runs the main loop of the dispatcher until receiving a message
 // on the dispatcher.DoneProcessing channel.  It also installs a signal handler
 // to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
@@ -227,8 +231,8 @@ func (dispatcher *Dispatcher) RunDispatcher() (err error) {
                return
        }
 
-       dispatcher.mineMap = make(map[string]chan Container)
-       dispatcher.containers = make(chan Container)
+       dispatcher.mineMap = make(map[string]chan arvados.Container)
+       dispatcher.containers = make(chan arvados.Container)
 
        // Graceful shutdown on signal
        sigChan := make(chan os.Signal)