X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a95f899d7ac84f29b3d019aa410d265bb40833e5..c9180edfa0d7306b9a533a0ed15dd90eca5cfae1:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index fb7b5fb799..4987c01055 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -4,6 +4,7 @@ package dispatch import ( + "git.curoverse.com/arvados.git/sdk/go/arvados" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "log" "os" @@ -13,43 +14,18 @@ import ( "time" ) -// Constants for container states const ( - Queued = "Queued" - Locked = "Locked" - Running = "Running" - Complete = "Complete" - Cancelled = "Cancelled" + Queued = arvados.ContainerStateQueued + Locked = arvados.ContainerStateLocked + Running = arvados.ContainerStateRunning + Complete = arvados.ContainerStateComplete + Cancelled = arvados.ContainerStateCancelled ) -type apiClientAuthorization struct { - UUID string `json:"uuid"` - APIToken string `json:"api_token"` -} - -type apiClientAuthorizationList struct { - Items []apiClientAuthorization `json:"items"` -} - -// Represents an Arvados container record -type Container struct { - UUID string `json:"uuid"` - State string `json:"state"` - Priority int `json:"priority"` - RuntimeConstraints map[string]int64 `json:"runtime_constraints"` - LockedByUUID string `json:"locked_by_uuid"` -} - -// ContainerList is a list of the containers from api -type ContainerList struct { - Items []Container `json:"items"` - ItemsAvailable int `json:"items_available"` -} - // Dispatcher holds the state of the dispatcher type Dispatcher struct { // The Arvados client - Arv arvadosclient.ArvadosClient + Arv *arvadosclient.ArvadosClient // When a new queued container appears and is either already owned by // this dispatcher or is successfully locked, the dispatcher will call @@ -63,7 +39,7 @@ type Dispatcher struct { // handled by this dispatcher and the goroutine should terminate. The // goroutine is responsible for draining the 'status' channel, failure // to do so may deadlock the dispatcher. - RunContainer func(*Dispatcher, Container, chan Container) + RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container) // Amount of time to wait between polling for updates. PollInterval time.Duration @@ -72,22 +48,22 @@ type Dispatcher struct { DoneProcessing chan struct{} mineMutex sync.Mutex - mineMap map[string]chan Container - Auth apiClientAuthorization - containers chan Container + mineMap map[string]chan arvados.Container + Auth arvados.APIClientAuthorization + containers chan arvados.Container } // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones // for which this process is actively starting/monitoring. Returns channel to // be used to send container status updates. -func (dispatcher *Dispatcher) setMine(uuid string) chan Container { +func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container { dispatcher.mineMutex.Lock() defer dispatcher.mineMutex.Unlock() if ch, ok := dispatcher.mineMap[uuid]; ok { return ch } - ch := make(chan Container) + ch := make(chan arvados.Container) dispatcher.mineMap[uuid] = ch return ch } @@ -102,22 +78,24 @@ func (dispatcher *Dispatcher) notMine(uuid string) { } } -// Check if there is a channel for updates associated with this container. If -// so send the container record on the channel and return true, if not return -// false. -func (dispatcher *Dispatcher) updateMine(c Container) bool { +// checkMine returns true if there is a channel for updates associated +// with container c. If update is true, also send the container record on +// the channel. +func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool { dispatcher.mineMutex.Lock() defer dispatcher.mineMutex.Unlock() ch, ok := dispatcher.mineMap[c.UUID] if ok { - ch <- c + if update { + ch <- c + } return true } return false } func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) { - var containers ContainerList + var containers arvados.ContainerList err := dispatcher.Arv.List("containers", params, &containers) if err != nil { log.Printf("Error getting list of containers: %q", err) @@ -173,7 +151,15 @@ func (dispatcher *Dispatcher) pollContainers() { } } -func (dispatcher *Dispatcher) handleUpdate(container Container) { +func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) { + if container.State == Queued && dispatcher.checkMine(container, false) { + // If we previously started the job, something failed, and it + // was re-queued, this dispatcher might still be monitoring it. + // Stop the existing monitor, then try to lock and run it + // again. + dispatcher.notMine(container.UUID) + } + if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued { // If container is Complete, Cancelled, or Queued, LockedByUUID // will be nil. If the container was formerly Locked, moved @@ -185,14 +171,14 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) { return } - if dispatcher.updateMine(container) { + if dispatcher.checkMine(container, true) { // Already monitored, sent status update return } - if container.State == Queued { + if container.State == Queued && container.Priority > 0 { // Try to take the lock - if err := dispatcher.UpdateState(container.UUID, Locked); err != nil { + if err := dispatcher.Lock(container.UUID); err != nil { return } container.State = Locked @@ -206,7 +192,7 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) { } // UpdateState makes an API call to change the state of a container. -func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error { +func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error { err := dispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{ "container": arvadosclient.Dict{"state": newState}}, @@ -217,6 +203,24 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error { return err } +// Lock makes the lock API call which updates the state of a container to Locked. +func (dispatcher *Dispatcher) Lock(uuid string) error { + err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil) + if err != nil { + log.Printf("Error locking container %s: %q", uuid, err) + } + return err +} + +// Unlock makes the unlock API call which updates the state of a container to Queued. +func (dispatcher *Dispatcher) Unlock(uuid string) error { + err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil) + if err != nil { + log.Printf("Error unlocking container %s: %q", uuid, err) + } + return err +} + // RunDispatcher runs the main loop of the dispatcher until receiving a message // on the dispatcher.DoneProcessing channel. It also installs a signal handler // to terminate gracefully on SIGINT, SIGTERM or SIGQUIT. @@ -227,8 +231,8 @@ func (dispatcher *Dispatcher) RunDispatcher() (err error) { return } - dispatcher.mineMap = make(map[string]chan Container) - dispatcher.containers = make(chan Container) + dispatcher.mineMap = make(map[string]chan arvados.Container) + dispatcher.containers = make(chan arvados.Container) // Graceful shutdown on signal sigChan := make(chan os.Signal)