X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e34a5060cfc1cc4821b431e8aa6778a31898e0eb..028e052db597498ee5c1412b606fa178c621b3ca:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index ce960c0772..261444a05f 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -1,14 +1,16 @@ -// Framework for monitoring the Arvados container Queue, Locks container -// records, and runs goroutine callbacks which implement execution and -// monitoring of the containers. +// Package dispatch is a helper library for building Arvados container +// dispatchers. package dispatch import ( - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "context" + "fmt" "log" "sync" "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" ) const ( @@ -19,231 +21,245 @@ const ( Cancelled = arvados.ContainerStateCancelled ) -// Dispatcher holds the state of the dispatcher +// Dispatcher struct type Dispatcher struct { - // The Arvados client Arv *arvadosclient.ArvadosClient - // When a new queued container appears and is either already owned by - // this dispatcher or is successfully locked, the dispatcher will call - // go RunContainer(). The RunContainer() goroutine gets a channel over - // which it will receive updates to the container state. The - // RunContainer() goroutine should only assume status updates come when - // the container record changes on the API server; if it needs to - // monitor the job submission to the underlying slurm/grid engine/etc - // queue it should spin up its own polling goroutines. When the - // channel is closed, that means the container is no longer being - // handled by this dispatcher and the goroutine should terminate. The - // goroutine is responsible for draining the 'status' channel, failure - // to do so may deadlock the dispatcher. 
- RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container) - - // Amount of time to wait between polling for updates. + // Queue polling frequency PollPeriod time.Duration - // Minimum time between two attempts to run the same container + // Time to wait between successive attempts to run the same container MinRetryPeriod time.Duration - mineMutex sync.Mutex - mineMap map[string]chan arvados.Container - Auth arvados.APIClientAuthorization + // Func that implements the container lifecycle. Must be set + // to a non-nil DispatchFunc before calling Run(). + RunContainer DispatchFunc + auth arvados.APIClientAuthorization + mtx sync.Mutex + trackers map[string]*runTracker throttle throttle - - stop chan struct{} } -// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones -// for which this process is actively starting/monitoring. Returns channel to -// be used to send container status updates. -func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container { - dispatcher.mineMutex.Lock() - defer dispatcher.mineMutex.Unlock() - if ch, ok := dispatcher.mineMap[uuid]; ok { - return ch +// A DispatchFunc executes a container (if the container record is +// Locked) or resumes monitoring an already-running container, and waits +// until that container exits. +// +// While the container runs, the DispatchFunc should listen for +// updated container records on the provided channel. When the channel +// closes, the DispatchFunc should stop the container if it's still +// running, and return. +// +// The DispatchFunc should not return until the container is finished. +type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) + +// Run watches the API server's queue for containers that are either +// ready to run and available to lock, or are already locked by this +// dispatcher's token. When a new one appears, Run calls RunContainer +// in a new goroutine. 
+func (d *Dispatcher) Run(ctx context.Context) error { + err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth) + if err != nil { + return fmt.Errorf("error getting my token UUID: %v", err) } - ch := make(chan arvados.Container) - dispatcher.mineMap[uuid] = ch - return ch -} + d.throttle.hold = d.MinRetryPeriod -// Release a container which is no longer being monitored. -func (dispatcher *Dispatcher) notMine(uuid string) { - dispatcher.mineMutex.Lock() - defer dispatcher.mineMutex.Unlock() - if ch, ok := dispatcher.mineMap[uuid]; ok { - close(ch) - delete(dispatcher.mineMap, uuid) - } -} + poll := time.NewTicker(d.PollPeriod) + defer poll.Stop() -// checkMine returns true if there is a channel for updates associated -// with container c. If update is true, also send the container record on -// the channel. -func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool { - dispatcher.mineMutex.Lock() - defer dispatcher.mineMutex.Unlock() - ch, ok := dispatcher.mineMap[c.UUID] - if ok { - if update { - ch <- c + for { + tracked := d.trackedUUIDs() + d.checkForUpdates([][]interface{}{ + {"uuid", "in", tracked}}) + d.checkForUpdates([][]interface{}{ + {"locked_by_uuid", "=", d.auth.UUID}, + {"uuid", "not in", tracked}}) + d.checkForUpdates([][]interface{}{ + {"state", "=", Queued}, + {"priority", ">", "0"}, + {"uuid", "not in", tracked}}) + select { + case <-poll.C: + continue + case <-ctx.Done(): + return ctx.Err() } - return true } - return false } -func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) { - var containers arvados.ContainerList - err := dispatcher.Arv.List("containers", params, &containers) - if err != nil { - log.Printf("Error getting list of containers: %q", err) - return - } - - if containers.ItemsAvailable > len(containers.Items) { - // TODO: support paging - log.Printf("Warning! 
%d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.", - containers.ItemsAvailable, - len(containers.Items)) +func (d *Dispatcher) trackedUUIDs() []string { + d.mtx.Lock() + defer d.mtx.Unlock() + if len(d.trackers) == 0 { + // API bug: ["uuid", "not in", []] does not work as + // expected, but this does: + return []string{"this-uuid-does-not-exist"} } - for _, container := range containers.Items { - touched[container.UUID] = true - dispatcher.handleUpdate(container) + uuids := make([]string, 0, len(d.trackers)) + for x := range d.trackers { + uuids = append(uuids, x) } + return uuids } -func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) { - ticker := time.NewTicker(dispatcher.PollPeriod) - defer ticker.Stop() +// Start a runner in a new goroutine, and send the initial container +// record to its updates channel. +func (d *Dispatcher) start(c arvados.Container) *runTracker { + tracker := &runTracker{updates: make(chan arvados.Container, 1)} + tracker.updates <- c + go func() { + d.RunContainer(d, c, tracker.updates) - paramsQ := arvadosclient.Dict{ - "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}}, - "order": []string{"priority desc"}, - "limit": "1000"} - paramsP := arvadosclient.Dict{ - "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}}, - "limit": "1000"} + d.mtx.Lock() + delete(d.trackers, c.UUID) + d.mtx.Unlock() + }() + return tracker +} - for { - touched := make(map[string]bool) - dispatcher.getContainers(paramsQ, touched) - dispatcher.getContainers(paramsP, touched) - dispatcher.mineMutex.Lock() - var monitored []string - for k := range dispatcher.mineMap { - if _, ok := touched[k]; !ok { - monitored = append(monitored, k) - } - } - dispatcher.mineMutex.Unlock() - if monitored != nil { - dispatcher.getContainers(arvadosclient.Dict{ - "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched) - } - select { - case 
<-ticker.C: - case <-stop: +func (d *Dispatcher) checkForUpdates(filters [][]interface{}) { + params := arvadosclient.Dict{ + "filters": filters, + "order": []string{"priority desc"}} + + var list arvados.ContainerList + for offset, more := 0, true; more; offset += len(list.Items) { + params["offset"] = offset + err := d.Arv.List("containers", params, &list) + if err != nil { + log.Printf("Error getting list of containers: %q", err) return } + more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset + d.checkListForUpdates(list.Items) } } -func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) { - if container.State == Queued && dispatcher.checkMine(container, false) { - // If we previously started the job, something failed, and it - // was re-queued, this dispatcher might still be monitoring it. - // Stop the existing monitor, then try to lock and run it - // again. - dispatcher.notMine(container.UUID) - } - - if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued { - // If container is Complete, Cancelled, or Queued, LockedByUUID - // will be nil. If the container was formerly Locked, moved - // back to Queued and then locked by another dispatcher, - // LockedByUUID will be different. In either case, we want - // to stop monitoring it. 
- log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID) - dispatcher.notMine(container.UUID) - return - } - - if dispatcher.checkMine(container, true) { - // Already monitored, sent status update - return +func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) { + d.mtx.Lock() + defer d.mtx.Unlock() + if d.trackers == nil { + d.trackers = make(map[string]*runTracker) } - if container.State == Queued && container.Priority > 0 { - if !dispatcher.throttle.Check(container.UUID) { - return - } - // Try to take the lock - if err := dispatcher.Lock(container.UUID); err != nil { - return + for _, c := range containers { + tracker, alreadyTracking := d.trackers[c.UUID] + if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID { + log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID) + } else if alreadyTracking { + switch c.State { + case Queued: + tracker.close() + case Locked, Running: + tracker.update(c) + case Cancelled, Complete: + tracker.close() + } + } else { + switch c.State { + case Queued: + if !d.throttle.Check(c.UUID) { + break + } + err := d.lock(c.UUID) + if err != nil { + log.Printf("debug: error locking container %s: %s", c.UUID, err) + break + } + c.State = Locked + d.trackers[c.UUID] = d.start(c) + case Locked, Running: + if !d.throttle.Check(c.UUID) { + break + } + d.trackers[c.UUID] = d.start(c) + case Cancelled, Complete: + // no-op (we already stopped monitoring) + } } - container.State = Locked - } - - if container.State == Locked || container.State == Running { - // Not currently monitored but in Locked or Running state and - // owned by this dispatcher, so start monitoring. - go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID)) } } // UpdateState makes an API call to change the state of a container. 
-func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error { - err := dispatcher.Arv.Update("containers", uuid, +func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error { + err := d.Arv.Update("containers", uuid, arvadosclient.Dict{ - "container": arvadosclient.Dict{"state": newState}}, - nil) + "container": arvadosclient.Dict{"state": state}, + }, nil) if err != nil { - log.Printf("Error updating container %s to state %q: %q", uuid, newState, err) + log.Printf("Error updating container %s to state %q: %s", uuid, state, err) } return err } // Lock makes the lock API call which updates the state of a container to Locked. -func (dispatcher *Dispatcher) Lock(uuid string) error { - err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil) - if err != nil { - log.Printf("Error locking container %s: %q", uuid, err) - } - return err +func (d *Dispatcher) lock(uuid string) error { + return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil) } // Unlock makes the unlock API call which updates the state of a container to Queued. -func (dispatcher *Dispatcher) Unlock(uuid string) error { - err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil) +func (d *Dispatcher) Unlock(uuid string) error { + return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil) +} + +// TrackContainer ensures a tracker is running for the given UUID, +// regardless of the current state of the container (except: if the +// container is locked by a different dispatcher, a tracker will not +// be started). If the container is not in Locked or Running state, +// the new tracker will close down immediately. 
+// +// This allows the dispatcher to put its own RunContainer func into a +// cleanup phase (for example, to kill local processes created by a +// previous dispatch process that are still running even though the +// container state is final) without the risk of having multiple +// goroutines monitoring the same UUID. +func (d *Dispatcher) TrackContainer(uuid string) error { + var cntr arvados.Container + err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr) if err != nil { - log.Printf("Error unlocking container %s: %q", uuid, err) + return err } - return err + if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID { + return nil + } + + d.mtx.Lock() + defer d.mtx.Unlock() + if _, alreadyTracking := d.trackers[uuid]; alreadyTracking { + return nil + } + if d.trackers == nil { + d.trackers = make(map[string]*runTracker) + } + d.trackers[uuid] = d.start(cntr) + switch cntr.State { + case Queued, Cancelled, Complete: + d.trackers[uuid].close() + } + return nil } -// Stop causes Run to return after the current polling cycle. -func (dispatcher *Dispatcher) Stop() { - if dispatcher.stop == nil { - // already stopped - return +type runTracker struct { + closing bool + updates chan arvados.Container +} + +func (tracker *runTracker) close() { + if !tracker.closing { + close(tracker.updates) } - close(dispatcher.stop) - dispatcher.stop = nil + tracker.closing = true } -// Run runs the main loop of the dispatcher. 
-func (dispatcher *Dispatcher) Run() (err error) { - err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth) - if err != nil { - log.Printf("Error getting my token UUID: %v", err) +func (tracker *runTracker) update(c arvados.Container) { + if tracker.closing { return } - - dispatcher.mineMap = make(map[string]chan arvados.Container) - dispatcher.stop = make(chan struct{}) - dispatcher.throttle.hold = dispatcher.MinRetryPeriod - dispatcher.pollContainers(dispatcher.stop) - return nil + select { + case <-tracker.updates: + log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID) + default: + } + tracker.updates <- c }