X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4153cb6cfad920ed0b1a4b818d3bcc8de492d134..8734a7391a5672eebcdf572d93bae1b3ed1179c9:/services/crunch-dispatch-local/crunch-dispatch-local.go diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go index cc472a4031..bb3c05c7eb 100644 --- a/services/crunch-dispatch-local/crunch-dispatch-local.go +++ b/services/crunch-dispatch-local/crunch-dispatch-local.go @@ -4,12 +4,15 @@ package main import ( "flag" + "git.curoverse.com/arvados.git/sdk/go/arvados" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/dispatch" "log" "os" "os/exec" + "os/signal" "sync" + "syscall" "time" ) @@ -53,16 +56,24 @@ func doMain() error { arv.Retries = 25 dispatcher := dispatch.Dispatcher{ - Arv: arv, - RunContainer: run, - PollInterval: time.Duration(*pollInterval) * time.Second, - DoneProcessing: make(chan struct{})} + Arv: arv, + RunContainer: run, + PollPeriod: time.Duration(*pollInterval) * time.Second, + } - err = dispatcher.RunDispatcher() + err = dispatcher.Run() if err != nil { return err } + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT) + sig := <-c + log.Printf("Received %s, shutting down", sig) + signal.Stop(c) + + dispatcher.Stop() + runningCmdsMutex.Lock() // Finished dispatching; interrupt any crunch jobs that are still running for _, cmd := range runningCmds { @@ -76,7 +87,7 @@ func doMain() error { return nil } -func startFunc(container dispatch.Container, cmd *exec.Cmd) error { +func startFunc(container arvados.Container, cmd *exec.Cmd) error { return cmd.Start() } @@ -91,8 +102,8 @@ var startCmd = startFunc // If the container is in any other state, or is not Complete/Cancelled after // crunch-run terminates, mark the container as Cancelled. func run(dispatcher *dispatch.Dispatcher, - container dispatch.Container, - status chan dispatch.Container) { + container arvados.Container, + status chan arvados.Container) { uuid := container.UUID @@ -159,14 +170,15 @@ func run(dispatcher *dispatch.Dispatcher, if err != nil { log.Printf("Error getting final container state: %v", err) } - if container.State != dispatch.Complete && container.State != dispatch.Cancelled { + if container.LockedByUUID == dispatcher.Auth.UUID && + (container.State == dispatch.Locked || container.State == dispatch.Running) { log.Printf("After %s process termination, container state for %v is %q. Updating it to %q", *crunchRunCommand, container.State, uuid, dispatch.Cancelled) dispatcher.UpdateState(uuid, dispatch.Cancelled) } // drain any subsequent status changes - for _ = range status { + for range status { } log.Printf("Finalized container %v", uuid)