Merge branch '13933-dispatch-batch-size'
[arvados.git] / sdk / go / dispatch / dispatch.go
index c3d60309992c9368ba7a2d75586db58829c8f2c0..4e25ba4f0603699569402d619127dd4b9fd99fb1 100644 (file)
@@ -29,6 +29,9 @@ const (
 type Dispatcher struct {
        Arv *arvadosclient.ArvadosClient
 
+       // Batch size for container queries
+       BatchSize int64
+
        // Queue polling frequency
        PollPeriod time.Duration
 
@@ -72,6 +75,10 @@ func (d *Dispatcher) Run(ctx context.Context) error {
        poll := time.NewTicker(d.PollPeriod)
        defer poll.Stop()
 
+       if d.BatchSize == 0 {
+               d.BatchSize = 100
+       }
+
        for {
                select {
                case <-poll.C:
@@ -159,13 +166,33 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
 }
 
 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
+       var countList arvados.ContainerList
        params := arvadosclient.Dict{
                "filters": filters,
+               "count":   "exact",
+               "limit":   0,
+               "order":   []string{"priority desc"}}
+       err := d.Arv.List("containers", params, &countList)
+       if err != nil {
+               log.Printf("error getting count of containers: %q", err)
+               return false
+       }
+       itemsAvailable := countList.ItemsAvailable
+       params = arvadosclient.Dict{
+               "filters": filters,
+               "count":   "none",
+               "limit":   d.BatchSize,
                "order":   []string{"priority desc"}}
        offset := 0
        for {
                params["offset"] = offset
+
+               // This list variable must be a new one declared
+               // inside the loop: otherwise, items in the API
+               // response would get deep-merged into the items
+               // loaded in previous iterations.
                var list arvados.ContainerList
+
                err := d.Arv.List("containers", params, &list)
                if err != nil {
                        log.Printf("Error getting list of containers: %q", err)
@@ -173,7 +200,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r
                }
                d.checkListForUpdates(list.Items, todo)
                offset += len(list.Items)
-               if len(list.Items) == 0 || list.ItemsAvailable <= offset {
+               if len(list.Items) == 0 || itemsAvailable <= offset {
                        return true
                }
        }