From 420a8e7fb7b159452da834062cc3e040dd1b411b Mon Sep 17 00:00:00 2001 From: "Joshua C. Randall" Date: Mon, 13 Aug 2018 12:01:10 +0000 Subject: [PATCH] Make dispatcher Run loop use count=none and BatchSize limit Arvados-DCO-1.1-Signed-off-by: Joshua C. Randall --- sdk/go/dispatch/dispatch.go | 23 ++++++++++++++++++- .../crunch-dispatch-slurm.go | 4 ++++ services/crunch-dispatch-slurm/usage.go | 1 + 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index c3d6030999..e9fb6b6d93 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -29,6 +29,9 @@ const ( type Dispatcher struct { Arv *arvadosclient.ArvadosClient + // Batch size for container queries + BatchSize int64 + // Queue polling frequency PollPeriod time.Duration @@ -72,6 +75,10 @@ func (d *Dispatcher) Run(ctx context.Context) error { poll := time.NewTicker(d.PollPeriod) defer poll.Stop() + if d.BatchSize == 0 { + d.BatchSize = 100 + } + for { select { case <-poll.C: @@ -159,8 +166,22 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker { } func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool { + var countList arvados.ContainerList params := arvadosclient.Dict{ "filters": filters, + "count": "exact", + "limit": 0, + "order": []string{"priority desc"}} + err := d.Arv.List("containers", params, &countList) + if err != nil { + log.Printf("Error getting count of containers: %q", err) + return false + } + itemsAvailable := countList.ItemsAvailable + params = arvadosclient.Dict{ + "filters": filters, + "count": "none", + "limit": d.BatchSize, "order": []string{"priority desc"}} offset := 0 for { @@ -173,7 +194,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r } d.checkListForUpdates(list.Items, todo) offset += len(list.Items) - if len(list.Items) == 0 || list.ItemsAvailable <= offset { + if len(list.Items) == 0 || itemsAvailable <= offset { return true } } diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go index d1f19dd7b5..c1009a5d8e 100644 --- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go +++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go @@ -57,6 +57,9 @@ type Dispatcher struct { // Minimum time between two attempts to run the same container MinRetryPeriod arvados.Duration + + // Batch size for container queries + BatchSize int64 } func main() { @@ -164,6 +167,7 @@ func (disp *Dispatcher) setup() { } disp.Dispatcher = &dispatch.Dispatcher{ Arv: arv, + BatchSize: disp.BatchSize, RunContainer: disp.runContainer, PollPeriod: time.Duration(disp.PollPeriod), MinRetryPeriod: time.Duration(disp.MinRetryPeriod), diff --git a/services/crunch-dispatch-slurm/usage.go b/services/crunch-dispatch-slurm/usage.go index 032d86284d..db2395cdf5 100644 --- a/services/crunch-dispatch-slurm/usage.go +++ b/services/crunch-dispatch-slurm/usage.go @@ -22,6 +22,7 @@ var exampleConfigFile = []byte(` "PollPeriod": "10s", "SbatchArguments": ["--partition=foo", "--exclude=node13"], "ReserveExtraRAM": 268435456, + "BatchSize": 10000, }`) func usage(fs *flag.FlagSet) { -- 2.30.2