type Dispatcher struct {
Arv *arvadosclient.ArvadosClient
+ // Batch size for container queries
+ BatchSize int64
+
// Queue polling frequency
PollPeriod time.Duration
poll := time.NewTicker(d.PollPeriod)
defer poll.Stop()
+ if d.BatchSize == 0 {
+ d.BatchSize = 100
+ }
+
for {
select {
case <-poll.C:
}
// checkForUpdates lists the containers matching filters and hands every
// page of results to d.checkListForUpdates together with todo.
//
// NOTE(review): this span appears to be a unified-diff excerpt; lines
// starting with "+" are additions and lines starting with "-" are the
// implementation being removed. The description below covers the "+"
// (new) version.
//
// It first issues one List call with "count": "exact" and "limit": 0 and
// remembers the returned ItemsAvailable. It then requests pages of up to
// d.BatchSize items with "count": "none", advancing offset by the number
// of items received each time.
//
// It returns false as soon as any List call fails (the error is logged),
// and true once a page is empty or offset has reached the remembered
// ItemsAvailable value.
func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
+ var countList arvados.ContainerList
params := arvadosclient.Dict{
"filters": filters,
+ "count": "exact",
+ "limit": 0,
"order": []string{"priority desc"}}
-
- var list arvados.ContainerList
- for offset, more := 0, true; more; offset += len(list.Items) {
// First request: only countList.ItemsAvailable is read from the response.
// Presumably "limit": 0 makes the API return no items -- confirm against
// the API documentation.
+ err := d.Arv.List("containers", params, &countList)
+ if err != nil {
+ log.Printf("error getting count of containers: %q", err)
+ return false
+ }
+ itemsAvailable := countList.ItemsAvailable
// Rebuild params for the paged requests: same filters and order, but
// "count" is "none" and "limit" is d.BatchSize.
+ params = arvadosclient.Dict{
+ "filters": filters,
+ "count": "none",
+ "limit": d.BatchSize,
+ "order": []string{"priority desc"}}
+ offset := 0
+ for {
params["offset"] = offset
+
+ // This list variable must be a new one declared
+ // inside the loop: otherwise, items in the API
+ // response would get deep-merged into the items
+ // loaded in previous iterations.
+ var list arvados.ContainerList
+
err := d.Arv.List("containers", params, &list)
if err != nil {
log.Printf("Error getting list of containers: %q", err)
return false
}
- more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
d.checkListForUpdates(list.Items, todo)
// Advance past the items just processed; stop when the page was empty
// or offset has reached the total captured before paging began.
+ offset += len(list.Items)
+ if len(list.Items) == 0 || itemsAvailable <= offset {
+ return true
+ }
}
- return true
}
func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
case Queued:
tracker.close()
case Locked, Running:
- if c.SchedulingParameters.MaxRunTime > 0 {
- maxRunTime := time.Duration(c.SchedulingParameters.MaxRunTime) * time.Second
- if time.Since(c.StartedAt) >= maxRunTime {
- // Time's up, schedule container for cancellation
- c.Priority = 0
- }
- }
tracker.update(c)
case Cancelled, Complete:
tracker.close()