X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d4973e18ed92e3bcc0cb8b2020649148240ce9c0..95f399d4fca0c4e36c6da4e98e4092106ebfdc6d:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index 54d596fee8..a48613292e 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -4,6 +4,7 @@ package dispatch import ( + "git.curoverse.com/arvados.git/sdk/go/arvados" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "log" "os" @@ -13,39 +14,14 @@ import ( "time" ) -// Constants for container states const ( - Queued = "Queued" - Locked = "Locked" - Running = "Running" - Complete = "Complete" - Cancelled = "Cancelled" + Queued = arvados.ContainerStateQueued + Locked = arvados.ContainerStateLocked + Running = arvados.ContainerStateRunning + Complete = arvados.ContainerStateComplete + Cancelled = arvados.ContainerStateCancelled ) -type apiClientAuthorization struct { - UUID string `json:"uuid"` - APIToken string `json:"api_token"` -} - -type apiClientAuthorizationList struct { - Items []apiClientAuthorization `json:"items"` -} - -// Represents an Arvados container record -type Container struct { - UUID string `json:"uuid"` - State string `json:"state"` - Priority int `json:"priority"` - RuntimeConstraints map[string]int64 `json:"runtime_constraints"` - LockedByUUID string `json:"locked_by_uuid"` -} - -// ContainerList is a list of the containers from api -type ContainerList struct { - Items []Container `json:"items"` - ItemsAvailable int `json:"items_available"` -} - // Dispatcher holds the state of the dispatcher type Dispatcher struct { // The Arvados client @@ -63,7 +39,7 @@ type Dispatcher struct { // handled by this dispatcher and the goroutine should terminate. The // goroutine is responsible for draining the 'status' channel, failure // to do so may deadlock the dispatcher. - RunContainer func(*Dispatcher, Container, chan Container) + RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container) // Amount of time to wait between polling for updates. PollInterval time.Duration @@ -72,22 +48,22 @@ type Dispatcher struct { DoneProcessing chan struct{} mineMutex sync.Mutex - mineMap map[string]chan Container - Auth apiClientAuthorization - containers chan Container + mineMap map[string]chan arvados.Container + Auth arvados.APIClientAuthorization + containers chan arvados.Container } // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones // for which this process is actively starting/monitoring. Returns channel to // be used to send container status updates. -func (dispatcher *Dispatcher) setMine(uuid string) chan Container { +func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container { dispatcher.mineMutex.Lock() defer dispatcher.mineMutex.Unlock() if ch, ok := dispatcher.mineMap[uuid]; ok { return ch } - ch := make(chan Container) + ch := make(chan arvados.Container) dispatcher.mineMap[uuid] = ch return ch } @@ -102,10 +78,10 @@ func (dispatcher *Dispatcher) notMine(uuid string) { } } -// checkMine returns true/false if there is a channel for updates associated +// checkMine returns true if there is a channel for updates associated // with container c. If update is true, also send the container record on // the channel. -func (dispatcher *Dispatcher) checkMine(c Container, update bool) bool { +func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool { dispatcher.mineMutex.Lock() defer dispatcher.mineMutex.Unlock() ch, ok := dispatcher.mineMap[c.UUID] @@ -119,7 +95,7 @@ func (dispatcher *Dispatcher) checkMine(c Container, update bool) bool { } func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) { - var containers ContainerList + var containers arvados.ContainerList err := dispatcher.Arv.List("containers", params, &containers) if err != nil { log.Printf("Error getting list of containers: %q", err) @@ -175,7 +151,7 @@ func (dispatcher *Dispatcher) pollContainers() { } } -func (dispatcher *Dispatcher) handleUpdate(container Container) { +func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) { if container.State == Queued && dispatcher.checkMine(container, false) { // If we previously started the job, something failed, and it // was re-queued, this dispatcher might still be monitoring it. @@ -202,7 +178,7 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) { if container.State == Queued && container.Priority > 0 { // Try to take the lock - if err := dispatcher.UpdateState(container.UUID, Locked); err != nil { + if err := dispatcher.Lock(container.UUID); err != nil { return } container.State = Locked @@ -216,7 +192,7 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) { } // UpdateState makes an API call to change the state of a container. -func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error { +func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error { err := dispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{ "container": arvadosclient.Dict{"state": newState}}, @@ -227,6 +203,24 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error { return err } +// Lock makes the lock API call which updates the state of a container to Locked. +func (dispatcher *Dispatcher) Lock(uuid string) error { + err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil) + if err != nil { + log.Printf("Error locking container %s: %q", uuid, err) + } + return err +} + +// Unlock makes the unlock API call which updates the state of a container to Queued. +func (dispatcher *Dispatcher) Unlock(uuid string) error { + err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil) + if err != nil { + log.Printf("Error unlocking container %s: %q", uuid, err) + } + return err +} + // RunDispatcher runs the main loop of the dispatcher until receiving a message // on the dispatcher.DoneProcessing channel. It also installs a signal handler // to terminate gracefully on SIGINT, SIGTERM or SIGQUIT. @@ -237,8 +231,8 @@ func (dispatcher *Dispatcher) RunDispatcher() (err error) { return } - dispatcher.mineMap = make(map[string]chan Container) - dispatcher.containers = make(chan Container) + dispatcher.mineMap = make(map[string]chan arvados.Container) + dispatcher.containers = make(chan arvados.Container) // Graceful shutdown on signal sigChan := make(chan os.Signal)