-func (dispatcher *Dispatcher) handleUpdate(container Container) {
- if dispatcher.updateMine(container) {
- if container.State == Complete || container.State == Cancelled {
- log.Printf("Container %v now in state %v", container.UUID, container.State)
- dispatcher.notMine(container.UUID)
+// start creates a runTracker whose buffered updates channel already holds c,
+// then calls d.RunContainer in a new goroutine and returns the tracker.
+func (d *Dispatcher) start(c arvados.Container) *runTracker {
+ tracker := &runTracker{updates: make(chan arvados.Container, 1)}
+ tracker.updates <- c
+ go func() {
+ d.RunContainer(d, c, tracker.updates)
+ // RunContainer blocks for the lifetime of the container. When
+ // it returns, drop c.UUID's entry from d.trackers (under d.mtx).
+ d.mtx.Lock()
+ delete(d.trackers, c.UUID)
+ d.mtx.Unlock()
+ }()
+ return tracker
+}
+
+func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
+ params := arvadosclient.Dict{
+ "filters": filters,
+ "order": []string{"priority desc"}}
+
+ var list arvados.ContainerList
+ for offset, more := 0, true; more; offset += len(list.Items) {
+ params["offset"] = offset
+ err := d.Arv.List("containers", params, &list)
+ if err != nil {
+ log.Printf("Error getting list of containers: %q", err)
+ return false