"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"log"
- "os"
- "os/signal"
"sync"
- "syscall"
"time"
)
// Amount of time to wait between polling for updates.
PollInterval time.Duration
- // Channel used to signal that RunDispatcher loop should exit.
- DoneProcessing chan struct{}
+ mineMutex sync.Mutex
+ mineMap map[string]chan arvados.Container
+ Auth arvados.APIClientAuthorization
- mineMutex sync.Mutex
- mineMap map[string]chan arvados.Container
- Auth arvados.APIClientAuthorization
- containers chan arvados.Container
+ stop chan struct{}
}
// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
}
for _, container := range containers.Items {
touched[container.UUID] = true
- dispatcher.containers <- container
+ dispatcher.handleUpdate(container)
}
}
-func (dispatcher *Dispatcher) pollContainers() {
+func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
ticker := time.NewTicker(dispatcher.PollInterval)
+ defer ticker.Stop()
paramsQ := arvadosclient.Dict{
"filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
"limit": "1000"}
for {
+ touched := make(map[string]bool)
+ dispatcher.getContainers(paramsQ, touched)
+ dispatcher.getContainers(paramsP, touched)
+ dispatcher.mineMutex.Lock()
+ var monitored []string
+ for k := range dispatcher.mineMap {
+ if _, ok := touched[k]; !ok {
+ monitored = append(monitored, k)
+ }
+ }
+ dispatcher.mineMutex.Unlock()
+ if monitored != nil {
+ dispatcher.getContainers(arvadosclient.Dict{
+ "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+ }
select {
case <-ticker.C:
- touched := make(map[string]bool)
- dispatcher.getContainers(paramsQ, touched)
- dispatcher.getContainers(paramsP, touched)
- dispatcher.mineMutex.Lock()
- var monitored []string
- for k := range dispatcher.mineMap {
- if _, ok := touched[k]; !ok {
- monitored = append(monitored, k)
- }
- }
- dispatcher.mineMutex.Unlock()
- if monitored != nil {
- dispatcher.getContainers(arvadosclient.Dict{
- "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
- }
- case <-dispatcher.DoneProcessing:
- close(dispatcher.containers)
- ticker.Stop()
+ case <-stop:
return
}
}
return err
}
-// RunDispatcher runs the main loop of the dispatcher until receiving a message
-// on the dispatcher.DoneProcessing channel. It also installs a signal handler
-// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-func (dispatcher *Dispatcher) RunDispatcher() (err error) {
+// Stop causes Run to return after the current polling cycle.
+//
+// Stop may be called more than once and from multiple goroutines (for
+// example from a RunContainer callback and from a watchdog goroutine at the
+// same time). Only the first call made while Run is active closes the stop
+// channel. Later calls, and calls made before Run has started, do nothing.
+// The stop field is guarded by mineMutex; Stop does nothing blocking while
+// holding it.
+func (dispatcher *Dispatcher) Stop() {
+ dispatcher.mineMutex.Lock()
+ defer dispatcher.mineMutex.Unlock()
+ if dispatcher.stop == nil {
+ // not running, or already stopped
+ return
+ }
+ close(dispatcher.stop)
+ dispatcher.stop = nil
+}
+
+// Run runs the main loop of the dispatcher until Stop is called.
+func (dispatcher *Dispatcher) Run() (err error) {
err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
if err != nil {
log.Printf("Error getting my token UUID: %v", err)
}
dispatcher.mineMap = make(map[string]chan arvados.Container)
- dispatcher.containers = make(chan arvados.Container)
-
- // Graceful shutdown on signal
- sigChan := make(chan os.Signal)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-
- go func(sig <-chan os.Signal) {
- for sig := range sig {
- log.Printf("Caught signal: %v", sig)
- dispatcher.DoneProcessing <- struct{}{}
- }
- }(sigChan)
-
- defer close(sigChan)
- defer signal.Stop(sigChan)
-
- go dispatcher.pollContainers()
- for container := range dispatcher.containers {
- dispatcher.handleUpdate(container)
- }
-
+ // Publish the stop channel under the lock, but give pollContainers the
+ // local copy. A concurrent Stop resets dispatcher.stop to nil, and
+ // waiting on a nil channel would never observe the stop request.
+ stop := make(chan struct{})
+ dispatcher.mineMutex.Lock()
+ dispatcher.stop = stop
+ dispatcher.mineMutex.Unlock()
+ dispatcher.pollContainers(stop)
return nil
}
echo := "echo"
crunchRunCommand = &echo
- doneProcessing := make(chan struct{})
dispatcher := dispatch.Dispatcher{
Arv: arv,
PollInterval: time.Second,
container arvados.Container,
status chan arvados.Container) {
run(dispatcher, container, status)
- doneProcessing <- struct{}{}
+ dispatcher.Stop()
},
- DoneProcessing: doneProcessing}
+ }
startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
dispatcher.UpdateState(container.UUID, "Running")
return cmd.Start()
}
- err = dispatcher.RunDispatcher()
+ err = dispatcher.Run()
c.Assert(err, IsNil)
// Wait for all running crunch jobs to complete / terminate
*crunchRunCommand = crunchCmd
- doneProcessing := make(chan struct{})
dispatcher := dispatch.Dispatcher{
Arv: arv,
PollInterval: time.Duration(1) * time.Second,
container arvados.Container,
status chan arvados.Container) {
run(dispatcher, container, status)
- doneProcessing <- struct{}{}
+ dispatcher.Stop()
},
- DoneProcessing: doneProcessing}
+ }
startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
dispatcher.UpdateState(container.UUID, "Running")
for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
time.Sleep(100 * time.Millisecond)
}
- dispatcher.DoneProcessing <- struct{}{}
+ dispatcher.Stop()
}()
- err := dispatcher.RunDispatcher()
+ err := dispatcher.Run()
c.Assert(err, IsNil)
// Wait for all running crunch jobs to complete / terminate
theConfig.CrunchRunCommand = []string{"echo"}
- doneProcessing := make(chan struct{})
dispatcher := dispatch.Dispatcher{
Arv: arv,
PollInterval: time.Duration(1) * time.Second,
status chan arvados.Container) {
go runContainer(dispatcher, container)
run(dispatcher, container, status)
- doneProcessing <- struct{}{}
+ dispatcher.Stop()
},
- DoneProcessing: doneProcessing}
+ }
squeueUpdater.StartMonitor(time.Duration(500) * time.Millisecond)
- err = dispatcher.RunDispatcher()
+ err = dispatcher.Run()
c.Assert(err, IsNil)
squeueUpdater.Done()
theConfig.CrunchRunCommand = []string{crunchCmd}
- doneProcessing := make(chan struct{})
dispatcher := dispatch.Dispatcher{
Arv: arv,
PollInterval: time.Duration(1) * time.Second,
dispatcher.UpdateState(container.UUID, dispatch.Complete)
}()
run(dispatcher, container, status)
- doneProcessing <- struct{}{}
+ dispatcher.Stop()
},
- DoneProcessing: doneProcessing}
+ }
go func() {
for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
time.Sleep(100 * time.Millisecond)
}
- dispatcher.DoneProcessing <- struct{}{}
+ dispatcher.Stop()
}()
- err := dispatcher.RunDispatcher()
+ err := dispatcher.Run()
c.Assert(err, IsNil)
c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)