Cancelled = arvados.ContainerStateCancelled
)
+// Dispatcher holds a dispatcher's shared state: the Arvados API
+// client and credentials it acts as, plus the set of containers it
+// is currently tracking.
type Dispatcher struct {
Arv *arvadosclient.ArvadosClient
auth arvados.APIClientAuthorization
mtx sync.Mutex
- running map[string]*runTracker
+ // trackers is the set of containers being monitored, keyed by
+ // container UUID. Guarded by mtx.
+ trackers map[string]*runTracker
throttle throttle
}
defer poll.Stop()
for {
+ tracked := d.trackedUUIDs()
d.checkForUpdates([][]interface{}{
- {"uuid", "in", d.runningUUIDs()}})
+ {"uuid", "in", tracked}})
d.checkForUpdates([][]interface{}{
{"locked_by_uuid", "=", d.auth.UUID},
- {"uuid", "not in", d.runningUUIDs()}})
+ {"uuid", "not in", tracked}})
d.checkForUpdates([][]interface{}{
{"state", "=", Queued},
{"priority", ">", "0"},
- {"uuid", "not in", d.runningUUIDs()}})
+ {"uuid", "not in", tracked}})
select {
case <-poll.C:
continue
}
}
-func (d *Dispatcher) runningUUIDs() []string {
+// trackedUUIDs returns the UUIDs of all containers currently being
+// tracked (the keys of d.trackers). Call with d.mtx unlocked; it
+// takes the lock itself.
+func (d *Dispatcher) trackedUUIDs() []string {
d.mtx.Lock()
defer d.mtx.Unlock()
- if len(d.running) == 0 {
- // API bug: ["uuid", "not in", []] does not match everything
- return []string{"X"}
+ if len(d.trackers) == 0 {
+ // API bug: ["uuid", "not in", []] does not work as
+ // expected, but this does:
+ return []string{"this-uuid-does-not-exist"}
}
- uuids := make([]string, 0, len(d.running))
- for x := range d.running {
+ uuids := make([]string, 0, len(d.trackers))
+ for x := range d.trackers {
uuids = append(uuids, x)
}
return uuids
d.RunContainer(d, c, tracker.updates)
d.mtx.Lock()
- delete(d.running, c.UUID)
+ delete(d.trackers, c.UUID)
d.mtx.Unlock()
}()
return tracker
log.Printf("Error getting list of containers: %q", err)
return
}
- more = list.ItemsAvailable > len(list.Items)
+ more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
d.checkListForUpdates(list.Items)
}
}
+// checkListForUpdates routes each container's latest state to its
+// existing tracker, and starts trackers for containers this
+// dispatcher should be handling but isn't yet.
func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
d.mtx.Lock()
defer d.mtx.Unlock()
- if d.running == nil {
- d.running = make(map[string]*runTracker)
+ if d.trackers == nil {
+ d.trackers = make(map[string]*runTracker)
}
for _, c := range containers {
- tracker, running := d.running[c.UUID]
+ tracker, alreadyTracking := d.trackers[c.UUID]
if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
+ // Another dispatcher owns the lock; leave it alone.
log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
- } else if running {
+ } else if alreadyTracking {
switch c.State {
case Queued:
tracker.close()
break
}
c.State = Locked
- d.running[c.UUID] = d.start(c)
+ d.trackers[c.UUID] = d.start(c)
case Locked, Running:
if !d.throttle.Check(c.UUID) {
break
}
- d.running[c.UUID] = d.start(c)
+ d.trackers[c.UUID] = d.start(c)
case Cancelled, Complete:
- tracker.close()
+ // no-op (we already stopped monitoring)
}
}
}
return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
}
+// TrackContainer ensures a tracker is running for the given UUID,
+// regardless of the current state of the container (except: if the
+// container is locked by a different dispatcher, a tracker will not
+// be started). If the container is not in Locked or Running state,
+// the new tracker will close down immediately.
+//
+// This allows the dispatcher to put its own RunContainer func into a
+// cleanup phase (for example, to kill local processes created by a
+// previous dispatch process that are still running even though the
+// container state is final) without the risk of having multiple
+// goroutines monitoring the same UUID.
+func (d *Dispatcher) TrackContainer(uuid string) error {
+ var cntr arvados.Container
+ err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
+ if err != nil {
+ return err
+ }
+ // Locked by a different dispatcher -- not ours to track.
+ if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
+ return nil
+ }
+
+ d.mtx.Lock()
+ defer d.mtx.Unlock()
+ // Reading a nil map is safe in Go, so this lookup works even
+ // before trackers is initialized below.
+ if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
+ return nil
+ }
+ if d.trackers == nil {
+ d.trackers = make(map[string]*runTracker)
+ }
+ d.trackers[uuid] = d.start(cntr)
+ switch cntr.State {
+ case Queued, Cancelled, Complete:
+ // Nothing to dispatch in these states: close the new
+ // tracker immediately (see func comment above).
+ d.trackers[uuid].close()
+ }
+ return nil
+}
+
type runTracker struct {
closing bool
updates chan arvados.Container