"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"log"
- "os"
- "os/signal"
"sync"
- "syscall"
"time"
)
// Dispatcher holds the state of the dispatcher
type Dispatcher struct {
// The Arvados client
- Arv arvadosclient.ArvadosClient
+ Arv *arvadosclient.ArvadosClient
// When a new queued container appears and is either already owned by
// this dispatcher or is successfully locked, the dispatcher will call
// Amount of time to wait between polling for updates.
PollInterval time.Duration
- // Channel used to signal that RunDispatcher loop should exit.
- DoneProcessing chan struct{}
+ mineMutex sync.Mutex
+ mineMap map[string]chan arvados.Container
+ Auth arvados.APIClientAuthorization
- mineMutex sync.Mutex
- mineMap map[string]chan arvados.Container
- Auth arvados.APIClientAuthorization
- containers chan arvados.Container
+ stop chan struct{}
}
// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
}
for _, container := range containers.Items {
touched[container.UUID] = true
- dispatcher.containers <- container
+ dispatcher.handleUpdate(container)
}
}
-func (dispatcher *Dispatcher) pollContainers() {
+func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
ticker := time.NewTicker(dispatcher.PollInterval)
+ defer ticker.Stop()
paramsQ := arvadosclient.Dict{
"filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
"limit": "1000"}
for {
+ touched := make(map[string]bool)
+ dispatcher.getContainers(paramsQ, touched)
+ dispatcher.getContainers(paramsP, touched)
+ dispatcher.mineMutex.Lock()
+ var monitored []string
+ for k := range dispatcher.mineMap {
+ if _, ok := touched[k]; !ok {
+ monitored = append(monitored, k)
+ }
+ }
+ dispatcher.mineMutex.Unlock()
+ if monitored != nil {
+ dispatcher.getContainers(arvadosclient.Dict{
+ "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+ }
select {
case <-ticker.C:
- touched := make(map[string]bool)
- dispatcher.getContainers(paramsQ, touched)
- dispatcher.getContainers(paramsP, touched)
- dispatcher.mineMutex.Lock()
- var monitored []string
- for k := range dispatcher.mineMap {
- if _, ok := touched[k]; !ok {
- monitored = append(monitored, k)
- }
- }
- dispatcher.mineMutex.Unlock()
- if monitored != nil {
- dispatcher.getContainers(arvadosclient.Dict{
- "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
- }
- case <-dispatcher.DoneProcessing:
- close(dispatcher.containers)
- ticker.Stop()
+ case <-stop:
return
}
}
if container.State == Queued && container.Priority > 0 {
// Try to take the lock
- if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
+ if err := dispatcher.Lock(container.UUID); err != nil {
return
}
container.State = Locked
return err
}
-// RunDispatcher runs the main loop of the dispatcher until receiving a message
-// on the dispatcher.DoneProcessing channel. It also installs a signal handler
-// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-func (dispatcher *Dispatcher) RunDispatcher() (err error) {
- err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
+// Lock makes the lock API call which updates the state of a container to Locked.
+func (dispatcher *Dispatcher) Lock(uuid string) error {
+ err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
if err != nil {
- log.Printf("Error getting my token UUID: %v", err)
- return
+ log.Printf("Error locking container %s: %q", uuid, err)
}
+ return err
+}
- dispatcher.mineMap = make(map[string]chan arvados.Container)
- dispatcher.containers = make(chan arvados.Container)
-
- // Graceful shutdown on signal
- sigChan := make(chan os.Signal)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-
- go func(sig <-chan os.Signal) {
- for sig := range sig {
- log.Printf("Caught signal: %v", sig)
- dispatcher.DoneProcessing <- struct{}{}
- }
- }(sigChan)
+// Unlock makes the unlock API call which updates the state of a container to Queued.
+func (dispatcher *Dispatcher) Unlock(uuid string) error {
+ err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
+ if err != nil {
+ log.Printf("Error unlocking container %s: %q", uuid, err)
+ }
+ return err
+}
- defer close(sigChan)
- defer signal.Stop(sigChan)
+// Stop causes Run to return after the current polling cycle.
+func (dispatcher *Dispatcher) Stop() {
+ if dispatcher.stop == nil {
+ // already stopped
+ return
+ }
+ close(dispatcher.stop)
+ dispatcher.stop = nil
+}
- go dispatcher.pollContainers()
- for container := range dispatcher.containers {
- dispatcher.handleUpdate(container)
+// Run runs the main loop of the dispatcher.
+func (dispatcher *Dispatcher) Run() (err error) {
+ err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
+ if err != nil {
+ log.Printf("Error getting my token UUID: %v", err)
+ return
}
+ dispatcher.mineMap = make(map[string]chan arvados.Container)
+ dispatcher.stop = make(chan struct{})
+ dispatcher.pollContainers(dispatcher.stop)
return nil
}