12446: Avoid listing every container uuid in status query.
author Peter Amstutz <peter.amstutz@curoverse.com>
Tue, 17 Oct 2017 14:30:42 +0000 (10:30 -0400)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Thu, 19 Oct 2017 15:14:39 +0000 (11:14 -0400)
sdk/go/dispatch/dispatch.go

index 356d087a46f31c7f97ce92ad975300f0a0959990..3289c67b013f37a67ae8ddeaa52d3fd74abe34e5 100644 (file)
@@ -73,38 +73,73 @@ func (d *Dispatcher) Run(ctx context.Context) error {
        defer poll.Stop()
 
        for {
-               tracked := d.trackedUUIDs()
-               d.checkForUpdates([][]interface{}{
-                       {"uuid", "in", tracked}})
-               d.checkForUpdates([][]interface{}{
-                       {"locked_by_uuid", "=", d.auth.UUID},
-                       {"uuid", "not in", tracked}})
-               d.checkForUpdates([][]interface{}{
-                       {"state", "=", Queued},
-                       {"priority", ">", "0"},
-                       {"uuid", "not in", tracked}})
                select {
                case <-poll.C:
-                       continue
+                       break
                case <-ctx.Done():
                        return ctx.Err()
                }
-       }
-}
 
-func (d *Dispatcher) trackedUUIDs() []string {
-       d.mtx.Lock()
-       defer d.mtx.Unlock()
-       if len(d.trackers) == 0 {
-               // API bug: ["uuid", "not in", []] does not work as
-               // expected, but this does:
-               return []string{"this-uuid-does-not-exist"}
-       }
-       uuids := make([]string, 0, len(d.trackers))
-       for x := range d.trackers {
-               uuids = append(uuids, x)
+               todo := make(map[string]*runTracker)
+               d.mtx.Lock()
+               // Make a copy of trackers
+               for uuid, tracker := range d.trackers {
+                       todo[uuid] = tracker
+               }
+               d.mtx.Unlock()
+
+               // Containers I currently own (Locked/Running)
+               querySuccess := d.checkForUpdates([][]interface{}{
+                       {"locked_by_uuid", "=", d.auth.UUID}}, todo)
+
+               // Containers I should try to dispatch
+               querySuccess = d.checkForUpdates([][]interface{}{
+                       {"state", "=", Queued},
+                       {"priority", ">", "0"}}, todo) && querySuccess
+
+               if !querySuccess {
+                       // There was an error in one of the previous queries,
+                       // we probably didn't get updates for all the
+                       // containers we should have.  Don't check them
+                       // individually because it may be expensive.
+                       continue
+               }
+
+               // Containers I know about but didn't fall into the
+               // above two categories (probably Complete/Cancelled)
+               var missed []string
+               for uuid := range todo {
+                       missed = append(missed, uuid)
+               }
+
+               for len(missed) > 0 {
+                       var batch []string
+                       if len(missed) > 20 {
+                               batch = missed[0:20]
+                               missed = missed[20:]
+                       } else {
+                               batch = missed
+                               missed = missed[0:0]
+                       }
+                       querySuccess = d.checkForUpdates([][]interface{}{
+                               {"uuid", "in", batch}}, todo) && querySuccess
+               }
+
+               if !querySuccess {
+                       // There was an error in one of the previous queries, we probably
+                       // didn't see all the containers we should have, so don't shut down
+                       // the missed containers.
+                       continue
+               }
+
+               // Containers that I know about that didn't show up in any
+               // query should be let go.
+               for uuid, tracker := range todo {
+                       log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
+                       tracker.close()
+               }
+
        }
-       return uuids
 }
 
 // Start a runner in a new goroutine, and send the initial container
@@ -114,7 +149,8 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
        tracker.updates <- c
        go func() {
                d.RunContainer(d, c, tracker.updates)
-
+               // RunContainer blocks for the lifetime of the container.  When
+               // it returns, the tracker should delete itself.
                d.mtx.Lock()
                delete(d.trackers, c.UUID)
                d.mtx.Unlock()
@@ -122,7 +158,7 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
        return tracker
 }
 
-func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
+func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
        params := arvadosclient.Dict{
                "filters": filters,
                "order":   []string{"priority desc"}}
@@ -133,14 +169,15 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
                err := d.Arv.List("containers", params, &list)
                if err != nil {
                        log.Printf("Error getting list of containers: %q", err)
-                       return
+                       return false
                }
                more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
-               d.checkListForUpdates(list.Items)
+               d.checkListForUpdates(list.Items, todo)
        }
+       return true
 }
 
-func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
+func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
        d.mtx.Lock()
        defer d.mtx.Unlock()
        if d.trackers == nil {
@@ -149,6 +186,8 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
 
        for _, c := range containers {
                tracker, alreadyTracking := d.trackers[c.UUID]
+               delete(todo, c.UUID)
+
                if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
                        log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
                } else if alreadyTracking {