X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4153cb6cfad920ed0b1a4b818d3bcc8de492d134..bb10b7777ed6db229fbb35e6a829bec4e8efcd23:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index 355ed7c108..fb7b5fb799 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -1,3 +1,6 @@ +// Package dispatch is a framework for monitoring the Arvados container +// queue, locking container records, and running goroutine callbacks which +// implement execution and monitoring of the containers. package dispatch import ( @@ -28,7 +31,7 @@ type apiClientAuthorizationList struct { Items []apiClientAuthorization `json:"items"` } -// Container data +// Container represents an Arvados container record type Container struct { UUID string `json:"uuid"` State string `json:"state"` @@ -45,14 +48,32 @@ type ContainerList struct { // Dispatcher holds the state of the dispatcher type Dispatcher struct { - Arv arvadosclient.ArvadosClient - RunContainer func(*Dispatcher, Container, chan Container) - PollInterval time.Duration + // The Arvados client + Arv arvadosclient.ArvadosClient + + // When a new queued container appears and is either already owned by + // this dispatcher or is successfully locked, the dispatcher will call + // go RunContainer(). The RunContainer() goroutine gets a channel over + // which it will receive updates to the container state. The + // RunContainer() goroutine should only assume status updates come when + // the container record changes on the API server; if it needs to + // monitor the job submission to the underlying slurm/grid engine/etc + // queue it should spin up its own polling goroutines. When the + // channel is closed, that means the container is no longer being + // handled by this dispatcher and the goroutine should terminate. The + // goroutine is responsible for draining the 'status' channel; failure + // to do so may deadlock the dispatcher. 
+ RunContainer func(*Dispatcher, Container, chan Container) + + // Amount of time to wait between polling for updates. + PollInterval time.Duration + + // Channel used to signal that RunDispatcher loop should exit. DoneProcessing chan struct{} mineMutex sync.Mutex mineMap map[string]chan Container - auth apiClientAuthorization + Auth apiClientAuthorization containers chan Container } @@ -100,17 +121,18 @@ func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched m err := dispatcher.Arv.List("containers", params, &containers) if err != nil { log.Printf("Error getting list of containers: %q", err) - } else { - if containers.ItemsAvailable > len(containers.Items) { - // TODO: support paging - log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.", - containers.ItemsAvailable, - len(containers.Items)) - } - for _, container := range containers.Items { - touched[container.UUID] = true - dispatcher.containers <- container - } + return + } + + if containers.ItemsAvailable > len(containers.Items) { + // TODO: support paging + log.Printf("Warning! 
%d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.", + containers.ItemsAvailable, + len(containers.Items)) + } + for _, container := range containers.Items { + touched[container.UUID] = true + dispatcher.containers <- container } } @@ -122,7 +144,7 @@ func (dispatcher *Dispatcher) pollContainers() { "order": []string{"priority desc"}, "limit": "1000"} paramsP := arvadosclient.Dict{ - "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.auth.UUID}}, + "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}}, "limit": "1000"} for { @@ -152,11 +174,19 @@ func (dispatcher *Dispatcher) pollContainers() { } func (dispatcher *Dispatcher) handleUpdate(container Container) { + if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued { + // If container is Complete, Cancelled, or Queued, LockedByUUID + // will be nil. If the container was formerly Locked, moved + // back to Queued and then locked by another dispatcher, + // LockedByUUID will be different. In either case, we want + // to stop monitoring it. + log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID) + dispatcher.notMine(container.UUID) + return + } + if dispatcher.updateMine(container) { - if container.State == Complete || container.State == Cancelled { - log.Printf("Container %v now in state %v", container.UUID, container.State) - dispatcher.notMine(container.UUID) - } + // Already monitored, sent status update return } @@ -169,6 +199,8 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) { } if container.State == Locked || container.State == Running { + // Not currently monitored but in Locked or Running state and + // owned by this dispatcher, so start monitoring. 
go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID)) } } @@ -180,7 +212,7 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error { "container": arvadosclient.Dict{"state": newState}}, nil) if err != nil { - log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err) + log.Printf("Error updating container %s to state %q: %q", uuid, newState, err) } return err } @@ -188,16 +220,8 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error { // RunDispatcher runs the main loop of the dispatcher until receiving a message // on the dispatcher.DoneProcessing channel. It also installs a signal handler // to terminate gracefully on SIGINT, SIGTERM or SIGQUIT. -// -// When a new queued container appears and is successfully locked, the -// dispatcher will call RunContainer() followed by MonitorContainer(). If a -// container appears that is Locked or Running but not known to the dispatcher, -// it will only call monitorContainer(). The monitorContainer() callback is -// passed a channel over which it will receive updates to the container state. -// The callback is responsible for draining the channel, if it fails to do so -// it will deadlock the dispatcher. func (dispatcher *Dispatcher) RunDispatcher() (err error) { - err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth) + err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth) if err != nil { log.Printf("Error getting my token UUID: %v", err) return