13933: Fix formatting.
[arvados.git] / sdk / go / dispatch / dispatch.go
index 3289c67b013f37a67ae8ddeaa52d3fd74abe34e5..ba2cf289c8cb029b1c38369d833d3cc8dfde9cbb 100644 (file)
@@ -29,6 +29,9 @@ const (
 type Dispatcher struct {
        Arv *arvadosclient.ArvadosClient
 
+       // Batch size for container queries
+       BatchSize int64
+
        // Queue polling frequency
        PollPeriod time.Duration
 
@@ -72,6 +75,10 @@ func (d *Dispatcher) Run(ctx context.Context) error {
        poll := time.NewTicker(d.PollPeriod)
        defer poll.Stop()
 
+       if d.BatchSize == 0 {
+               d.BatchSize = 100
+       }
+
        for {
                select {
                case <-poll.C:
@@ -159,22 +166,38 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
 }
 
 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
+       var countList arvados.ContainerList
        params := arvadosclient.Dict{
                "filters": filters,
+               "count":   "exact",
+               "limit":   0,
                "order":   []string{"priority desc"}}
-
-       var list arvados.ContainerList
-       for offset, more := 0, true; more; offset += len(list.Items) {
+       err := d.Arv.List("containers", params, &countList)
+       if err != nil {
+               log.Printf("error getting count of containers: %q", err)
+               return false
+       }
+       itemsAvailable := countList.ItemsAvailable
+       params = arvadosclient.Dict{
+               "filters": filters,
+               "count":   "none",
+               "limit":   d.BatchSize,
+               "order":   []string{"priority desc"}}
+       offset := 0
+       for {
                params["offset"] = offset
+               var list arvados.ContainerList
                err := d.Arv.List("containers", params, &list)
                if err != nil {
                        log.Printf("Error getting list of containers: %q", err)
                        return false
                }
-               more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
                d.checkListForUpdates(list.Items, todo)
+               offset += len(list.Items)
+               if len(list.Items) == 0 || itemsAvailable <= offset {
+                       return true
+               }
        }
-       return true
 }
 
 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {