X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c702045c2d8ab4ce332b279018ed3128a688be6c..0eb72b526bf8bbb011551ecf019f604e17a534f1:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index 49c756e892..356d087a46 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + // Package dispatch is a helper library for building Arvados container // dispatchers. package dispatch @@ -203,28 +207,41 @@ func (d *Dispatcher) Unlock(uuid string) error { return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil) } -// TrackContainer starts a tracker for given uuid if one is not already existing, despite its state. -func (d *Dispatcher) TrackContainer(uuid string) { - d.mtx.Lock() - defer d.mtx.Unlock() - - if d.trackers == nil { - d.trackers = make(map[string]*runTracker) - } - - _, alreadyTracking := d.trackers[uuid] - if alreadyTracking { - return - } - +// TrackContainer ensures a tracker is running for the given UUID, +// regardless of the current state of the container (except: if the +// container is locked by a different dispatcher, a tracker will not +// be started). If the container is not in Locked or Running state, +// the new tracker will close down immediately. +// +// This allows the dispatcher to put its own RunContainer func into a +// cleanup phase (for example, to kill local processes created by a +// previous dispatch process that are still running even though the +// container state is final) without the risk of having multiple +// goroutines monitoring the same UUID. 
+func (d *Dispatcher) TrackContainer(uuid string) error { var cntr arvados.Container err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr) if err != nil { - log.Printf("Error getting container %s: %s", uuid, err) - return + return err + } + if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID { + return nil } + d.mtx.Lock() + defer d.mtx.Unlock() + if _, alreadyTracking := d.trackers[uuid]; alreadyTracking { + return nil + } + if d.trackers == nil { + d.trackers = make(map[string]*runTracker) + } d.trackers[uuid] = d.start(cntr) + switch cntr.State { + case Queued, Cancelled, Complete: + d.trackers[uuid].close() + } + return nil } type runTracker struct {