"bytes"
"flag"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/config"
- "git.curoverse.com/arvados.git/sdk/go/dispatch"
- "github.com/coreos/go-systemd/daemon"
"log"
"math"
"os"
"os/exec"
"strings"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/config"
+ "git.curoverse.com/arvados.git/sdk/go/dispatch"
+ "github.com/coreos/go-systemd/daemon"
)
// Config used by crunch-dispatch-slurm
}
var (
- theConfig Config
- squeueUpdater Squeue
+ theConfig Config
+ sqCheck SqueueChecker
)
// defaultConfigPath is the default value of the "config" command-line flag,
// which names the JSON or YAML configuration file for crunch-dispatch-slurm.
const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
"config",
defaultConfigPath,
"`path` to JSON or YAML configuration file")
+ dumpConfig := flag.Bool(
+ "dump-config",
+ false,
+ "write current configuration to stdout and exit")
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
}
+ if *dumpConfig {
+ log.Fatal(config.DumpAndExit(theConfig))
+ }
+
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
log.Printf("Error making Arvados client: %v", err)
}
arv.Retries = 25
- squeueUpdater.StartMonitor(time.Duration(theConfig.PollPeriod))
- defer squeueUpdater.Done()
+ sqCheck = SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
+ defer sqCheck.Stop()
dispatcher := dispatch.Dispatcher{
Arv: arv,
RunContainer: run,
- PollInterval: time.Duration(theConfig.PollPeriod),
+ PollPeriod: time.Duration(theConfig.PollPeriod),
MinRetryPeriod: time.Duration(theConfig.MinRetryPeriod),
}
cmd.Stderr = &stderr
// Mutex between squeue sync and running sbatch or scancel.
- squeueUpdater.SlurmLock.Lock()
- defer squeueUpdater.SlurmLock.Unlock()
+ sqCheck.L.Lock()
+ defer sqCheck.L.Unlock()
log.Printf("exec sbatch %+q", cmd.Args)
err := cmd.Run()
func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
submitted := false
for !*monitorDone {
- if squeueUpdater.CheckSqueue(container.UUID) {
+ if sqCheck.HasUUID(container.UUID) {
// Found in the queue, so continue monitoring
submitted = true
} else if container.State == dispatch.Locked && !submitted {
go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
for container = range status {
- if !(container.State == dispatch.Locked || container.State == dispatch.Running) {
- continue
- }
- if container.Priority != 0 {
- continue
- }
- log.Printf("Canceling container %s", container.UUID)
+ if container.Priority == 0 && (container.State == dispatch.Locked || container.State == dispatch.Running) {
+ log.Printf("Canceling container %s", container.UUID)
+ // Mutex between squeue sync and running sbatch or scancel.
+ sqCheck.L.Lock()
+ cmd := scancelCmd(container)
+ msg, err := cmd.CombinedOutput()
+ sqCheck.L.Unlock()
- // Mutex between squeue sync and running sbatch or scancel.
- squeueUpdater.SlurmLock.Lock()
- cmd := scancelCmd(container)
- msg, err := cmd.CombinedOutput()
- squeueUpdater.SlurmLock.Unlock()
-
- if err != nil {
- log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
- if squeueUpdater.CheckSqueue(container.UUID) {
- log.Printf("Container %s is still in squeue after scancel.", container.UUID)
- continue
+ if err != nil {
+ log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
+ if sqCheck.HasUUID(container.UUID) {
+ log.Printf("Container %s is still in squeue after scancel.", container.UUID)
+ continue
+ }
}
- }
- // Ignore errors; if necessary, we'll try again next time
- dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ // Ignore errors; if necessary, we'll try again next time
+ dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ }
}
monitorDone = true
}