"log"
"os"
"os/exec"
+ "os/signal"
+ "syscall"
"time"
)
}
var arv arvadosclient.ArvadosClient
+var runningCmds map[string]*exec.Cmd
func doMain() error {
flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
return err
}
+	// runningCmds maps a container UUID to the command started for it, so that
+	// the signal handler below can reach every child process.
+	runningCmds = make(map[string]*exec.Cmd)
+	// Graceful shutdown: on SIGINT, SIGTERM or SIGQUIT, stop processing and
+	// relay the caught signal to every command recorded in runningCmds.
+	sigChan = make(chan os.Signal, 1)
+	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+	go func(signals <-chan os.Signal) {
+		for caught := range signals {
+			// Ask the processing loop to terminate before touching the children.
+			// NOTE(review): doneProcessing is created just after this goroutine is
+			// launched; a signal delivered before that point would block here on a
+			// nil channel. The send is also repeated for every signal received, so
+			// a second signal blocks unless something is still receiving - confirm.
+			doneProcessing <- true
+			// NOTE(review): runningCmds is also written where containers are started
+			// and finished; if those paths run on other goroutines, this iteration
+			// needs synchronization (e.g. a sync.Mutex) - confirm against callers.
+			for uuid, cmd := range runningCmds {
+				if err := cmd.Process.Signal(caught); err != nil {
+					// The child did not receive the signal (it may already have
+					// exited); waiting on it could stall delivery to the rest.
+					log.Printf("Error sending signal %v to process for %v: %q", caught, uuid, err)
+					continue
+				}
+				// NOTE(review): the code that started this command appears to call
+				// Wait on the same process as well; one of the two calls will
+				// likely report an error - confirm this is acceptable.
+				if _, err := cmd.Process.Wait(); err != nil {
+					log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
+				}
+			}
+		}
+	}(sigChan)
+
// Channel used to signal that processing should terminate.
doneProcessing = make(chan bool)
}
var doneProcessing chan bool
+var sigChan chan os.Signal
// Poll for queued containers using pollInterval.
// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
return
}
+ runningCmds[uuid] = cmd
+
log.Printf("Started container run for %v", uuid)
err := arv.Update("containers", uuid,
if container.Priority == 0 {
priorityTicker.Stop()
cmd.Process.Signal(os.Interrupt)
+ delete(runningCmds, uuid)
return
}
}
if _, err := cmd.Process.Wait(); err != nil {
log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
}
+ delete(runningCmds, uuid)
priorityTicker.Stop()
"net/http/httptest"
"os"
"strings"
+ "syscall"
"testing"
"time"
os.Args = append(os.Args, args...)
go func() {
- time.Sleep(2 * time.Second)
- doneProcessing <- true
+ time.Sleep(5 * time.Second)
+ sigChan <- syscall.SIGINT
}()
err := doMain()
c.Check(err, IsNil)
- // Give some time for run goroutine to complete
- time.Sleep(1 * time.Second)
-
// There should be no queued containers now
params := arvadosclient.Dict{
"filters": [][]string{[]string{"state", "=", "Queued"}},
go func() {
time.Sleep(1 * time.Second)
- doneProcessing <- true
+ sigChan <- syscall.SIGTERM
}()
runQueuedContainers(1, 1, crunchCmd)