Cancelled = arvados.ContainerStateCancelled
)
+// Dispatcher polls the Arvados API for queued containers, containers
+// locked by this dispatcher's own API client authorization, and
+// containers it is already tracking, and keeps one runTracker per
+// container UUID for the containers it takes on.
type Dispatcher struct {
Arv *arvadosclient.ArvadosClient
auth arvados.APIClientAuthorization
mtx sync.Mutex
- running map[string]*runTracker
+ // trackers maps a container UUID to the runTracker monitoring that
+ // container. Read and written only while holding mtx.
+ trackers map[string]*runTracker
throttle throttle
}
defer poll.Stop()
for {
+ tracked := d.trackedUUIDs()
d.checkForUpdates([][]interface{}{
- {"uuid", "in", d.runningUUIDs()}})
+ {"uuid", "in", tracked}})
d.checkForUpdates([][]interface{}{
{"locked_by_uuid", "=", d.auth.UUID},
- {"uuid", "not in", d.runningUUIDs()}})
+ {"uuid", "not in", tracked}})
d.checkForUpdates([][]interface{}{
{"state", "=", Queued},
{"priority", ">", "0"},
- {"uuid", "not in", d.runningUUIDs()}})
+ {"uuid", "not in", tracked}})
select {
case <-poll.C:
continue
}
}
-func (d *Dispatcher) runningUUIDs() []string {
+func (d *Dispatcher) trackedUUIDs() []string {
d.mtx.Lock()
defer d.mtx.Unlock()
- if len(d.running) == 0 {
- // API bug: ["uuid", "not in", []] does not match everything
- return []string{"X"}
+ if len(d.trackers) == 0 {
+ // API bug: ["uuid", "not in", []] does not work as
+ // expected, but this does:
+ return []string{"this-uuid-does-not-exist"}
}
- uuids := make([]string, 0, len(d.running))
- for x := range d.running {
+ uuids := make([]string, 0, len(d.trackers))
+ for x := range d.trackers {
uuids = append(uuids, x)
}
return uuids
d.RunContainer(d, c, tracker.updates)
d.mtx.Lock()
- delete(d.running, c.UUID)
+ delete(d.trackers, c.UUID)
d.mtx.Unlock()
}()
return tracker
+// checkForUpdates fetches every container matching filters, one page
+// at a time in descending priority order, and passes each page to
+// checkListForUpdates. If an API request fails, the error is logged
+// and the remaining pages are not fetched.
func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
params := arvadosclient.Dict{
"filters": filters,
- "order": []string{"priority desc"},
- "limit": "1000"}
+ "order": []string{"priority desc"}}
var list arvados.ContainerList
- err := d.Arv.List("containers", params, &list)
- if err != nil {
- log.Printf("Error getting list of containers: %q", err)
- return
- }
-
- if list.ItemsAvailable > len(list.Items) {
- // TODO: support paging
- log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
- list.ItemsAvailable,
- len(list.Items))
+ for offset, more := 0, true; more; offset += len(list.Items) {
+ params["offset"] = offset
+ // Start every page from an empty list. Decoding into the
+ // previous page's value would reuse its Items backing array,
+ // and encoding/json (presumably what Arv.List uses) leaves
+ // fields that are null or absent in the response untouched,
+ // so stale values such as LockedByUUID could leak from one
+ // page's items into the next page's.
+ list = arvados.ContainerList{}
+ err := d.Arv.List("containers", params, &list)
+ if err != nil {
+ log.Printf("Error getting list of containers: %q", err)
+ return
+ }
+ // Stop on an empty page as well, so a shrinking result set
+ // cannot cause an endless loop.
+ more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
+ d.checkListForUpdates(list.Items)
}
+}
+func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
d.mtx.Lock()
defer d.mtx.Unlock()
- if d.running == nil {
- d.running = make(map[string]*runTracker)
+ if d.trackers == nil {
+ d.trackers = make(map[string]*runTracker)
}
- for _, c := range list.Items {
- tracker, running := d.running[c.UUID]
+ for _, c := range containers {
+ tracker, alreadyTracking := d.trackers[c.UUID]
if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
- } else if running {
+ } else if alreadyTracking {
switch c.State {
case Queued:
tracker.close()
break
}
c.State = Locked
- d.running[c.UUID] = d.start(c)
+ d.trackers[c.UUID] = d.start(c)
case Locked, Running:
if !d.throttle.Check(c.UUID) {
break
}
- d.running[c.UUID] = d.start(c)
+ d.trackers[c.UUID] = d.start(c)
case Cancelled, Complete:
- tracker.close()
+ // no-op (we already stopped monitoring)
}
}
}
return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
}
+// TrackContainer starts a tracker for the container with the given
+// uuid, regardless of the container's state, unless that container is
+// already being tracked. If the container cannot be fetched from the
+// API, the error is logged and no tracker is started.
+func (d *Dispatcher) TrackContainer(uuid string) {
+ // Cheap fast path: skip the API round trip if we already track it.
+ // (Reading a nil map is safe, so trackers need not exist yet.)
+ d.mtx.Lock()
+ _, alreadyTracking := d.trackers[uuid]
+ d.mtx.Unlock()
+ if alreadyTracking {
+ return
+ }
+
+ // Fetch without holding d.mtx: the poll loop's checkListForUpdates
+ // and each tracker goroutine's cleanup also need the lock, and must
+ // not wait on this network request.
+ var cntr arvados.Container
+ err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
+ if err != nil {
+ log.Printf("Error getting container %s: %s", uuid, err)
+ return
+ }
+
+ d.mtx.Lock()
+ defer d.mtx.Unlock()
+ if d.trackers == nil {
+ d.trackers = make(map[string]*runTracker)
+ }
+ // Re-check under the lock: a tracker may have been started while
+ // the GET was in flight, and we must never start a second one.
+ if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
+ return
+ }
+ // Start the container we just fetched (the previous code passed an
+ // undefined variable `c` here and did not compile).
+ d.trackers[uuid] = d.start(cntr)
+}
+
type runTracker struct {
closing bool
updates chan arvados.Container