X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3a3910fdc8a5003c182f68e3423c96327a136175..0e7915c354852dc63c989b9e5454a11037f3b65f:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index a27971f906..a48613292e 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -1,6 +1,10 @@ +// Framework for monitoring the Arvados container Queue, Locks container +// records, and runs goroutine callbacks which implement execution and +// monitoring of the containers. package dispatch import ( + "git.curoverse.com/arvados.git/sdk/go/arvados" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "log" "os" @@ -10,63 +14,56 @@ import ( "time" ) -// Constants for container states const ( - Queued = "Queued" - Locked = "Locked" - Running = "Running" - Complete = "Complete" - Cancelled = "Cancelled" + Queued = arvados.ContainerStateQueued + Locked = arvados.ContainerStateLocked + Running = arvados.ContainerStateRunning + Complete = arvados.ContainerStateComplete + Cancelled = arvados.ContainerStateCancelled ) -type apiClientAuthorization struct { - UUID string `json:"uuid"` - APIToken string `json:"api_token"` -} - -type apiClientAuthorizationList struct { - Items []apiClientAuthorization `json:"items"` -} +// Dispatcher holds the state of the dispatcher +type Dispatcher struct { + // The Arvados client + Arv arvadosclient.ArvadosClient -// Container data -type Container struct { - UUID string `json:"uuid"` - State string `json:"state"` - Priority int `json:"priority"` - RuntimeConstraints map[string]int64 `json:"runtime_constraints"` - LockedByUUID string `json:"locked_by_uuid"` -} + // When a new queued container appears and is either already owned by + // this dispatcher or is successfully locked, the dispatcher will call + // go RunContainer(). The RunContainer() goroutine gets a channel over + // which it will receive updates to the container state. The + // RunContainer() goroutine should only assume status updates come when + // the container record changes on the API server; if it needs to + // monitor the job submission to the underlying slurm/grid engine/etc + // queue it should spin up its own polling goroutines. When the + // channel is closed, that means the container is no longer being + // handled by this dispatcher and the goroutine should terminate. The + // goroutine is responsible for draining the 'status' channel, failure + // to do so may deadlock the dispatcher. + RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container) -// ContainerList is a list of the containers from api -type ContainerList struct { - Items []Container `json:"items"` - ItemsAvailable int `json:"items_available"` -} + // Amount of time to wait between polling for updates. + PollInterval time.Duration -// Dispatcher holds the state of the dispatcher -type Dispatcher struct { - Arv arvadosclient.ArvadosClient - RunContainer func(*Dispatcher, Container, chan Container) - PollInterval time.Duration + // Channel used to signal that RunDispatcher loop should exit. DoneProcessing chan struct{} mineMutex sync.Mutex - mineMap map[string]chan Container - Auth apiClientAuthorization - containers chan Container + mineMap map[string]chan arvados.Container + Auth arvados.APIClientAuthorization + containers chan arvados.Container } // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones // for which this process is actively starting/monitoring. Returns channel to // be used to send container status updates. -func (dispatcher *Dispatcher) setMine(uuid string) chan Container { +func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container { dispatcher.mineMutex.Lock() defer dispatcher.mineMutex.Unlock() if ch, ok := dispatcher.mineMap[uuid]; ok { return ch } - ch := make(chan Container) + ch := make(chan arvados.Container) dispatcher.mineMap[uuid] = ch return ch } @@ -81,22 +78,24 @@ func (dispatcher *Dispatcher) notMine(uuid string) { } } -// Check if there is a channel for updates associated with this container. If -// so send the container record on the channel and return true, if not return -// false. -func (dispatcher *Dispatcher) updateMine(c Container) bool { +// checkMine returns true if there is a channel for updates associated +// with container c. If update is true, also send the container record on +// the channel. +func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool { dispatcher.mineMutex.Lock() defer dispatcher.mineMutex.Unlock() ch, ok := dispatcher.mineMap[c.UUID] if ok { - ch <- c + if update { + ch <- c + } return true } return false } func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) { - var containers ContainerList + var containers arvados.ContainerList err := dispatcher.Arv.List("containers", params, &containers) if err != nil { log.Printf("Error getting list of containers: %q", err) @@ -152,26 +151,34 @@ func (dispatcher *Dispatcher) pollContainers() { } } -func (dispatcher *Dispatcher) handleUpdate(container Container) { +func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) { + if container.State == Queued && dispatcher.checkMine(container, false) { + // If we previously started the job, something failed, and it + // was re-queued, this dispatcher might still be monitoring it. + // Stop the existing monitor, then try to lock and run it + // again. + dispatcher.notMine(container.UUID) + } + if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued { // If container is Complete, Cancelled, or Queued, LockedByUUID - // will be nil. If the container was formally Locked, moved + // will be nil. If the container was formerly Locked, moved // back to Queued and then locked by another dispatcher, // LockedByUUID will be different. In either case, we want // to stop monitoring it. - log.Printf("Container %v now in state %v with locked_by_uuid %v", container.UUID, container.State, container.LockedByUUID) + log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID) dispatcher.notMine(container.UUID) return } - if dispatcher.updateMine(container) { + if dispatcher.checkMine(container, true) { // Already monitored, sent status update return } - if container.State == Queued { + if container.State == Queued && container.Priority > 0 { // Try to take the lock - if err := dispatcher.UpdateState(container.UUID, Locked); err != nil { + if err := dispatcher.Lock(container.UUID); err != nil { return } container.State = Locked @@ -185,13 +192,31 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) { } // UpdateState makes an API call to change the state of a container. -func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error { +func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error { err := dispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{ "container": arvadosclient.Dict{"state": newState}}, nil) if err != nil { - log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err) + log.Printf("Error updating container %s to state %q: %q", uuid, newState, err) + } + return err +} + +// Lock makes the lock API call which updates the state of a container to Locked. +func (dispatcher *Dispatcher) Lock(uuid string) error { + err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil) + if err != nil { + log.Printf("Error locking container %s: %q", uuid, err) + } + return err +} + +// Unlock makes the unlock API call which updates the state of a container to Queued. +func (dispatcher *Dispatcher) Unlock(uuid string) error { + err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil) + if err != nil { + log.Printf("Error unlocking container %s: %q", uuid, err) } return err } @@ -199,14 +224,6 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error { // RunDispatcher runs the main loop of the dispatcher until receiving a message // on the dispatcher.DoneProcessing channel. It also installs a signal handler // to terminate gracefully on SIGINT, SIGTERM or SIGQUIT. -// -// When a new queued container appears and is successfully locked, the -// dispatcher will call RunContainer() followed by MonitorContainer(). If a -// container appears that is Locked or Running but not known to the dispatcher, -// it will only call monitorContainer(). The monitorContainer() callback is -// passed a channel over which it will receive updates to the container state. -// The callback is responsible for draining the channel, if it fails to do so -// it will deadlock the dispatcher. func (dispatcher *Dispatcher) RunDispatcher() (err error) { err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth) if err != nil { @@ -214,8 +231,8 @@ func (dispatcher *Dispatcher) RunDispatcher() (err error) { return } - dispatcher.mineMap = make(map[string]chan Container) - dispatcher.containers = make(chan Container) + dispatcher.mineMap = make(map[string]chan arvados.Container) + dispatcher.containers = make(chan arvados.Container) // Graceful shutdown on signal sigChan := make(chan os.Signal)