X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/bef56b9a22efac9ce73006623080e84a0b57f243..70e5c7a3c6a5860d702d5e5c219dc0f3a3696d35:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index c3d6030999..4e25ba4f06 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -29,6 +29,9 @@ const ( type Dispatcher struct { Arv *arvadosclient.ArvadosClient + // Batch size for container queries + BatchSize int64 + // Queue polling frequency PollPeriod time.Duration @@ -72,6 +75,10 @@ func (d *Dispatcher) Run(ctx context.Context) error { poll := time.NewTicker(d.PollPeriod) defer poll.Stop() + if d.BatchSize == 0 { + d.BatchSize = 100 + } + for { select { case <-poll.C: @@ -159,13 +166,33 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker { } func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool { + var countList arvados.ContainerList params := arvadosclient.Dict{ "filters": filters, + "count": "exact", + "limit": 0, + "order": []string{"priority desc"}} + err := d.Arv.List("containers", params, &countList) + if err != nil { + log.Printf("error getting count of containers: %q", err) + return false + } + itemsAvailable := countList.ItemsAvailable + params = arvadosclient.Dict{ + "filters": filters, + "count": "none", + "limit": d.BatchSize, "order": []string{"priority desc"}} offset := 0 for { params["offset"] = offset + + // This list variable must be a new one declared + // inside the loop: otherwise, items in the API + // response would get deep-merged into the items + // loaded in previous iterations. var list arvados.ContainerList + err := d.Arv.List("containers", params, &list) if err != nil { log.Printf("Error getting list of containers: %q", err) @@ -173,7 +200,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r } d.checkListForUpdates(list.Items, todo) offset += len(list.Items) - if len(list.Items) == 0 || list.ItemsAvailable <= offset { + if len(list.Items) == 0 || itemsAvailable <= offset { return true } }