- touched := make(map[string]bool)
- dispatcher.getContainers(paramsQ, touched)
- dispatcher.getContainers(paramsP, touched)
- dispatcher.mineMutex.Lock()
- var monitored []string
- for k := range dispatcher.mineMap {
- if _, ok := touched[k]; !ok {
- monitored = append(monitored, k)
+ select {
+ case <-poll.C:
+ break
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+
+ todo := make(map[string]*runTracker)
+ d.mtx.Lock()
+ // Make a copy of trackers
+ for uuid, tracker := range d.trackers {
+ todo[uuid] = tracker
+ }
+ d.mtx.Unlock()
+
+ // Containers I currently own (Locked/Running)
+ querySuccess := d.checkForUpdates([][]interface{}{
+ {"locked_by_uuid", "=", d.auth.UUID}}, todo)
+
+ // Containers I should try to dispatch
+ querySuccess = d.checkForUpdates([][]interface{}{
+ {"state", "=", Queued},
+ {"priority", ">", "0"}}, todo) && querySuccess
+
+ if !querySuccess {
+ // There was an error in one of the previous queries,
+ // we probably didn't get updates for all the
+ // containers we should have. Don't check them
+ // individually because it may be expensive.
+ continue
+ }
+
+ // Containers I know about but didn't fall into the
+ // above two categories (probably Complete/Cancelled)
+ var missed []string
+ for uuid := range todo {
+ missed = append(missed, uuid)
+ }
+
+ for len(missed) > 0 {
+ var batch []string
+ if len(missed) > 20 {
+ batch = missed[0:20]
+ missed = missed[20:]
+ } else {
+ batch = missed
+ missed = missed[0:0]