X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/25af6c40181b95f13ec3e9e366c53cb50868d065..123153139bbee3674c81325653d87fa19ffbe0e4:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index 5c3611ce3d..261444a05f 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -131,7 +131,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}) { log.Printf("Error getting list of containers: %q", err) return } - more = list.ItemsAvailable > len(list.Items) + more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset d.checkListForUpdates(list.Items) } } @@ -203,28 +203,41 @@ func (d *Dispatcher) Unlock(uuid string) error { return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil) } -// TrackContainer starts a tracker for given uuid if one is not already existing, despite its state. -func (d *Dispatcher) TrackContainer(uuid string) { +// TrackContainer ensures a tracker is running for the given UUID, +// regardless of the current state of the container (except: if the +// container is locked by a different dispatcher, a tracker will not +// be started). If the container is not in Locked or Running state, +// the new tracker will close down immediately. +// +// This allows the dispatcher to put its own RunContainer func into a +// cleanup phase (for example, to kill local processes created by a +// previous dispatch process that are still running even though the +// container state is final) without the risk of having multiple +// goroutines monitoring the same UUID. 
+func (d *Dispatcher) TrackContainer(uuid string) error { + var cntr arvados.Container + err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr) + if err != nil { + return err + } + if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID { + return nil + } + d.mtx.Lock() defer d.mtx.Unlock() - + if _, alreadyTracking := d.trackers[uuid]; alreadyTracking { + return nil + } if d.trackers == nil { d.trackers = make(map[string]*runTracker) } - - _, alreadyTracking := d.trackers[uuid] - if alreadyTracking { - return + d.trackers[uuid] = d.start(cntr) + switch cntr.State { + case Queued, Cancelled, Complete: + d.trackers[uuid].close() } - - var cntr arvados.Container - err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr) - if err != nil { - log.Printf("Error getting container %s: %s", uuid, err) - return - } - - d.trackers[uuid] = d.start(c) + return nil } type runTracker struct {