X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ddee3839f8a82b889f84171e2354108cb20f93e0..67a9e63d83a429eadddfa1424a37e010f7c2c365:/sdk/go/dispatch/dispatch.go

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 7342c3b8cb..3289c67b01 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -1,6 +1,9 @@
-// Framework for monitoring the Arvados container Queue, Locks container
-// records, and runs goroutine callbacks which implement execution and
-// monitoring of the containers.
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+// Package dispatch is a helper library for building Arvados container
+// dispatchers.
 package dispatch
 
 import (
@@ -22,150 +25,200 @@ const (
 	Cancelled = arvados.ContainerStateCancelled
 )
 
-type runner struct {
-	closing bool
-	updates chan arvados.Container
-}
-
-func (ex *runner) close() {
-	if !ex.closing {
-		close(ex.updates)
-	}
-	ex.closing = true
-}
+// A Dispatcher polls the API server for containers that it should
+// handle, and invokes a DispatchFunc for each one.
+type Dispatcher struct {
+	Arv *arvadosclient.ArvadosClient
 
-func (ex *runner) update(c arvados.Container) {
-	if ex.closing {
-		return
-	}
-	select {
-	case <-ex.updates:
-		log.Print("debug: executor is handling updates slowly, discarded previous update for %s", c.UUID)
-	default:
-	}
-	ex.updates <- c
-}
+	// Queue polling frequency
+	PollPeriod time.Duration
 
-type Dispatcher struct {
-	Arv            *arvadosclient.ArvadosClient
-	PollPeriod     time.Duration
+	// Time to wait between successive attempts to run the same container
 	MinRetryPeriod time.Duration
-	RunContainer   Runner
+
+	// Func that implements the container lifecycle. Must be set
+	// to a non-nil DispatchFunc before calling Run().
+	RunContainer DispatchFunc
 
 	auth     arvados.APIClientAuthorization
 	mtx      sync.Mutex
-	running  map[string]*runner
+	trackers map[string]*runTracker
 	throttle throttle
 }
 
-// A Runner executes a container. If it starts any goroutines, it must
-// not return until it can guarantee that none of those goroutines
-// will do anything with this container.
-type Runner func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+// A DispatchFunc executes a container (if the container record is
+// Locked) or resumes monitoring an already-running container, and
+// waits until that container exits.
+//
+// While the container runs, the DispatchFunc should listen for
+// updated container records on the provided channel. When the channel
+// closes, the DispatchFunc should stop the container if it's still
+// running, and return.
+//
+// The DispatchFunc should not return until the container is finished.
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
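+
+// A minimal DispatchFunc might look like the following sketch
+// (illustrative only; runLocal, startProcess, and stopProcess are
+// hypothetical, not part of this package):
+//
+//	func runLocal(d *dispatch.Dispatcher, c arvados.Container, updates <-chan arvados.Container) {
+//		proc := startProcess(c) // hypothetical: start the container locally
+//		for latest := range updates {
+//			if latest.Priority == 0 {
+//				stopProcess(proc) // cancellation requested
+//			}
+//		}
+//		// Channel closed: the dispatcher has stopped tracking this
+//		// container, so stop it if it is still running.
+//		stopProcess(proc)
+//	}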
 
+// Run watches the API server's queue for containers that are either
+// ready to run and available to lock, or are already locked by this
+// dispatcher's token. When a new one appears, Run calls RunContainer
+// in a new goroutine.
 func (d *Dispatcher) Run(ctx context.Context) error {
 	err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
 	if err != nil {
 		return fmt.Errorf("error getting my token UUID: %v", err)
 	}
 
+	d.throttle.hold = d.MinRetryPeriod
+
 	poll := time.NewTicker(d.PollPeriod)
 	defer poll.Stop()
 	for {
-		running := make([]string, 0, len(d.running))
+		select {
+		case <-poll.C:
+			break
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+
+		todo := make(map[string]*runTracker)
 		d.mtx.Lock()
-		for uuid := range d.running {
-			running = append(running, uuid)
+		// Make a copy of trackers
+		for uuid, tracker := range d.trackers {
+			todo[uuid] = tracker
 		}
 		d.mtx.Unlock()
-		if len(running) == 0 {
-			// API bug: ["uuid", "not in", []] does not match everything
-			running = []string{"X"}
-		}
-		d.checkForUpdates([][]interface{}{
-			{"uuid", "in", running}})
-		d.checkForUpdates([][]interface{}{
+
+		// Containers I currently own (Locked/Running)
+		querySuccess := d.checkForUpdates([][]interface{}{
+			{"locked_by_uuid", "=", d.auth.UUID}}, todo)
+
+		// Containers I should try to dispatch
+		querySuccess = d.checkForUpdates([][]interface{}{
 			{"state", "=", Queued},
-			{"priority", ">", "0"},
-			{"uuid", "not in", running}})
-		d.checkForUpdates([][]interface{}{
-			{"locked_by_uuid", "=", d.auth.UUID},
-			{"uuid", "not in", running}})
-		select {
-		case <-poll.C:
+			{"priority", ">", "0"}}, todo) && querySuccess
+
+		if !querySuccess {
+			// There was an error in one of the previous
+			// queries, so we probably didn't get updates for
+			// all the containers we should have. Don't check
+			// them individually because it may be expensive.
 			continue
-		case <-ctx.Done():
-			return ctx.Err()
 		}
+
+		// Containers I know about that didn't fall into the
+		// above two categories (probably Complete/Cancelled)
+		var missed []string
+		for uuid := range todo {
+			missed = append(missed, uuid)
+		}
+
+		for len(missed) > 0 {
+			var batch []string
+			if len(missed) > 20 {
+				batch = missed[0:20]
+				missed = missed[20:]
+			} else {
+				batch = missed
+				missed = missed[0:0]
+			}
+			querySuccess = d.checkForUpdates([][]interface{}{
+				{"uuid", "in", batch}}, todo) && querySuccess
+		}
+
+		if !querySuccess {
+			// There was an error in one of the previous queries,
+			// so we probably didn't see all the containers we
+			// should have; don't shut down the missed containers.
+			continue
+		}
+
+		// Containers that I know about but that didn't show up
+		// in any query should be let go.
+		for uuid, tracker := range todo {
+			log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
+			tracker.close()
+		}
+	}
 }
 
-func (d *Dispatcher) start(c arvados.Container) *runner {
-	ex := &runner{
-		updates: make(chan arvados.Container, 1),
-	}
-	if d.running == nil {
-		d.running = make(map[string]*runner)
-	}
-	d.running[c.UUID] = ex
+// Start a runner in a new goroutine, and send the initial container
+// record to its updates channel.
+func (d *Dispatcher) start(c arvados.Container) *runTracker {
+	tracker := &runTracker{updates: make(chan arvados.Container, 1)}
+	tracker.updates <- c
 	go func() {
-		d.RunContainer(d, c, ex.updates)
+		d.RunContainer(d, c, tracker.updates)
+		// RunContainer blocks for the lifetime of the container.
+		// When it returns, the tracker should delete itself.
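+		// (Deleting the tracker here means that if a later queue
+		// poll still returns this container in Locked or Running
+		// state, checkListForUpdates will start a fresh tracker
+		// for it rather than updating a stale one.)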
 		d.mtx.Lock()
-		delete(d.running, c.UUID)
+		delete(d.trackers, c.UUID)
 		d.mtx.Unlock()
 	}()
-	return ex
+	return tracker
 }
 
-func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
+func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
 	params := arvadosclient.Dict{
 		"filters": filters,
-		"order":   []string{"priority desc"},
-		"limit":   "1000"}
+		"order":   []string{"priority desc"}}
 
 	var list arvados.ContainerList
-	err := d.Arv.List("containers", params, &list)
-	if err != nil {
-		log.Printf("Error getting list of containers: %q", err)
-		return
-	}
-
-	if list.ItemsAvailable > len(list.Items) {
-		// TODO: support paging
-		log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
-			list.ItemsAvailable,
-			len(list.Items))
+	for offset, more := 0, true; more; offset += len(list.Items) {
+		params["offset"] = offset
+		err := d.Arv.List("containers", params, &list)
+		if err != nil {
+			log.Printf("Error getting list of containers: %q", err)
+			return false
+		}
+		more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
+		d.checkListForUpdates(list.Items, todo)
 	}
+	return true
+}
 
+func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
 	d.mtx.Lock()
 	defer d.mtx.Unlock()
-	for _, c := range list.Items {
-		ex, running := d.running[c.UUID]
+	if d.trackers == nil {
+		d.trackers = make(map[string]*runTracker)
+	}
+
+	for _, c := range containers {
+		tracker, alreadyTracking := d.trackers[c.UUID]
+		delete(todo, c.UUID)
+
 		if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
 			log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
-		} else if running {
+		} else if alreadyTracking {
 			switch c.State {
 			case Queued:
-				ex.close()
+				tracker.close()
 			case Locked, Running:
-				ex.update(c)
+				tracker.update(c)
 			case Cancelled, Complete:
-				ex.close()
+				tracker.close()
 			}
 		} else {
 			switch c.State {
 			case Queued:
-				if err := d.lock(c.UUID); err != nil {
-					log.Printf("Error locking container %s: %s", c.UUID, err)
-				} else {
-					c.State = Locked
-					d.start(c).update(c)
+				if !d.throttle.Check(c.UUID) {
+					break
 				}
+				err := d.lock(c.UUID)
+				if err != nil {
+					log.Printf("debug: error locking container %s: %s", c.UUID, err)
+					break
+				}
+				c.State = Locked
+				d.trackers[c.UUID] = d.start(c)
 			case Locked, Running:
-				d.start(c).update(c)
+				if !d.throttle.Check(c.UUID) {
+					break
+				}
+				d.trackers[c.UUID] = d.start(c)
 			case Cancelled, Complete:
-				ex.close()
+				// no-op (we already stopped monitoring)
 			}
 		}
 	}
@@ -192,3 +245,64 @@ func (d *Dispatcher) lock(uuid string) error {
 func (d *Dispatcher) Unlock(uuid string) error {
 	return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
 }
+
+// TrackContainer ensures a tracker is running for the given UUID,
+// regardless of the current state of the container (except: if the
+// container is locked by a different dispatcher, a tracker will not
+// be started). If the container is not in Locked or Running state,
+// the new tracker will close down immediately.
+//
+// This allows the dispatcher to put its own RunContainer func into a
+// cleanup phase (for example, to kill local processes created by a
+// previous dispatch process that are still running even though the
+// container state is final) without the risk of having multiple
+// goroutines monitoring the same UUID.
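+//
+// For example, a dispatcher that finds leftover local processes at
+// startup might do the following (an illustrative sketch;
+// leftoverUUIDs is hypothetical, not part of this package):
+//
+//	for _, uuid := range leftoverUUIDs() {
+//		if err := d.TrackContainer(uuid); err != nil {
+//			log.Printf("error tracking container %s: %s", uuid, err)
+//		}
+//	}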
+func (d *Dispatcher) TrackContainer(uuid string) error {
+	var cntr arvados.Container
+	err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
+	if err != nil {
+		return err
+	}
+	if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
+		return nil
+	}
+
+	d.mtx.Lock()
+	defer d.mtx.Unlock()
+	if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
+		return nil
+	}
+	if d.trackers == nil {
+		d.trackers = make(map[string]*runTracker)
+	}
+	d.trackers[uuid] = d.start(cntr)
+	switch cntr.State {
+	case Queued, Cancelled, Complete:
+		d.trackers[uuid].close()
+	}
+	return nil
+}
+
+type runTracker struct {
+	closing bool
+	updates chan arvados.Container
+}
+
+func (tracker *runTracker) close() {
+	if !tracker.closing {
+		close(tracker.updates)
+	}
+	tracker.closing = true
+}
+
+func (tracker *runTracker) update(c arvados.Container) {
+	if tracker.closing {
+		return
+	}
+	select {
+	case <-tracker.updates:
+		log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
+	default:
+	}
+	tracker.updates <- c
+}
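
Putting the pieces together, a minimal dispatcher built on this package might be wired up as follows. This is an illustrative sketch, not part of the commit: the import paths assume the current git.arvados.org module layout, MakeArvadosClient is assumed to read the standard ARVADOS_API_HOST/ARVADOS_API_TOKEN environment variables, and runStub is a placeholder where a real DispatchFunc (like the runLocal sketch above) would go.

	package main

	import (
		"context"
		"log"
		"time"

		"git.arvados.org/arvados.git/sdk/go/arvados"
		"git.arvados.org/arvados.git/sdk/go/arvadosclient"
		"git.arvados.org/arvados.git/sdk/go/dispatch"
	)

	// runStub is a placeholder DispatchFunc: it only watches the
	// container record until the dispatcher stops tracking it. A real
	// dispatcher would start the container here and wait for it to
	// finish before returning.
	func runStub(d *dispatch.Dispatcher, c arvados.Container, updates <-chan arvados.Container) {
		for latest := range updates {
			log.Printf("%s is now %s", latest.UUID, latest.State)
		}
	}

	func main() {
		arv, err := arvadosclient.MakeArvadosClient()
		if err != nil {
			log.Fatalf("error making Arvados client: %s", err)
		}
		d := &dispatch.Dispatcher{
			Arv:            arv,
			PollPeriod:     10 * time.Second,
			MinRetryPeriod: 30 * time.Second,
			RunContainer:   runStub,
		}
		// Run polls the queue until the context is cancelled or a
		// fatal error occurs.
		log.Fatal(d.Run(context.Background()))
	}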