X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/63bae17d784c2c1522a087d71a0fcb2a9b6eddcd..7b3836631ed085a3debb43bebeff9a06c15505c3:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index 74cefed057..df43c2b10d 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -12,9 +12,9 @@ import ( "sync" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "github.com/Sirupsen/logrus" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" + "github.com/sirupsen/logrus" ) const ( @@ -37,6 +37,9 @@ type Dispatcher struct { Logger Logger + // Batch size for container queries + BatchSize int + // Queue polling frequency PollPeriod time.Duration @@ -84,6 +87,10 @@ func (d *Dispatcher) Run(ctx context.Context) error { poll := time.NewTicker(d.PollPeriod) defer poll.Stop() + if d.BatchSize == 0 { + d.BatchSize = 100 + } + for { select { case <-poll.C: @@ -174,8 +181,22 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker { } func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool { + var countList arvados.ContainerList params := arvadosclient.Dict{ "filters": filters, + "count": "exact", + "limit": 0, + "order": []string{"priority desc"}} + err := d.Arv.List("containers", params, &countList) + if err != nil { + d.Logger.Warnf("error getting count of containers: %q", err) + return false + } + itemsAvailable := countList.ItemsAvailable + params = arvadosclient.Dict{ + "filters": filters, + "count": "none", + "limit": d.BatchSize, "order": []string{"priority desc"}} offset := 0 for { @@ -194,7 +215,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r } d.checkListForUpdates(list.Items, todo) offset += len(list.Items) - if len(list.Items) == 0 || list.ItemsAvailable <= offset { + if len(list.Items) == 0 || itemsAvailable <= offset { return true } } @@ -230,7 +251,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma } err := d.lock(c.UUID) if err != nil { - d.Logger.Debugf("error locking container %s: %s", c.UUID, err) + d.Logger.Warnf("error locking container %s: %s", c.UUID, err) break } c.State = Locked