Merge branch '10847-nodemanager-shutdown' refs #10847
[arvados.git] / sdk / go / dispatch / dispatch.go
index 371cbb1a62781f72d5f17914cabc369391b76538..261444a05fd8b91166245a98c35fe6da680175c2 100644 (file)
@@ -131,7 +131,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
                        log.Printf("Error getting list of containers: %q", err)
                        return
                }
-               more = list.ItemsAvailable > len(list.Items)
+               more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
                d.checkListForUpdates(list.Items)
        }
 }
@@ -175,7 +175,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
                                }
                                d.trackers[c.UUID] = d.start(c)
                        case Cancelled, Complete:
-                               tracker.close()
+                               // no-op (we already stopped monitoring)
                        }
                }
        }
@@ -203,36 +203,41 @@ func (d *Dispatcher) Unlock(uuid string) error {
        return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
 }
 
-// TrackContainer starts a tracker for given uuid if one is not already existing, despite its state.
-func (d *Dispatcher) TrackContainer(uuid string) {
-       if d.trackers == nil {
-               d.trackers = make(map[string]*runTracker)
+// TrackContainer ensures a tracker is running for the given UUID,
+// regardless of the current state of the container (except: if the
+// container is locked by a different dispatcher, a tracker will not
+// be started). If the container is not in Locked or Running state,
+// the new tracker will close down immediately.
+//
+// This allows the dispatcher to put its own RunContainer func into a
+// cleanup phase (for example, to kill local processes created by a
+// prevous dispatch process that are still running even though the
+// container state is final) without the risk of having multiple
+// goroutines monitoring the same UUID.
+func (d *Dispatcher) TrackContainer(uuid string) error {
+       var cntr arvados.Container
+       err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
+       if err != nil {
+               return err
        }
-
-       _, alreadyTracking := d.trackers[uuid]
-       if alreadyTracking {
-               return
+       if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
+               return nil
        }
 
        d.mtx.Lock()
        defer d.mtx.Unlock()
-
-       _, alreadyTracking = d.trackers[uuid]
-       if alreadyTracking {
-               return
+       if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
+               return nil
        }
-
-       var cntr arvados.Container
-       err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
-       if err != nil {
-               log.Printf("Error getting container %s: %s", uuid, err)
-               return
+       if d.trackers == nil {
+               d.trackers = make(map[string]*runTracker)
        }
-
-       tracker := &runTracker{updates: make(chan arvados.Container, 1)}
-       tracker.updates <- cntr
-
-       d.trackers[uuid] = tracker
+       d.trackers[uuid] = d.start(cntr)
+       switch cntr.State {
+       case Queued, Cancelled, Complete:
+               d.trackers[uuid].close()
+       }
+       return nil
 }
 
 type runTracker struct {