package dispatch
import (
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"log"
"os"
"time"
)
-// Constants for container states
// Container state names used by this package. The arvados-qualified
// definitions below take each value from the matching
// arvados.ContainerState* constant in the SDK; handleUpdate, for example,
// compares container.State against Queued.
const (
- Queued = "Queued"
- Locked = "Locked"
- Running = "Running"
- Complete = "Complete"
- Cancelled = "Cancelled"
+ Queued = arvados.ContainerStateQueued
+ Locked = arvados.ContainerStateLocked
+ Running = arvados.ContainerStateRunning
+ Complete = arvados.ContainerStateComplete
+ Cancelled = arvados.ContainerStateCancelled
)
-type apiClientAuthorization struct {
- UUID string `json:"uuid"`
- APIToken string `json:"api_token"`
-}
-
-type apiClientAuthorizationList struct {
- Items []apiClientAuthorization `json:"items"`
-}
-
-// Represents an Arvados container record
-type Container struct {
- UUID string `json:"uuid"`
- State string `json:"state"`
- Priority int `json:"priority"`
- RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
- LockedByUUID string `json:"locked_by_uuid"`
-}
-
-// ContainerList is a list of the containers from api
-type ContainerList struct {
- Items []Container `json:"items"`
- ItemsAvailable int `json:"items_available"`
-}
-
// Dispatcher holds the state of the dispatcher
type Dispatcher struct {
// The Arvados client
// handled by this dispatcher and the goroutine should terminate. The
// goroutine is responsible for draining the 'status' channel, failure
// to do so may deadlock the dispatcher.
- RunContainer func(*Dispatcher, Container, chan Container)
+ RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
// Amount of time to wait between polling for updates.
PollInterval time.Duration
DoneProcessing chan struct{}
// mineMutex guards mineMap, which maps a container UUID to the channel
// on which that container's status updates are sent. setMine and
// checkMine both hold the mutex for the duration of their map access.
mineMutex sync.Mutex
- mineMap map[string]chan Container
- Auth apiClientAuthorization
- containers chan Container
+ mineMap map[string]chan arvados.Container
+ Auth arvados.APIClientAuthorization
+ containers chan arvados.Container
}
// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
// for which this process is actively starting/monitoring. Returns channel to
// be used to send container status updates.
//
// setMine itself only adds: if uuid is already registered, the channel
// stored for it is returned unchanged; otherwise a new unbuffered channel
// is created, stored in mineMap under uuid, and returned.
-func (dispatcher *Dispatcher) setMine(uuid string) chan Container {
+func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
// Hold mineMutex for the whole lookup-then-insert so two callers
// cannot both create a channel for the same uuid.
dispatcher.mineMutex.Lock()
defer dispatcher.mineMutex.Unlock()
// Reuse the existing channel so repeated calls for one uuid are
// idempotent.
if ch, ok := dispatcher.mineMap[uuid]; ok {
return ch
}
- ch := make(chan Container)
+ ch := make(chan arvados.Container)
dispatcher.mineMap[uuid] = ch
return ch
}
}
}
-// checkMine returns true/false if there is a channel for updates associated
+// checkMine returns true if there is a channel for updates associated
// with container c. If update is true, also send the container record on
// the channel.
-func (dispatcher *Dispatcher) checkMine(c Container, update bool) bool {
+func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
// Take the same mutex setMine uses, so mineMap is never read while
// another goroutine is inserting into it.
dispatcher.mineMutex.Lock()
defer dispatcher.mineMutex.Unlock()
// ok reports whether a channel is registered for this container's UUID.
ch, ok := dispatcher.mineMap[c.UUID]
}
// getContainers calls dispatcher.Arv.List on the "containers" resource with
// the given params, passing a container list value to receive the result.
// If the call returns an error, the error is logged.
//
// NOTE(review): only the beginning of the body is visible in this excerpt;
// how the returned items and the touched map are used should be confirmed
// against the full file.
func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
- var containers ContainerList
+ var containers arvados.ContainerList
err := dispatcher.Arv.List("containers", params, &containers)
if err != nil {
log.Printf("Error getting list of containers: %q", err)
}
}
-func (dispatcher *Dispatcher) handleUpdate(container Container) {
+func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
if container.State == Queued && dispatcher.checkMine(container, false) {
// If we previously started the job, something failed, and it
// was re-queued, this dispatcher might still be monitoring it.
}
// UpdateState makes an API call to change the state of a container.
-func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
+func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
err := dispatcher.Arv.Update("containers", uuid,
arvadosclient.Dict{
"container": arvadosclient.Dict{"state": newState}},
return
}
- dispatcher.mineMap = make(map[string]chan Container)
- dispatcher.containers = make(chan Container)
+ dispatcher.mineMap = make(map[string]chan arvados.Container)
+ dispatcher.containers = make(chan arvados.Container)
// Graceful shutdown on signal
sigChan := make(chan os.Signal)