Merge branch '10847-nodemanager-shutdown' refs #10847
[arvados.git] / sdk / go / dispatch / dispatch.go
index 5b19485c9e398f5ffba1a3f5c4080b156edb4194..261444a05fd8b91166245a98c35fe6da680175c2 100644 (file)
@@ -21,6 +21,7 @@ const (
        Cancelled = arvados.ContainerStateCancelled
 )
 
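+// Dispatcher polls the API for containers it should handle and starts a tracker (which calls RunContainer) for each one.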
 type Dispatcher struct {
        Arv *arvadosclient.ArvadosClient
 
@@ -36,7 +37,7 @@ type Dispatcher struct {
 
        auth     arvados.APIClientAuthorization
        mtx      sync.Mutex
-       running  map[string]*runTracker
+       trackers map[string]*runTracker
        throttle throttle
 }
 
@@ -68,15 +69,16 @@ func (d *Dispatcher) Run(ctx context.Context) error {
        defer poll.Stop()
 
        for {
+               tracked := d.trackedUUIDs()
                d.checkForUpdates([][]interface{}{
-                       {"uuid", "in", d.runningUUIDs()}})
+                       {"uuid", "in", tracked}})
                d.checkForUpdates([][]interface{}{
                        {"locked_by_uuid", "=", d.auth.UUID},
-                       {"uuid", "not in", d.runningUUIDs()}})
+                       {"uuid", "not in", tracked}})
                d.checkForUpdates([][]interface{}{
                        {"state", "=", Queued},
                        {"priority", ">", "0"},
-                       {"uuid", "not in", d.runningUUIDs()}})
+                       {"uuid", "not in", tracked}})
                select {
                case <-poll.C:
                        continue
@@ -86,15 +88,16 @@ func (d *Dispatcher) Run(ctx context.Context) error {
        }
 }
 
-func (d *Dispatcher) runningUUIDs() []string {
+func (d *Dispatcher) trackedUUIDs() []string {
        d.mtx.Lock()
        defer d.mtx.Unlock()
-       if len(d.running) == 0 {
-               // API bug: ["uuid", "not in", []] does not match everything
-               return []string{"X"}
+       if len(d.trackers) == 0 {
+               // API bug: ["uuid", "not in", []] does not work as
+               // expected, but this does:
+               return []string{"this-uuid-does-not-exist"}
        }
-       uuids := make([]string, 0, len(d.running))
-       for x := range d.running {
+       uuids := make([]string, 0, len(d.trackers))
+       for x := range d.trackers {
                uuids = append(uuids, x)
        }
        return uuids
@@ -109,7 +112,7 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
                d.RunContainer(d, c, tracker.updates)
 
                d.mtx.Lock()
-               delete(d.running, c.UUID)
+               delete(d.trackers, c.UUID)
                d.mtx.Unlock()
        }()
        return tracker
@@ -128,7 +131,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
                        log.Printf("Error getting list of containers: %q", err)
                        return
                }
-               more = list.ItemsAvailable > len(list.Items)
+               more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
                d.checkListForUpdates(list.Items)
        }
 }
@@ -136,15 +139,15 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
        d.mtx.Lock()
        defer d.mtx.Unlock()
-       if d.running == nil {
-               d.running = make(map[string]*runTracker)
+       if d.trackers == nil {
+               d.trackers = make(map[string]*runTracker)
        }
 
        for _, c := range containers {
-               tracker, running := d.running[c.UUID]
+               tracker, alreadyTracking := d.trackers[c.UUID]
                if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
                        log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
-               } else if running {
+               } else if alreadyTracking {
                        switch c.State {
                        case Queued:
                                tracker.close()
@@ -165,14 +168,14 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
                                        break
                                }
                                c.State = Locked
-                               d.running[c.UUID] = d.start(c)
+                               d.trackers[c.UUID] = d.start(c)
                        case Locked, Running:
                                if !d.throttle.Check(c.UUID) {
                                        break
                                }
-                               d.running[c.UUID] = d.start(c)
+                               d.trackers[c.UUID] = d.start(c)
                        case Cancelled, Complete:
-                               tracker.close()
+                               // no-op (we already stopped monitoring)
                        }
                }
        }
@@ -200,6 +203,43 @@ func (d *Dispatcher) Unlock(uuid string) error {
        return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
 }
 
+// TrackContainer ensures a tracker is running for the given UUID,
+// regardless of the current state of the container (except: if the
+// container is locked by a different dispatcher, a tracker will not
+// be started). If the container is not in Locked or Running state,
+// the new tracker will close down immediately.
+//
+// This allows the dispatcher to put its own RunContainer func into a
+// cleanup phase (for example, to kill local processes created by a
+// previous dispatch process that are still running even though the
+// container state is final) without the risk of having multiple
+// goroutines monitoring the same UUID.
+func (d *Dispatcher) TrackContainer(uuid string) error {
+       var cntr arvados.Container
+       err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr) // the only source of a non-nil return value
+       if err != nil {
+               return err
+       }
+       if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
+               return nil
+       }
+
+       d.mtx.Lock()
+       defer d.mtx.Unlock()
+       if _, alreadyTracking := d.trackers[uuid]; alreadyTracking { // reading a nil map is safe, so this can precede the make below
+               return nil
+       }
+       if d.trackers == nil {
+               d.trackers = make(map[string]*runTracker)
+       }
+       d.trackers[uuid] = d.start(cntr) // start's cleanup must take d.mtx before deleting this entry, so it stays present while we hold the lock
+       switch cntr.State {
+       case Queued, Cancelled, Complete:
+               d.trackers[uuid].close()
+       }
+       return nil
+}
+
 type runTracker struct {
        closing bool
        updates chan arvados.Container