9187: If a container is reported Queued, but we are monitoring it, stop monitoring it.
[arvados.git] / sdk / go / dispatch / dispatch.go
index 355ed7c1084a0feed96cab06ab8bdbe5cd3d2d6c..9fe2e396fc03ea19cf716040a96842fcdd8f00cc 100644 (file)
@@ -1,3 +1,6 @@
+// Framework for monitoring the Arvados container Queue, Locks container
+// records, and runs goroutine callbacks which implement execution and
+// monitoring of the containers.
 package dispatch
 
 import (
@@ -28,7 +31,7 @@ type apiClientAuthorizationList struct {
        Items []apiClientAuthorization `json:"items"`
 }
 
-// Container data
+// Represents an Arvados container record
 type Container struct {
        UUID               string           `json:"uuid"`
        State              string           `json:"state"`
@@ -45,14 +48,32 @@ type ContainerList struct {
 
 // Dispatcher holds the state of the dispatcher
 type Dispatcher struct {
-       Arv            arvadosclient.ArvadosClient
-       RunContainer   func(*Dispatcher, Container, chan Container)
-       PollInterval   time.Duration
+       // The Arvados client
+       Arv arvadosclient.ArvadosClient
+
+       // When a new queued container appears and is either already owned by
+       // this dispatcher or is successfully locked, the dispatcher will call
+       // go RunContainer().  The RunContainer() goroutine gets a channel over
+       // which it will receive updates to the container state.  The
+       // RunContainer() goroutine should only assume status updates come when
+       // the container record changes on the API server; if it needs to
+       // monitor the job submission to the underlying slurm/grid engine/etc
+       // queue it should spin up its own polling goroutines.  When the
+       // channel is closed, that means the container is no longer being
+       // handled by this dispatcher and the goroutine should terminate.  The
+       // goroutine is responsible for draining the 'status' channel, failure
+       // to do so may deadlock the dispatcher.
+       RunContainer func(*Dispatcher, Container, chan Container)
+
+       // Amount of time to wait between polling for updates.
+       PollInterval time.Duration
+
+       // Channel used to signal that RunDispatcher loop should exit.
        DoneProcessing chan struct{}
 
        mineMutex  sync.Mutex
        mineMap    map[string]chan Container
-       auth       apiClientAuthorization
+       Auth       apiClientAuthorization
        containers chan Container
 }
 
@@ -82,14 +103,16 @@ func (dispatcher *Dispatcher) notMine(uuid string) {
 }
 
 // Check if there is a channel for updates associated with this container.  If
-// so send the container record on the channel and return true, if not return
-// false.
-func (dispatcher *Dispatcher) updateMine(c Container) bool {
+// so (and update is true) send the container record on the channel and return
+// true, if not return false.
+func (dispatcher *Dispatcher) checkMine(c Container, update bool) bool {
        dispatcher.mineMutex.Lock()
        defer dispatcher.mineMutex.Unlock()
        ch, ok := dispatcher.mineMap[c.UUID]
        if ok {
-               ch <- c
+               if update {
+                       ch <- c
+               }
                return true
        }
        return false
@@ -100,17 +123,18 @@ func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched m
        err := dispatcher.Arv.List("containers", params, &containers)
        if err != nil {
                log.Printf("Error getting list of containers: %q", err)
-       } else {
-               if containers.ItemsAvailable > len(containers.Items) {
-                       // TODO: support paging
-                       log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
-                               containers.ItemsAvailable,
-                               len(containers.Items))
-               }
-               for _, container := range containers.Items {
-                       touched[container.UUID] = true
-                       dispatcher.containers <- container
-               }
+               return
+       }
+
+       if containers.ItemsAvailable > len(containers.Items) {
+               // TODO: support paging
+               log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
+                       containers.ItemsAvailable,
+                       len(containers.Items))
+       }
+       for _, container := range containers.Items {
+               touched[container.UUID] = true
+               dispatcher.containers <- container
        }
 }
 
@@ -122,7 +146,7 @@ func (dispatcher *Dispatcher) pollContainers() {
                "order":   []string{"priority desc"},
                "limit":   "1000"}
        paramsP := arvadosclient.Dict{
-               "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.auth.UUID}},
+               "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
                "limit":   "1000"}
 
        for {
@@ -152,11 +176,23 @@ func (dispatcher *Dispatcher) pollContainers() {
 }
 
 func (dispatcher *Dispatcher) handleUpdate(container Container) {
-       if dispatcher.updateMine(container) {
-               if container.State == Complete || container.State == Cancelled {
-                       log.Printf("Container %v now in state %v", container.UUID, container.State)
-                       dispatcher.notMine(container.UUID)
-               }
+       if container.State == Queued && dispatcher.checkMine(container, false) {
+               dispatcher.notMine(container.UUID)
+       }
+
+       if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
+               // If container is Complete, Cancelled, or Queued, LockedByUUID
+               // will be nil.  If the container was formerly Locked, moved
+               // back to Queued and then locked by another dispatcher,
+               // LockedByUUID will be different.  In either case, we want
+               // to stop monitoring it.
+               log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
+               dispatcher.notMine(container.UUID)
+               return
+       }
+
+       if dispatcher.checkMine(container, true) {
+               // Already monitored, sent status update
                return
        }
 
@@ -169,6 +205,8 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) {
        }
 
        if container.State == Locked || container.State == Running {
+               // Not currently monitored but in Locked or Running state and
+               // owned by this dispatcher, so start monitoring.
                go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
        }
 }
@@ -180,7 +218,7 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
                        "container": arvadosclient.Dict{"state": newState}},
                nil)
        if err != nil {
-               log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+               log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
        }
        return err
 }
@@ -188,16 +226,8 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
 // RunDispatcher runs the main loop of the dispatcher until receiving a message
 // on the dispatcher.DoneProcessing channel.  It also installs a signal handler
 // to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-//
-// When a new queued container appears and is successfully locked, the
-// dispatcher will call RunContainer() followed by MonitorContainer().  If a
-// container appears that is Locked or Running but not known to the dispatcher,
-// it will only call monitorContainer().  The monitorContainer() callback is
-// passed a channel over which it will receive updates to the container state.
-// The callback is responsible for draining the channel, if it fails to do so
-// it will deadlock the dispatcher.
 func (dispatcher *Dispatcher) RunDispatcher() (err error) {
-       err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth)
+       err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
        if err != nil {
                log.Printf("Error getting my token UUID: %v", err)
                return