X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3a3910fdc8a5003c182f68e3423c96327a136175..123153139bbee3674c81325653d87fa19ffbe0e4:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index a27971f906..261444a05f 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -1,240 +1,265 @@ +// Package dispatch is a helper library for building Arvados container +// dispatchers. package dispatch import ( - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "context" + "fmt" "log" - "os" - "os/signal" "sync" - "syscall" "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" ) -// Constants for container states const ( - Queued = "Queued" - Locked = "Locked" - Running = "Running" - Complete = "Complete" - Cancelled = "Cancelled" + Queued = arvados.ContainerStateQueued + Locked = arvados.ContainerStateLocked + Running = arvados.ContainerStateRunning + Complete = arvados.ContainerStateComplete + Cancelled = arvados.ContainerStateCancelled ) -type apiClientAuthorization struct { - UUID string `json:"uuid"` - APIToken string `json:"api_token"` -} +// Dispatcher struct +type Dispatcher struct { + Arv *arvadosclient.ArvadosClient -type apiClientAuthorizationList struct { - Items []apiClientAuthorization `json:"items"` -} + // Queue polling frequency + PollPeriod time.Duration -// Container data -type Container struct { - UUID string `json:"uuid"` - State string `json:"state"` - Priority int `json:"priority"` - RuntimeConstraints map[string]int64 `json:"runtime_constraints"` - LockedByUUID string `json:"locked_by_uuid"` -} + // Time to wait between successive attempts to run the same container + MinRetryPeriod time.Duration -// ContainerList is a list of the containers from api -type ContainerList struct { - Items []Container `json:"items"` - ItemsAvailable int `json:"items_available"` -} + // Func that implements the container lifecycle. 
Must be set + // to a non-nil DispatchFunc before calling Run(). + RunContainer DispatchFunc -// Dispatcher holds the state of the dispatcher -type Dispatcher struct { - Arv arvadosclient.ArvadosClient - RunContainer func(*Dispatcher, Container, chan Container) - PollInterval time.Duration - DoneProcessing chan struct{} - - mineMutex sync.Mutex - mineMap map[string]chan Container - Auth apiClientAuthorization - containers chan Container + auth arvados.APIClientAuthorization + mtx sync.Mutex + trackers map[string]*runTracker + throttle throttle } -// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones -// for which this process is actively starting/monitoring. Returns channel to -// be used to send container status updates. -func (dispatcher *Dispatcher) setMine(uuid string) chan Container { - dispatcher.mineMutex.Lock() - defer dispatcher.mineMutex.Unlock() - if ch, ok := dispatcher.mineMap[uuid]; ok { - return ch - } - - ch := make(chan Container) - dispatcher.mineMap[uuid] = ch - return ch -} +// A DispatchFunc executes a container (if the container record is +// Locked) or resumes monitoring an already-running container, and waits +// until that container exits. +// +// While the container runs, the DispatchFunc should listen for +// updated container records on the provided channel. When the channel +// closes, the DispatchFunc should stop the container if it's still +// running, and return. +// +// The DispatchFunc should not return until the container is finished. +type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) -// Release a container which is no longer being monitored. 
-func (dispatcher *Dispatcher) notMine(uuid string) { - dispatcher.mineMutex.Lock() - defer dispatcher.mineMutex.Unlock() - if ch, ok := dispatcher.mineMap[uuid]; ok { - close(ch) - delete(dispatcher.mineMap, uuid) +// Run watches the API server's queue for containers that are either +// ready to run and available to lock, or are already locked by this +// dispatcher's token. When a new one appears, Run calls RunContainer +// in a new goroutine. +func (d *Dispatcher) Run(ctx context.Context) error { + err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth) + if err != nil { + return fmt.Errorf("error getting my token UUID: %v", err) } -} -// Check if there is a channel for updates associated with this container. If -// so send the container record on the channel and return true, if not return -// false. -func (dispatcher *Dispatcher) updateMine(c Container) bool { - dispatcher.mineMutex.Lock() - defer dispatcher.mineMutex.Unlock() - ch, ok := dispatcher.mineMap[c.UUID] - if ok { - ch <- c - return true - } - return false -} + d.throttle.hold = d.MinRetryPeriod -func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) { - var containers ContainerList - err := dispatcher.Arv.List("containers", params, &containers) - if err != nil { - log.Printf("Error getting list of containers: %q", err) - return + poll := time.NewTicker(d.PollPeriod) + defer poll.Stop() + + for { + tracked := d.trackedUUIDs() + d.checkForUpdates([][]interface{}{ + {"uuid", "in", tracked}}) + d.checkForUpdates([][]interface{}{ + {"locked_by_uuid", "=", d.auth.UUID}, + {"uuid", "not in", tracked}}) + d.checkForUpdates([][]interface{}{ + {"state", "=", Queued}, + {"priority", ">", "0"}, + {"uuid", "not in", tracked}}) + select { + case <-poll.C: + continue + case <-ctx.Done(): + return ctx.Err() + } } +} - if containers.ItemsAvailable > len(containers.Items) { - // TODO: support paging - log.Printf("Warning! 
%d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.", - containers.ItemsAvailable, - len(containers.Items)) +func (d *Dispatcher) trackedUUIDs() []string { + d.mtx.Lock() + defer d.mtx.Unlock() + if len(d.trackers) == 0 { + // API bug: ["uuid", "not in", []] does not work as + // expected, but this does: + return []string{"this-uuid-does-not-exist"} } - for _, container := range containers.Items { - touched[container.UUID] = true - dispatcher.containers <- container + uuids := make([]string, 0, len(d.trackers)) + for x := range d.trackers { + uuids = append(uuids, x) } + return uuids } -func (dispatcher *Dispatcher) pollContainers() { - ticker := time.NewTicker(dispatcher.PollInterval) +// Start a runner in a new goroutine, and send the initial container +// record to its updates channel. +func (d *Dispatcher) start(c arvados.Container) *runTracker { + tracker := &runTracker{updates: make(chan arvados.Container, 1)} + tracker.updates <- c + go func() { + d.RunContainer(d, c, tracker.updates) - paramsQ := arvadosclient.Dict{ - "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}}, - "order": []string{"priority desc"}, - "limit": "1000"} - paramsP := arvadosclient.Dict{ - "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}}, - "limit": "1000"} + d.mtx.Lock() + delete(d.trackers, c.UUID) + d.mtx.Unlock() + }() + return tracker +} - for { - select { - case <-ticker.C: - touched := make(map[string]bool) - dispatcher.getContainers(paramsQ, touched) - dispatcher.getContainers(paramsP, touched) - dispatcher.mineMutex.Lock() - var monitored []string - for k := range dispatcher.mineMap { - if _, ok := touched[k]; !ok { - monitored = append(monitored, k) - } - } - dispatcher.mineMutex.Unlock() - if monitored != nil { - dispatcher.getContainers(arvadosclient.Dict{ - "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched) - } - case 
<-dispatcher.DoneProcessing: - close(dispatcher.containers) - ticker.Stop() +func (d *Dispatcher) checkForUpdates(filters [][]interface{}) { + params := arvadosclient.Dict{ + "filters": filters, + "order": []string{"priority desc"}} + + var list arvados.ContainerList + for offset, more := 0, true; more; offset += len(list.Items) { + params["offset"] = offset + err := d.Arv.List("containers", params, &list) + if err != nil { + log.Printf("Error getting list of containers: %q", err) return } + more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset + d.checkListForUpdates(list.Items) } } -func (dispatcher *Dispatcher) handleUpdate(container Container) { - if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued { - // If container is Complete, Cancelled, or Queued, LockedByUUID - // will be nil. If the container was formally Locked, moved - // back to Queued and then locked by another dispatcher, - // LockedByUUID will be different. In either case, we want - // to stop monitoring it. 
- log.Printf("Container %v now in state %v with locked_by_uuid %v", container.UUID, container.State, container.LockedByUUID) - dispatcher.notMine(container.UUID) - return +func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) { + d.mtx.Lock() + defer d.mtx.Unlock() + if d.trackers == nil { + d.trackers = make(map[string]*runTracker) } - if dispatcher.updateMine(container) { - // Already monitored, sent status update - return - } - - if container.State == Queued { - // Try to take the lock - if err := dispatcher.UpdateState(container.UUID, Locked); err != nil { - return + for _, c := range containers { + tracker, alreadyTracking := d.trackers[c.UUID] + if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID { + log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID) + } else if alreadyTracking { + switch c.State { + case Queued: + tracker.close() + case Locked, Running: + tracker.update(c) + case Cancelled, Complete: + tracker.close() + } + } else { + switch c.State { + case Queued: + if !d.throttle.Check(c.UUID) { + break + } + err := d.lock(c.UUID) + if err != nil { + log.Printf("debug: error locking container %s: %s", c.UUID, err) + break + } + c.State = Locked + d.trackers[c.UUID] = d.start(c) + case Locked, Running: + if !d.throttle.Check(c.UUID) { + break + } + d.trackers[c.UUID] = d.start(c) + case Cancelled, Complete: + // no-op (we already stopped monitoring) + } } - container.State = Locked - } - - if container.State == Locked || container.State == Running { - // Not currently monitored but in Locked or Running state and - // owned by this dispatcher, so start monitoring. - go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID)) } } // UpdateState makes an API call to change the state of a container. 
-func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error { - err := dispatcher.Arv.Update("containers", uuid, +func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error { + err := d.Arv.Update("containers", uuid, arvadosclient.Dict{ - "container": arvadosclient.Dict{"state": newState}}, - nil) + "container": arvadosclient.Dict{"state": state}, + }, nil) if err != nil { - log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err) + log.Printf("Error updating container %s to state %q: %s", uuid, state, err) } return err } -// RunDispatcher runs the main loop of the dispatcher until receiving a message -// on the dispatcher.DoneProcessing channel. It also installs a signal handler -// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT. +// lock makes the lock API call which updates the state of a container to Locked. +func (d *Dispatcher) lock(uuid string) error { + return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil) +} + +// Unlock makes the unlock API call which updates the state of a container to Queued. +func (d *Dispatcher) Unlock(uuid string) error { + return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil) +} + +// TrackContainer ensures a tracker is running for the given UUID, +// regardless of the current state of the container (except: if the +// container is locked by a different dispatcher, a tracker will not +// be started). If the container is not in Locked or Running state, +// the new tracker will close down immediately. // -// When a new queued container appears and is successfully locked, the -// dispatcher will call RunContainer() followed by MonitorContainer(). If a -// container appears that is Locked or Running but not known to the dispatcher, -// it will only call monitorContainer(). The monitorContainer() callback is -// passed a channel over which it will receive updates to the container state. 
-// The callback is responsible for draining the channel, if it fails to do so -// it will deadlock the dispatcher. -func (dispatcher *Dispatcher) RunDispatcher() (err error) { - err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth) +// This allows the dispatcher to put its own RunContainer func into a +// cleanup phase (for example, to kill local processes created by a +// prevous dispatch process that are still running even though the +// container state is final) without the risk of having multiple +// goroutines monitoring the same UUID. +func (d *Dispatcher) TrackContainer(uuid string) error { + var cntr arvados.Container + err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr) if err != nil { - log.Printf("Error getting my token UUID: %v", err) - return + return err + } + if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID { + return nil } - dispatcher.mineMap = make(map[string]chan Container) - dispatcher.containers = make(chan Container) - - // Graceful shutdown on signal - sigChan := make(chan os.Signal) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) - - go func(sig <-chan os.Signal) { - for sig := range sig { - log.Printf("Caught signal: %v", sig) - dispatcher.DoneProcessing <- struct{}{} - } - }(sigChan) + d.mtx.Lock() + defer d.mtx.Unlock() + if _, alreadyTracking := d.trackers[uuid]; alreadyTracking { + return nil + } + if d.trackers == nil { + d.trackers = make(map[string]*runTracker) + } + d.trackers[uuid] = d.start(cntr) + switch cntr.State { + case Queued, Cancelled, Complete: + d.trackers[uuid].close() + } + return nil +} - defer close(sigChan) - defer signal.Stop(sigChan) +type runTracker struct { + closing bool + updates chan arvados.Container +} - go dispatcher.pollContainers() - for container := range dispatcher.containers { - dispatcher.handleUpdate(container) +func (tracker *runTracker) close() { + if !tracker.closing { + close(tracker.updates) } + 
tracker.closing = true +} - return nil +func (tracker *runTracker) update(c arvados.Container) { + if tracker.closing { + return + } + select { + case <-tracker.updates: + log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID) + default: + } + tracker.updates <- c }