X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f6071ef7bc7f6b7308c202e330cabd4ca111aadd..028e052db597498ee5c1412b606fa178c621b3ca:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index e489ac79f3..261444a05f 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -21,6 +21,7 @@ const ( Cancelled = arvados.ContainerStateCancelled ) +// Dispatcher struct type Dispatcher struct { Arv *arvadosclient.ArvadosClient @@ -130,7 +131,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}) { log.Printf("Error getting list of containers: %q", err) return } - more = list.ItemsAvailable > len(list.Items) + more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset d.checkListForUpdates(list.Items) } } @@ -174,7 +175,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) { } d.trackers[c.UUID] = d.start(c) case Cancelled, Complete: - tracker.close() + // no-op (we already stopped monitoring) } } } @@ -202,6 +203,43 @@ func (d *Dispatcher) Unlock(uuid string) error { return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil) } +// TrackContainer ensures a tracker is running for the given UUID, +// regardless of the current state of the container (except: if the +// container is locked by a different dispatcher, a tracker will not +// be started). If the container is not in Locked or Running state, +// the new tracker will close down immediately. +// +// This allows the dispatcher to put its own RunContainer func into a +// cleanup phase (for example, to kill local processes created by a +// previous dispatch process that are still running even though the +// container state is final) without the risk of having multiple +// goroutines monitoring the same UUID. 
+func (d *Dispatcher) TrackContainer(uuid string) error { + var cntr arvados.Container + err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr) + if err != nil { + return err + } + if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID { + return nil + } + + d.mtx.Lock() + defer d.mtx.Unlock() + if _, alreadyTracking := d.trackers[uuid]; alreadyTracking { + return nil + } + if d.trackers == nil { + d.trackers = make(map[string]*runTracker) + } + d.trackers[uuid] = d.start(cntr) + switch cntr.State { + case Queued, Cancelled, Complete: + d.trackers[uuid].close() + } + return nil +} + type runTracker struct { closing bool updates chan arvados.Container @@ -225,8 +263,3 @@ func (tracker *runTracker) update(c arvados.Container) { } tracker.updates <- c } - -// Start a tracker for the given uuid if one is not already existing, despite its state. -// its vs. it's -- episode 5 from Series 1 of Netflix' "A Series of Unfortunate Events" -func (dispatcher *Dispatcher) TrackContainer(uuid string) { -}