Make dispatcher Run loop use count=none and BatchSize limit
authorJoshua C. Randall <jcrandall@alum.mit.edu>
Mon, 13 Aug 2018 12:01:10 +0000 (12:01 +0000)
committerJoshua C. Randall <jcrandall@alum.mit.edu>
Tue, 14 Aug 2018 00:47:06 +0000 (00:47 +0000)
Arvados-DCO-1.1-Signed-off-by: Joshua C. Randall <jcrandall@alum.mit.edu>

sdk/go/dispatch/dispatch.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/usage.go

index c3d60309992c9368ba7a2d75586db58829c8f2c0..e9fb6b6d93086f06e47d654e5e498bbc75190f9e 100644 (file)
@@ -29,6 +29,9 @@ const (
 type Dispatcher struct {
        Arv *arvadosclient.ArvadosClient
 
+       // Batch size for container queries
+       BatchSize int64
+
        // Queue polling frequency
        PollPeriod time.Duration
 
@@ -72,6 +75,10 @@ func (d *Dispatcher) Run(ctx context.Context) error {
        poll := time.NewTicker(d.PollPeriod)
        defer poll.Stop()
 
+       if d.BatchSize == 0 {
+               d.BatchSize = 100
+       }
+
        for {
                select {
                case <-poll.C:
@@ -159,8 +166,22 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
 }
 
 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
+       var countList arvados.ContainerList
        params := arvadosclient.Dict{
                "filters": filters,
+               "count": "exact",
+               "limit": 0,
+               "order":   []string{"priority desc"}}
+       err := d.Arv.List("containers", params, &countList)
+       if err != nil {
+               log.Printf("Error getting count of containers: %q", err)
+               return false
+       }
+       itemsAvailable := countList.ItemsAvailable      
+       params = arvadosclient.Dict{
+               "filters": filters,
+               "count": "none",
+               "limit": d.BatchSize,
                "order":   []string{"priority desc"}}
        offset := 0
        for {
@@ -173,7 +194,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r
                }
                d.checkListForUpdates(list.Items, todo)
                offset += len(list.Items)
-               if len(list.Items) == 0 || list.ItemsAvailable <= offset {
+               if len(list.Items) == 0 || itemsAvailable <= offset {
                        return true
                }
        }
index d1f19dd7b5702e2431471bc7ce2164f553a8cb11..c1009a5d8ed007810a517842d79067811abf03cf 100644 (file)
@@ -57,6 +57,9 @@ type Dispatcher struct {
 
        // Minimum time between two attempts to run the same container
        MinRetryPeriod arvados.Duration
+
+       // Batch size for container queries
+       BatchSize int64
 }
 
 func main() {
@@ -164,6 +167,7 @@ func (disp *Dispatcher) setup() {
        }
        disp.Dispatcher = &dispatch.Dispatcher{
                Arv:            arv,
+               BatchSize:      disp.BatchSize,
                RunContainer:   disp.runContainer,
                PollPeriod:     time.Duration(disp.PollPeriod),
                MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
index 032d86284d5e0a9fc8a3d712a0283597ec29d765..db2395cdf546a570d8fd17781bb15bfc06297183 100644 (file)
@@ -22,6 +22,7 @@ var exampleConfigFile = []byte(`
        "PollPeriod": "10s",
        "SbatchArguments": ["--partition=foo", "--exclude=node13"],
        "ReserveExtraRAM": 268435456,
+       "BatchSize": 10000,
     }`)
 
 func usage(fs *flag.FlagSet) {