X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a95f899d7ac84f29b3d019aa410d265bb40833e5..58044098495d066effa7fd4742b6635d9a10fdfb:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index fb7b5fb799..152207ea94 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -1,253 +1,332 @@ -// Framework for monitoring the Arvados container Queue, Locks container -// records, and runs goroutine callbacks which implement execution and -// monitoring of the containers. +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +// Package dispatch is a helper library for building Arvados container +// dispatchers. package dispatch import ( - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "log" - "os" - "os/signal" + "context" + "fmt" "sync" - "syscall" "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "github.com/Sirupsen/logrus" ) -// Constants for container states const ( - Queued = "Queued" - Locked = "Locked" - Running = "Running" - Complete = "Complete" - Cancelled = "Cancelled" + Queued = arvados.ContainerStateQueued + Locked = arvados.ContainerStateLocked + Running = arvados.ContainerStateRunning + Complete = arvados.ContainerStateComplete + Cancelled = arvados.ContainerStateCancelled ) -type apiClientAuthorization struct { - UUID string `json:"uuid"` - APIToken string `json:"api_token"` +type Logger interface { + Printf(string, ...interface{}) + Warnf(string, ...interface{}) + Debugf(string, ...interface{}) } -type apiClientAuthorizationList struct { - Items []apiClientAuthorization `json:"items"` -} +// Dispatcher struct +type Dispatcher struct { + Arv *arvadosclient.ArvadosClient -// Represents an Arvados container record -type Container struct { - UUID string `json:"uuid"` - State string `json:"state"` - Priority int `json:"priority"` - RuntimeConstraints map[string]int64 `json:"runtime_constraints"` - LockedByUUID string `json:"locked_by_uuid"` -} + Logger Logger -// ContainerList is a list of the containers from api -type ContainerList struct { - Items []Container `json:"items"` - ItemsAvailable int `json:"items_available"` -} + // Queue polling frequency + PollPeriod time.Duration -// Dispatcher holds the state of the dispatcher -type Dispatcher struct { - // The Arvados client - Arv arvadosclient.ArvadosClient - - // When a new queued container appears and is either already owned by - // this dispatcher or is successfully locked, the dispatcher will call - // go RunContainer(). The RunContainer() goroutine gets a channel over - // which it will receive updates to the container state. The - // RunContainer() goroutine should only assume status updates come when - // the container record changes on the API server; if it needs to - // monitor the job submission to the underlying slurm/grid engine/etc - // queue it should spin up its own polling goroutines. When the - // channel is closed, that means the container is no longer being - // handled by this dispatcher and the goroutine should terminate. The - // goroutine is responsible for draining the 'status' channel, failure - // to do so may deadlock the dispatcher. - RunContainer func(*Dispatcher, Container, chan Container) - - // Amount of time to wait between polling for updates. - PollInterval time.Duration - - // Channel used to signal that RunDispatcher loop should exit. - DoneProcessing chan struct{} - - mineMutex sync.Mutex - mineMap map[string]chan Container - Auth apiClientAuthorization - containers chan Container -} + // Time to wait between successive attempts to run the same container + MinRetryPeriod time.Duration -// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones -// for which this process is actively starting/monitoring. Returns channel to -// be used to send container status updates. -func (dispatcher *Dispatcher) setMine(uuid string) chan Container { - dispatcher.mineMutex.Lock() - defer dispatcher.mineMutex.Unlock() - if ch, ok := dispatcher.mineMap[uuid]; ok { - return ch - } + // Func that implements the container lifecycle. Must be set + // to a non-nil DispatchFunc before calling Run(). + RunContainer DispatchFunc - ch := make(chan Container) - dispatcher.mineMap[uuid] = ch - return ch + auth arvados.APIClientAuthorization + mtx sync.Mutex + trackers map[string]*runTracker + throttle throttle } -// Release a container which is no longer being monitored. -func (dispatcher *Dispatcher) notMine(uuid string) { - dispatcher.mineMutex.Lock() - defer dispatcher.mineMutex.Unlock() - if ch, ok := dispatcher.mineMap[uuid]; ok { - close(ch) - delete(dispatcher.mineMap, uuid) +// A DispatchFunc executes a container (if the container record is +// Locked) or resume monitoring an already-running container, and wait +// until that container exits. +// +// While the container runs, the DispatchFunc should listen for +// updated container records on the provided channel. When the channel +// closes, the DispatchFunc should stop the container if it's still +// running, and return. +// +// The DispatchFunc should not return until the container is finished. +type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) + +// Run watches the API server's queue for containers that are either +// ready to run and available to lock, or are already locked by this +// dispatcher's token. When a new one appears, Run calls RunContainer +// in a new goroutine. +func (d *Dispatcher) Run(ctx context.Context) error { + if d.Logger == nil { + d.Logger = logrus.StandardLogger() } -} - -// Check if there is a channel for updates associated with this container. If -// so send the container record on the channel and return true, if not return -// false. -func (dispatcher *Dispatcher) updateMine(c Container) bool { - dispatcher.mineMutex.Lock() - defer dispatcher.mineMutex.Unlock() - ch, ok := dispatcher.mineMap[c.UUID] - if ok { - ch <- c - return true - } - return false -} -func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) { - var containers ContainerList - err := dispatcher.Arv.List("containers", params, &containers) + err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth) if err != nil { - log.Printf("Error getting list of containers: %q", err) - return - } - - if containers.ItemsAvailable > len(containers.Items) { - // TODO: support paging - log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.", - containers.ItemsAvailable, - len(containers.Items)) + return fmt.Errorf("error getting my token UUID: %v", err) } - for _, container := range containers.Items { - touched[container.UUID] = true - dispatcher.containers <- container - } -} -func (dispatcher *Dispatcher) pollContainers() { - ticker := time.NewTicker(dispatcher.PollInterval) + d.throttle.hold = d.MinRetryPeriod - paramsQ := arvadosclient.Dict{ - "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}}, - "order": []string{"priority desc"}, - "limit": "1000"} - paramsP := arvadosclient.Dict{ - "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}}, - "limit": "1000"} + poll := time.NewTicker(d.PollPeriod) + defer poll.Stop() for { select { - case <-ticker.C: - touched := make(map[string]bool) - dispatcher.getContainers(paramsQ, touched) - dispatcher.getContainers(paramsP, touched) - dispatcher.mineMutex.Lock() - var monitored []string - for k := range dispatcher.mineMap { - if _, ok := touched[k]; !ok { - monitored = append(monitored, k) - } - } - dispatcher.mineMutex.Unlock() - if monitored != nil { - dispatcher.getContainers(arvadosclient.Dict{ - "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched) + case <-poll.C: + break + case <-ctx.Done(): + return ctx.Err() + } + + todo := make(map[string]*runTracker) + d.mtx.Lock() + // Make a copy of trackers + for uuid, tracker := range d.trackers { + todo[uuid] = tracker + } + d.mtx.Unlock() + + // Containers I currently own (Locked/Running) + querySuccess := d.checkForUpdates([][]interface{}{ + {"locked_by_uuid", "=", d.auth.UUID}}, todo) + + // Containers I should try to dispatch + querySuccess = d.checkForUpdates([][]interface{}{ + {"state", "=", Queued}, + {"priority", ">", "0"}}, todo) && querySuccess + + if !querySuccess { + // There was an error in one of the previous queries, + // we probably didn't get updates for all the + // containers we should have. Don't check them + // individually because it may be expensive. + continue + } + + // Containers I know about but didn't fall into the + // above two categories (probably Complete/Cancelled) + var missed []string + for uuid := range todo { + missed = append(missed, uuid) + } + + for len(missed) > 0 { + var batch []string + if len(missed) > 20 { + batch = missed[0:20] + missed = missed[20:] + } else { + batch = missed + missed = missed[0:0] } - case <-dispatcher.DoneProcessing: - close(dispatcher.containers) - ticker.Stop() - return + querySuccess = d.checkForUpdates([][]interface{}{ + {"uuid", "in", batch}}, todo) && querySuccess + } + + if !querySuccess { + // There was an error in one of the previous queries, we probably + // didn't see all the containers we should have, so don't shut down + // the missed containers. + continue } + + // Containers that I know about that didn't show up in any + // query should be let go. + for uuid, tracker := range todo { + d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid) + tracker.close() + } + } } -func (dispatcher *Dispatcher) handleUpdate(container Container) { - if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued { - // If container is Complete, Cancelled, or Queued, LockedByUUID - // will be nil. If the container was formerly Locked, moved - // back to Queued and then locked by another dispatcher, - // LockedByUUID will be different. In either case, we want - // to stop monitoring it. - log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID) - dispatcher.notMine(container.UUID) - return +// Start a runner in a new goroutine, and send the initial container +// record to its updates channel. +func (d *Dispatcher) start(c arvados.Container) *runTracker { + tracker := &runTracker{ + updates: make(chan arvados.Container, 1), + logger: d.Logger, } + tracker.updates <- c + go func() { + d.RunContainer(d, c, tracker.updates) + // RunContainer blocks for the lifetime of the container. When + // it returns, the tracker should delete itself. + d.mtx.Lock() + delete(d.trackers, c.UUID) + d.mtx.Unlock() + }() + return tracker +} - if dispatcher.updateMine(container) { - // Already monitored, sent status update - return +func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool { + params := arvadosclient.Dict{ + "filters": filters, + "order": []string{"priority desc"}} + offset := 0 + for { + params["offset"] = offset + + // This list variable must be a new one declared + // inside the loop: otherwise, items in the API + // response would get deep-merged into the items + // loaded in previous iterations. + var list arvados.ContainerList + + err := d.Arv.List("containers", params, &list) + if err != nil { + d.Logger.Warnf("error getting list of containers: %q", err) + return false + } + d.checkListForUpdates(list.Items, todo) + offset += len(list.Items) + if len(list.Items) == 0 || list.ItemsAvailable <= offset { + return true + } } +} - if container.State == Queued { - // Try to take the lock - if err := dispatcher.UpdateState(container.UUID, Locked); err != nil { - return - } - container.State = Locked +func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) { + d.mtx.Lock() + defer d.mtx.Unlock() + if d.trackers == nil { + d.trackers = make(map[string]*runTracker) } - if container.State == Locked || container.State == Running { - // Not currently monitored but in Locked or Running state and - // owned by this dispatcher, so start monitoring. - go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID)) + for _, c := range containers { + tracker, alreadyTracking := d.trackers[c.UUID] + delete(todo, c.UUID) + + if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID { + d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID) + } else if alreadyTracking { + switch c.State { + case Queued: + tracker.close() + case Locked, Running: + tracker.update(c) + case Cancelled, Complete: + tracker.close() + } + } else { + switch c.State { + case Queued: + if !d.throttle.Check(c.UUID) { + break + } + err := d.lock(c.UUID) + if err != nil { + d.Logger.Warnf("error locking container %s: %s", c.UUID, err) + break + } + c.State = Locked + d.trackers[c.UUID] = d.start(c) + case Locked, Running: + if !d.throttle.Check(c.UUID) { + break + } + d.trackers[c.UUID] = d.start(c) + case Cancelled, Complete: + // no-op (we already stopped monitoring) + } + } } } // UpdateState makes an API call to change the state of a container. -func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error { - err := dispatcher.Arv.Update("containers", uuid, +func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error { + err := d.Arv.Update("containers", uuid, arvadosclient.Dict{ - "container": arvadosclient.Dict{"state": newState}}, - nil) + "container": arvadosclient.Dict{"state": state}, + }, nil) if err != nil { - log.Printf("Error updating container %s to state %q: %q", uuid, newState, err) + d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err) } return err } -// RunDispatcher runs the main loop of the dispatcher until receiving a message -// on the dispatcher.DoneProcessing channel. It also installs a signal handler -// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT. -func (dispatcher *Dispatcher) RunDispatcher() (err error) { - err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth) - if err != nil { - log.Printf("Error getting my token UUID: %v", err) - return - } +// Lock makes the lock API call which updates the state of a container to Locked. +func (d *Dispatcher) lock(uuid string) error { + return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil) +} - dispatcher.mineMap = make(map[string]chan Container) - dispatcher.containers = make(chan Container) +// Unlock makes the unlock API call which updates the state of a container to Queued. +func (d *Dispatcher) Unlock(uuid string) error { + return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil) +} - // Graceful shutdown on signal - sigChan := make(chan os.Signal) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) +// TrackContainer ensures a tracker is running for the given UUID, +// regardless of the current state of the container (except: if the +// container is locked by a different dispatcher, a tracker will not +// be started). If the container is not in Locked or Running state, +// the new tracker will close down immediately. +// +// This allows the dispatcher to put its own RunContainer func into a +// cleanup phase (for example, to kill local processes created by a +// prevous dispatch process that are still running even though the +// container state is final) without the risk of having multiple +// goroutines monitoring the same UUID. +func (d *Dispatcher) TrackContainer(uuid string) error { + var cntr arvados.Container + err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr) + if err != nil { + return err + } + if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID { + return nil + } - go func(sig <-chan os.Signal) { - for sig := range sig { - log.Printf("Caught signal: %v", sig) - dispatcher.DoneProcessing <- struct{}{} - } - }(sigChan) + d.mtx.Lock() + defer d.mtx.Unlock() + if _, alreadyTracking := d.trackers[uuid]; alreadyTracking { + return nil + } + if d.trackers == nil { + d.trackers = make(map[string]*runTracker) + } + d.trackers[uuid] = d.start(cntr) + switch cntr.State { + case Queued, Cancelled, Complete: + d.trackers[uuid].close() + } + return nil +} - defer close(sigChan) - defer signal.Stop(sigChan) +type runTracker struct { + closing bool + updates chan arvados.Container + logger Logger +} - go dispatcher.pollContainers() - for container := range dispatcher.containers { - dispatcher.handleUpdate(container) +func (tracker *runTracker) close() { + if !tracker.closing { + close(tracker.updates) } + tracker.closing = true +} - return nil +func (tracker *runTracker) update(c arvados.Container) { + if tracker.closing { + return + } + select { + case <-tracker.updates: + tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID) + default: + } + tracker.updates <- c }