type Dispatcher struct {
Arv *arvadosclient.ArvadosClient
+ // Maximum number of containers requested per list query (a zero value is replaced with 100)
+ BatchSize int64
+
// Queue polling frequency
PollPeriod time.Duration
poll := time.NewTicker(d.PollPeriod)
defer poll.Stop()
+ if d.BatchSize == 0 {
+ d.BatchSize = 100
+ }
+
for {
select {
case <-poll.C:
}
func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
+ var countList arvados.ContainerList
params := arvadosclient.Dict{
"filters": filters,
+ "count": "exact",
+ "limit": 0,
+ "order": []string{"priority desc"}}
+ err := d.Arv.List("containers", params, &countList)
+ if err != nil {
+ log.Printf("Error getting count of containers: %q", err)
+ return false
+ }
+ itemsAvailable := countList.ItemsAvailable
+ params = arvadosclient.Dict{
+ "filters": filters,
+ "count": "none",
+ "limit": d.BatchSize,
"order": []string{"priority desc"}}
offset := 0
for {
}
d.checkListForUpdates(list.Items, todo)
offset += len(list.Items)
- if len(list.Items) == 0 || list.ItemsAvailable <= offset {
+ if len(list.Items) == 0 || itemsAvailable <= offset {
return true
}
}
// Minimum time between two attempts to run the same container
MinRetryPeriod arvados.Duration
+
+ // Batch size for container queries; passed to the dispatcher as its BatchSize
+ BatchSize int64
}
func main() {
}
disp.Dispatcher = &dispatch.Dispatcher{
Arv: arv,
+ BatchSize: disp.BatchSize,
RunContainer: disp.runContainer,
PollPeriod: time.Duration(disp.PollPeriod),
MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
"PollPeriod": "10s",
"SbatchArguments": ["--partition=foo", "--exclude=node13"],
"ReserveExtraRAM": 268435456,
+ "BatchSize": 10000,
}`)
func usage(fs *flag.FlagSet) {