package dispatch
import (
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"log"
- "os"
- "os/signal"
"sync"
- "syscall"
"time"
)
-// Constants for container states
// Container states. The added lines alias the ContainerState* constants
// from the arvados SDK package in place of the local string literals.
const (
- Queued = "Queued"
- Locked = "Locked"
- Running = "Running"
- Complete = "Complete"
- Cancelled = "Cancelled"
+ Queued = arvados.ContainerStateQueued
+ Locked = arvados.ContainerStateLocked
+ Running = arvados.ContainerStateRunning
+ Complete = arvados.ContainerStateComplete
+ Cancelled = arvados.ContainerStateCancelled
)
-type apiClientAuthorization struct {
- UUID string `json:"uuid"`
- APIToken string `json:"api_token"`
-}
-
-type apiClientAuthorizationList struct {
- Items []apiClientAuthorization `json:"items"`
-}
-
-// Represents an Arvados container record
-type Container struct {
- UUID string `json:"uuid"`
- State string `json:"state"`
- Priority int `json:"priority"`
- RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
- LockedByUUID string `json:"locked_by_uuid"`
-}
-
-// ContainerList is a list of the containers from api
-type ContainerList struct {
- Items []Container `json:"items"`
- ItemsAvailable int `json:"items_available"`
-}
-
// Dispatcher holds the state of the dispatcher
type Dispatcher struct {
// The Arvados client
- Arv arvadosclient.ArvadosClient
+ Arv *arvadosclient.ArvadosClient
// When a new queued container appears and is either already owned by
// this dispatcher or is successfully locked, the dispatcher will call
// handled by this dispatcher and the goroutine should terminate. The
// goroutine is responsible for draining the 'status' channel, failure
// to do so may deadlock the dispatcher.
- RunContainer func(*Dispatcher, Container, chan Container)
+ RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
// Amount of time to wait between polling for updates.
PollInterval time.Duration
- // Channel used to signal that RunDispatcher loop should exit.
- DoneProcessing chan struct{}
// mineMutex is held around every access to mineMap.
+ mineMutex sync.Mutex
// mineMap maps the UUID of each container this dispatcher is tracking
// to the channel on which that container's status updates are sent.
+ mineMap map[string]chan arvados.Container
// Auth is filled in by Run with the "current" api_client_authorizations
// record; its UUID is compared against a container's LockedByUUID.
+ Auth arvados.APIClientAuthorization
- mineMutex sync.Mutex
- mineMap map[string]chan Container
- Auth apiClientAuthorization
- containers chan Container
// stop is created by Run and closed by Stop to end the polling loop.
+ stop chan struct{}
}
// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
// for which this process is actively starting/monitoring. Returns channel to
// be used to send container status updates.
//
// setMine is the "add" half: it holds mineMutex for the whole call. If uuid
// already has a channel in mineMap that channel is returned as is; otherwise
// a new unbuffered channel is created, stored under uuid, and returned.
-func (dispatcher *Dispatcher) setMine(uuid string) chan Container {
+func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
dispatcher.mineMutex.Lock()
defer dispatcher.mineMutex.Unlock()
// Reuse the existing channel so repeated calls for one uuid are harmless.
if ch, ok := dispatcher.mineMap[uuid]; ok {
return ch
}
- ch := make(chan Container)
+ ch := make(chan arvados.Container)
dispatcher.mineMap[uuid] = ch
return ch
}
}
}
-// Check if there is a channel for updates associated with this container. If
-// so send the container record on the channel and return true, if not return
-// false.
-func (dispatcher *Dispatcher) updateMine(c Container) bool {
+// checkMine reports whether mineMap holds an update channel for container
+// c (looked up by c.UUID). If update is true and a channel exists, c is
+// also sent on that channel before checkMine returns.
+func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
dispatcher.mineMutex.Lock()
defer dispatcher.mineMutex.Unlock()
ch, ok := dispatcher.mineMap[c.UUID]
if ok {
// The send below happens while mineMutex is still held (the Unlock is
// deferred). Channels created by setMine are unbuffered, so the send
// waits for a receiver and other mineMap users wait with it.
- ch <- c
+ if update {
+ ch <- c
+ }
return true
}
return false
}
// getContainers lists the containers matching params, records every
// returned UUID in touched, and passes each record to handleUpdate.
// If the list call fails, the error is logged and no records are processed
// for this call; touched is left unchanged.
func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
- var containers ContainerList
+ var containers arvados.ContainerList
err := dispatcher.Arv.List("containers", params, &containers)
if err != nil {
log.Printf("Error getting list of containers: %q", err)
// Do not walk a response that failed to load; its contents cannot be
// relied on, so skip dispatching until the next poll.
+ return
}
for _, container := range containers.Items {
touched[container.UUID] = true
- dispatcher.containers <- container
+ dispatcher.handleUpdate(container)
}
}
// pollContainers repeatedly fetches container records and hands them to
// handleUpdate (via getContainers) until a receive on stop succeeds.
// With the added lines, a fetch cycle runs before each wait on the ticker,
// so the first cycle starts as soon as the function is entered rather than
// after the first PollInterval.
-func (dispatcher *Dispatcher) pollContainers() {
+func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
ticker := time.NewTicker(dispatcher.PollInterval)
+ defer ticker.Stop()
// paramsQ selects Queued containers with priority > 0. paramsP, used
// below, is defined in lines elided from this hunk.
paramsQ := arvadosclient.Dict{
"filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
"limit": "1000"}
for {
+ touched := make(map[string]bool)
+ dispatcher.getContainers(paramsQ, touched)
+ dispatcher.getContainers(paramsP, touched)
// Collect the UUIDs being tracked in mineMap that neither query above
// returned, so they can be fetched explicitly by UUID.
+ dispatcher.mineMutex.Lock()
+ var monitored []string
+ for k := range dispatcher.mineMap {
+ if _, ok := touched[k]; !ok {
+ monitored = append(monitored, k)
+ }
+ }
+ dispatcher.mineMutex.Unlock()
+ if monitored != nil {
+ dispatcher.getContainers(arvadosclient.Dict{
+ "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+ }
// Wait for the next tick, or return once stop is readable (Stop closes it).
select {
case <-ticker.C:
- touched := make(map[string]bool)
- dispatcher.getContainers(paramsQ, touched)
- dispatcher.getContainers(paramsP, touched)
- dispatcher.mineMutex.Lock()
- var monitored []string
- for k := range dispatcher.mineMap {
- if _, ok := touched[k]; !ok {
- monitored = append(monitored, k)
- }
- }
- dispatcher.mineMutex.Unlock()
- if monitored != nil {
- dispatcher.getContainers(arvadosclient.Dict{
- "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
- }
- case <-dispatcher.DoneProcessing:
- close(dispatcher.containers)
- ticker.Stop()
+ case <-stop:
return
}
}
}
// handleUpdate processes one container record returned by a poll. The steps
// visible here are: stop tracking a container that is back in the Queued
// state but still present in mineMap; ignore a non-Queued container whose
// LockedByUUID is not this dispatcher's Auth.UUID; forward the record to an
// existing monitor channel if there is one; and, for a Queued container with
// priority > 0, call Lock before continuing. The rest of the function is
// elided from this hunk.
-func (dispatcher *Dispatcher) handleUpdate(container Container) {
+func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
+ if container.State == Queued && dispatcher.checkMine(container, false) {
+ // If we previously started the job, something failed, and it
+ // was re-queued, this dispatcher might still be monitoring it.
+ // Stop the existing monitor, then try to lock and run it
+ // again.
+ dispatcher.notMine(container.UUID)
+ }
+
if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
// If container is Complete, Cancelled, or Queued, LockedByUUID
// will be nil. If the container was formerly Locked, moved
return
}
- if dispatcher.updateMine(container) {
+ if dispatcher.checkMine(container, true) {
// Already monitored, sent status update
return
}
- if container.State == Queued {
+ if container.State == Queued && container.Priority > 0 {
// Try to take the lock
- if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
+ if err := dispatcher.Lock(container.UUID); err != nil {
// Lock has already logged the error; give up on this container
// for the current poll.
return
}
// Update the local copy only; the Lock call above is what changed
// the state on the API server.
container.State = Locked
}
// UpdateState makes an API call to change the state of a container.
// The request body is {"container": {"state": newState}} and the error
// from the underlying Update call is returned unchanged.
-func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
+func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
err := dispatcher.Arv.Update("containers", uuid,
arvadosclient.Dict{
"container": arvadosclient.Dict{"state": newState}},
return err
}
-// RunDispatcher runs the main loop of the dispatcher until receiving a message
-// on the dispatcher.DoneProcessing channel. It also installs a signal handler
-// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-func (dispatcher *Dispatcher) RunDispatcher() (err error) {
- err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
+// Lock makes the lock API call which updates the state of a container to Locked.
+func (dispatcher *Dispatcher) Lock(uuid string) error {
// POST to the "lock" action of the container identified by uuid.
+ err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
if err != nil {
- log.Printf("Error getting my token UUID: %v", err)
- return
// A failure is logged here and is also returned to the caller.
+ log.Printf("Error locking container %s: %q", uuid, err)
}
+ return err
+}
- dispatcher.mineMap = make(map[string]chan Container)
- dispatcher.containers = make(chan Container)
-
- // Graceful shutdown on signal
- sigChan := make(chan os.Signal)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-
- go func(sig <-chan os.Signal) {
- for sig := range sig {
- log.Printf("Caught signal: %v", sig)
- dispatcher.DoneProcessing <- struct{}{}
- }
- }(sigChan)
+// Unlock makes the unlock API call which updates the state of a container to Queued.
+func (dispatcher *Dispatcher) Unlock(uuid string) error {
// POST to the "unlock" action of the container identified by uuid.
+ err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
+ if err != nil {
// A failure is logged here and is also returned to the caller.
+ log.Printf("Error unlocking container %s: %q", uuid, err)
+ }
+ return err
+}
- defer close(sigChan)
- defer signal.Stop(sigChan)
+// Stop causes Run to return after the current polling cycle.
+func (dispatcher *Dispatcher) Stop() {
// stop is nil both before Run has created the channel and after an
// earlier Stop, so in either case this call does nothing.
// NOTE(review): dispatcher.stop is read and written here without any
// lock; presumably callers do not invoke Stop concurrently with Run's
// setup or with another Stop -- confirm.
+ if dispatcher.stop == nil {
+ // already stopped
+ return
+ }
// Closing the channel makes the "case <-stop" in pollContainers ready.
+ close(dispatcher.stop)
+ dispatcher.stop = nil
+}
- go dispatcher.pollContainers()
- for container := range dispatcher.containers {
- dispatcher.handleUpdate(container)
+// Run runs the main loop of the dispatcher.
+func (dispatcher *Dispatcher) Run() (err error) {
// Fetch the "current" api_client_authorizations record into
// dispatcher.Auth; handleUpdate compares its UUID with LockedByUUID.
+ err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
+ if err != nil {
+ log.Printf("Error getting my token UUID: %v", err)
+ return
}
+ dispatcher.mineMap = make(map[string]chan arvados.Container)
+ dispatcher.stop = make(chan struct{})
// pollContainers runs in the calling goroutine and returns once a receive
// on the stop channel succeeds (Stop closes it), so Run blocks until then.
// The channel is passed by value, so Stop setting the field to nil does
// not affect the running loop.
+ dispatcher.pollContainers(dispatcher.stop)
return nil
}