Merge branch '13146-shared-rails' refs #13146
[arvados.git] / sdk / go / dispatch / dispatch.go
index ca2dbc48d4053f4e3036f0db6bcdaf4e03a0e064..4e25ba4f0603699569402d619127dd4b9fd99fb1 100644 (file)
@@ -29,6 +29,9 @@ const (
 type Dispatcher struct {
        Arv *arvadosclient.ArvadosClient
 
+       // Batch size for container queries
+       BatchSize int64
+
        // Queue polling frequency
        PollPeriod time.Duration
 
@@ -72,6 +75,10 @@ func (d *Dispatcher) Run(ctx context.Context) error {
        poll := time.NewTicker(d.PollPeriod)
        defer poll.Stop()
 
+       if d.BatchSize == 0 {
+               d.BatchSize = 100
+       }
+
        for {
                select {
                case <-poll.C:
@@ -159,22 +166,44 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
 }
 
 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
+       var countList arvados.ContainerList
        params := arvadosclient.Dict{
                "filters": filters,
+               "count":   "exact",
+               "limit":   0,
                "order":   []string{"priority desc"}}
-
-       var list arvados.ContainerList
-       for offset, more := 0, true; more; offset += len(list.Items) {
+       err := d.Arv.List("containers", params, &countList)
+       if err != nil {
+               log.Printf("error getting count of containers: %q", err)
+               return false
+       }
+       itemsAvailable := countList.ItemsAvailable
+       params = arvadosclient.Dict{
+               "filters": filters,
+               "count":   "none",
+               "limit":   d.BatchSize,
+               "order":   []string{"priority desc"}}
+       offset := 0
+       for {
                params["offset"] = offset
+
+               // This list variable must be a new one declared
+               // inside the loop: otherwise, items in the API
+               // response would get deep-merged into the items
+               // loaded in previous iterations.
+               var list arvados.ContainerList
+
                err := d.Arv.List("containers", params, &list)
                if err != nil {
                        log.Printf("Error getting list of containers: %q", err)
                        return false
                }
-               more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
                d.checkListForUpdates(list.Items, todo)
+               offset += len(list.Items)
+               if len(list.Items) == 0 || itemsAvailable <= offset {
+                       return true
+               }
        }
-       return true
 }
 
 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
@@ -195,13 +224,6 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma
                        case Queued:
                                tracker.close()
                        case Locked, Running:
-                               if c.SchedulingParameters.MaxRunTime > 0 {
-                                       maxRunTime := time.Duration(c.SchedulingParameters.MaxRunTime) * time.Second
-                                       if time.Since(c.StartedAt) >= maxRunTime {
-                                               // Time's up, schedule container for cancellation
-                                               c.Priority = 0
-                                       }
-                               }
                                tracker.update(c)
                        case Cancelled, Complete:
                                tracker.close()