17830: Adds test exposing incorrect X-Request-Id format on railsAPI.
[arvados.git] / sdk / go / dispatch / dispatch.go
index 152207ea94bb4bb4c4e21de36ca2b440691109a2..df43c2b10d9778d7c62befc8a3f7e71babacb168 100644 (file)
@@ -12,9 +12,9 @@ import (
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "github.com/Sirupsen/logrus"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "github.com/sirupsen/logrus"
 )
 
 const (
@@ -37,6 +37,9 @@ type Dispatcher struct {
 
        Logger Logger
 
+       // Batch size for container queries
+       BatchSize int
+
        // Queue polling frequency
        PollPeriod time.Duration
 
@@ -84,6 +87,10 @@ func (d *Dispatcher) Run(ctx context.Context) error {
        poll := time.NewTicker(d.PollPeriod)
        defer poll.Stop()
 
+       if d.BatchSize == 0 {
+               d.BatchSize = 100
+       }
+
        for {
                select {
                case <-poll.C:
@@ -174,8 +181,22 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
 }
 
 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
+       var countList arvados.ContainerList
        params := arvadosclient.Dict{
                "filters": filters,
+               "count":   "exact",
+               "limit":   0,
+               "order":   []string{"priority desc"}}
+       err := d.Arv.List("containers", params, &countList)
+       if err != nil {
+               d.Logger.Warnf("error getting count of containers: %q", err)
+               return false
+       }
+       itemsAvailable := countList.ItemsAvailable
+       params = arvadosclient.Dict{
+               "filters": filters,
+               "count":   "none",
+               "limit":   d.BatchSize,
                "order":   []string{"priority desc"}}
        offset := 0
        for {
@@ -194,7 +215,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r
                }
                d.checkListForUpdates(list.Items, todo)
                offset += len(list.Items)
-               if len(list.Items) == 0 || list.ItemsAvailable <= offset {
+               if len(list.Items) == 0 || itemsAvailable <= offset {
                        return true
                }
        }