X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a2fe6f9367de3ee93064fbee3f2df78ce84aa318..123153139bbee3674c81325653d87fa19ffbe0e4:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index 5b19485c9e..261444a05f 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -21,6 +21,7 @@ const ( Cancelled = arvados.ContainerStateCancelled ) +// Dispatcher polls the API server for containers that are queued, locked by this dispatcher, or already tracked, and starts a runTracker (which invokes RunContainer) for each one it takes on. type Dispatcher struct { Arv *arvadosclient.ArvadosClient @@ -36,7 +37,7 @@ type Dispatcher struct { auth arvados.APIClientAuthorization mtx sync.Mutex - running map[string]*runTracker + trackers map[string]*runTracker throttle throttle } @@ -68,15 +69,16 @@ func (d *Dispatcher) Run(ctx context.Context) error { defer poll.Stop() for { + tracked := d.trackedUUIDs() d.checkForUpdates([][]interface{}{ - {"uuid", "in", d.runningUUIDs()}}) + {"uuid", "in", tracked}}) d.checkForUpdates([][]interface{}{ {"locked_by_uuid", "=", d.auth.UUID}, - {"uuid", "not in", d.runningUUIDs()}}) + {"uuid", "not in", tracked}}) d.checkForUpdates([][]interface{}{ {"state", "=", Queued}, {"priority", ">", "0"}, - {"uuid", "not in", d.runningUUIDs()}}) + {"uuid", "not in", tracked}}) select { case <-poll.C: continue @@ -86,15 +88,16 @@ func (d *Dispatcher) Run(ctx context.Context) error { } } -func (d *Dispatcher) runningUUIDs() []string { +func (d *Dispatcher) trackedUUIDs() []string { d.mtx.Lock() defer d.mtx.Unlock() - if len(d.running) == 0 { - // API bug: ["uuid", "not in", []] does not match everything - return []string{"X"} + if len(d.trackers) == 0 { + // API bug: ["uuid", "not in", []] does not work as + // expected, but this does: + return []string{"this-uuid-does-not-exist"} } - uuids := make([]string, 0, len(d.running)) - for x := range d.running { + uuids := make([]string, 0, len(d.trackers)) + for x := range d.trackers { uuids = append(uuids, x) } return uuids @@ -109,7 +112,7 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker { 
d.RunContainer(d, c, tracker.updates) d.mtx.Lock() - delete(d.running, c.UUID) + delete(d.trackers, c.UUID) d.mtx.Unlock() }() return tracker @@ -128,7 +131,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}) { log.Printf("Error getting list of containers: %q", err) return } - more = list.ItemsAvailable > len(list.Items) + more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset d.checkListForUpdates(list.Items) } } @@ -136,15 +139,15 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}) { func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) { d.mtx.Lock() defer d.mtx.Unlock() - if d.running == nil { - d.running = make(map[string]*runTracker) + if d.trackers == nil { + d.trackers = make(map[string]*runTracker) } for _, c := range containers { - tracker, running := d.running[c.UUID] + tracker, alreadyTracking := d.trackers[c.UUID] if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID { log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID) - } else if running { + } else if alreadyTracking { switch c.State { case Queued: tracker.close() @@ -165,14 +168,14 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) { break } c.State = Locked - d.running[c.UUID] = d.start(c) + d.trackers[c.UUID] = d.start(c) case Locked, Running: if !d.throttle.Check(c.UUID) { break } - d.running[c.UUID] = d.start(c) + d.trackers[c.UUID] = d.start(c) case Cancelled, Complete: - tracker.close() + // no-op (we already stopped monitoring) } } } @@ -200,6 +203,43 @@ func (d *Dispatcher) Unlock(uuid string) error { return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil) } +// TrackContainer ensures a tracker is running for the given UUID, +// regardless of the current state of the container (except: if the +// container is locked by a different dispatcher, a tracker will not +// be started). 
If the container is not in Locked or Running state, +// the new tracker will close down immediately. +// +// This allows the dispatcher to put its own RunContainer func into a +// cleanup phase (for example, to kill local processes created by a +// previous dispatch process that are still running even though the +// container state is final) without the risk of having multiple +// goroutines monitoring the same UUID. +func (d *Dispatcher) TrackContainer(uuid string) error { + var cntr arvados.Container + err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr) + if err != nil { + return err + } + if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID { + return nil + } + + d.mtx.Lock() + defer d.mtx.Unlock() + if _, alreadyTracking := d.trackers[uuid]; alreadyTracking { + return nil + } + if d.trackers == nil { + d.trackers = make(map[string]*runTracker) + } + d.trackers[uuid] = d.start(cntr) + switch cntr.State { + case Queued, Cancelled, Complete: + d.trackers[uuid].close() + } + return nil +} + type runTracker struct { closing bool updates chan arvados.Container