// Dispatcher service for Crunch that submits containers to the slurm queue.
import (
- "bufio"
"flag"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
"io/ioutil"
var (
crunchRunCommand *string
- finishCommand *string
+ squeueUpdater Squeue
)
func doMain() error {
"/usr/bin/crunch-run",
"Crunch command to run container")
- finishCommand = flags.String(
- "finish-command",
- "/usr/bin/crunch-finish-slurm.sh",
- "Command to run from strigger when job is finished")
-
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
}
arv.Retries = 25
+ squeueUpdater.StartMonitor(time.Duration(*pollInterval) * time.Second)
+ defer squeueUpdater.Done()
+
dispatcher := dispatch.Dispatcher{
Arv: arv,
RunContainer: run,
}
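
// The Squeue type referenced above (StartMonitor, Done, CheckSqueue, and the
// SlurmLock mutex) is defined elsewhere and not shown in this diff. Below is
// a minimal sketch of the shape those call sites imply (assumes "log",
// "os/exec", "strings", "sync", and "time" imports); the field names and the
// poll/broadcast strategy are assumptions, not the real implementation:

type Squeue struct {
	// SlurmLock serializes squeue polls with sbatch/scancel runs so a
	// cached snapshot never straddles a queue mutation.
	SlurmLock sync.Mutex

	cond     *sync.Cond // broadcast after every poll
	contents []string   // job names from the most recent poll
	done     bool
}

// StartMonitor launches a goroutine that polls squeue every pollInterval
// and wakes any goroutines blocked in CheckSqueue.
func (sqc *Squeue) StartMonitor(pollInterval time.Duration) {
	sqc.cond = sync.NewCond(&sync.Mutex{})
	go func() {
		for {
			sqc.cond.L.Lock()
			stop := sqc.done
			sqc.cond.L.Unlock()
			if stop {
				return
			}
			sqc.poll()
			time.Sleep(pollInterval)
		}
	}()
}

// poll runs `squeue --format=%j` (job names only), replaces the cached
// contents, and broadcasts so blocked CheckSqueue callers re-check.
func (sqc *Squeue) poll() {
	sqc.SlurmLock.Lock()
	out, err := exec.Command("squeue", "--format=%j").Output()
	sqc.SlurmLock.Unlock()
	if err != nil {
		log.Printf("Error running squeue: %v", err)
		return
	}
	sqc.cond.L.Lock()
	sqc.contents = strings.Split(string(out), "\n")
	sqc.cond.Broadcast()
	sqc.cond.L.Unlock()
}

// CheckSqueue blocks until the next poll completes, then reports whether a
// job with the given name was present. Blocking here keeps callers' retry
// loops in step with the poll interval instead of busy-waiting.
func (sqc *Squeue) CheckSqueue(uuid string) bool {
	sqc.cond.L.Lock()
	sqc.cond.Wait()
	defer sqc.cond.L.Unlock()
	for _, name := range sqc.contents {
		if name == uuid {
			return true
		}
	}
	return false
}

// Done stops the monitor goroutine and wakes any remaining waiters.
func (sqc *Squeue) Done() {
	sqc.cond.L.Lock()
	sqc.done = true
	sqc.cond.Broadcast()
	sqc.cond.L.Unlock()
}
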
// sbatchCmd
-func sbatchFunc(container dispatch.Container) *exec.Cmd {
- memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
+func sbatchFunc(container arvados.Container) *exec.Cmd {
+ memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576))
return exec.Command("sbatch", "--share", "--parsable",
fmt.Sprintf("--job-name=%s", container.UUID),
fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
- fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints["vcpus"])),
+ fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs),
fmt.Sprintf("--priority=%d", container.Priority))
}
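
// Worked example of the --mem-per-cpu arithmetic above: RuntimeConstraints.RAM
// is in bytes and 1048576 is bytes per MiB, so dividing by (VCPUs * 1048576)
// yields MiB per CPU, the unit sbatch expects. For an 8 GiB, 4-vCPU container:
//
//	memPerCPU = ceil(8589934592 / (4 * 1048576)) = 2048  // MiB per CPU
//
// math.Ceil rounds up so slurm never allocates less memory than the
// container's RuntimeConstraints request.
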
-// striggerCmd
-func striggerFunc(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
- return exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
- fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
-}
-
-// squeueFunc
-func squeueFunc() *exec.Cmd {
- return exec.Command("squeue", "--format=%j")
+// scancelCmd
+func scancelFunc(container arvados.Container) *exec.Cmd {
+ return exec.Command("scancel", "--name="+container.UUID)
}
// Wrap these so that they can be overridden by tests
-var striggerCmd = striggerFunc
var sbatchCmd = sbatchFunc
-var squeueCmd = squeueFunc
+var scancelCmd = scancelFunc
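
// Because sbatchCmd and scancelCmd are plain package-level variables, a test
// can swap in a stub and restore the real function afterward. A hypothetical
// sketch (not taken from the actual test suite; assumes a "testing" import):

func TestSubmitWithStubbedSbatch(t *testing.T) {
	defer func() { sbatchCmd = sbatchFunc }() // restore the real wrapper
	sbatchCmd = func(container arvados.Container) *exec.Cmd {
		// Stand in for sbatch: print a fake job ID and exit 0.
		return exec.Command("echo", "1234")
	}
	// ... call submit() here; it will exec `echo` instead of sbatch ...
}
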
// Submit job to slurm using sbatch.
func submit(dispatcher *dispatch.Dispatcher,
- container dispatch.Container, crunchRunCommand string) (jobid string, submitErr error) {
+ container arvados.Container, crunchRunCommand string) (jobid string, submitErr error) {
submitErr = nil
defer func() {
return
}
+	// Hold SlurmLock so the squeue monitor does not take a snapshot while this sbatch runs.
+ squeueUpdater.SlurmLock.Lock()
+ defer squeueUpdater.SlurmLock.Unlock()
+
err := cmd.Start()
if err != nil {
submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
return
}
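
// sbatch is invoked with --parsable above, so on success its stdout is just
// "jobid[;cluster]". The rest of submit's body is elided from this diff; a
// plausible sketch of how the job ID could be recovered (an assumption about
// the elided code, assuming a "strings" import):

stdoutPipe, err := cmd.StdoutPipe() // must be wired up before cmd.Start()
if err != nil {
	submitErr = fmt.Errorf("Error setting up stdout pipe: %v", err)
	return
}
// ... cmd.Start() and its error check, as above ...
stdoutBytes, _ := ioutil.ReadAll(stdoutPipe)
if err := cmd.Wait(); err != nil {
	submitErr = fmt.Errorf("%v failed: %v", cmd.Args, err)
	return
}
jobid = strings.TrimSpace(string(stdoutBytes)) // e.g. "1234"
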
-// finalizeRecordOnFinish uses 'strigger' command to register a script that will run on
-// the slurm controller when the job finishes.
-func finalizeRecordOnFinish(jobid, containerUUID, finishCommand string, arv arvadosclient.ArvadosClient) {
- insecure := "0"
- if arv.ApiInsecure {
- insecure = "1"
- }
- cmd := striggerCmd(jobid, containerUUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
- cmd.Stdout = os.Stdout
- cmd.Stderr = os.Stderr
- err := cmd.Run()
- if err != nil {
- log.Printf("While setting up strigger: %v", err)
- // BUG: we drop the error here and forget about it. A
- // human has to notice the container is stuck in
- // Running state, and fix it manually.
- }
-}
-
-func checkSqueue(uuid string) (bool, error) {
- cmd := squeueCmd()
- sq, err := cmd.StdoutPipe()
- if err != nil {
- return false, err
- }
- cmd.Start()
- defer cmd.Wait()
- scanner := bufio.NewScanner(sq)
- found := false
- for scanner.Scan() {
- if scanner.Text() == uuid {
- found = true
- }
- }
- if err := scanner.Err(); err != nil {
- return false, err
- }
- return found, nil
-}
-
-// Run or monitor a container.
-//
// If the container is marked as Locked, check if it is already in the slurm
// queue. If not, submit it.
//
// If the container is marked as Running, check if it is in the slurm queue.
// If not, mark it as Cancelled.
-//
-// Monitor status updates. If the priority changes to zero, cancel the
-// container using scancel.
-func run(dispatcher *dispatch.Dispatcher,
- container dispatch.Container,
- status chan dispatch.Container) {
-
- uuid := container.UUID
+func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
+ submitted := false
+ for !*monitorDone {
+ if squeueUpdater.CheckSqueue(container.UUID) {
+ // Found in the queue, so continue monitoring
+ submitted = true
+ } else if container.State == dispatch.Locked && !submitted {
+ // Not in queue but in Locked state and we haven't
+ // submitted it yet, so submit it.
- if container.State == dispatch.Locked {
- if inQ, err := checkSqueue(container.UUID); err != nil {
- log.Printf("Error running squeue: %v", err)
- dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
- } else if !inQ {
log.Printf("About to submit queued container %v", container.UUID)
- jobid, err := submit(dispatcher, container, *crunchRunCommand)
+ if _, err := submit(dispatcher, container, *crunchRunCommand); err != nil {
+ log.Printf("Error submitting container %s to slurm: %v",
+ container.UUID, err)
+				// sbatch may be failing; put the container back in Queued so it can be retried
+ dispatcher.UpdateState(container.UUID, dispatch.Queued)
+ }
+ submitted = true
+ } else {
+			// Not in the queue, and we are not going to submit it.
+			// Refresh the container state: if it is Complete or
+			// Cancelled, do nothing; if it is Locked, release it back
+			// to the queue; if it is Running, clean up the record.
+
+ var con arvados.Container
+ err := dispatcher.Arv.Get("containers", container.UUID, nil, &con)
if err != nil {
- log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
- } else {
- finalizeRecordOnFinish(jobid, container.UUID, *finishCommand, dispatcher.Arv)
+ log.Printf("Error getting final container state: %v", err)
}
- }
- } else if container.State == dispatch.Running {
- if inQ, err := checkSqueue(container.UUID); err != nil {
- log.Printf("Error running squeue: %v", err)
- dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
- } else if !inQ {
- log.Printf("Container %s in Running state but not in slurm queue, marking Cancelled.", container.UUID)
- dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+
+ var st arvados.ContainerState
+ switch con.State {
+ case dispatch.Locked:
+ st = dispatch.Queued
+ case dispatch.Running:
+ st = dispatch.Cancelled
+ default:
+				// Container state is Queued, Complete, or Cancelled, so stop monitoring it.
+ return
+ }
+
+ log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+ container.UUID, con.State, st)
+ dispatcher.UpdateState(container.UUID, st)
}
}
+}
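
// Note: the loop above has no explicit sleep. It is paced by CheckSqueue,
// which (per the Squeue sketch earlier, an assumption) blocks until the next
// squeue poll completes, so each container is reconciled at most once per
// poll interval rather than in a busy loop.
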
- log.Printf("Monitoring container %v started", uuid)
+// Run or monitor a container.
+//
+// Monitor status updates. If the priority changes to zero, cancel the
+// container using scancel.
+func run(dispatcher *dispatch.Dispatcher,
+ container arvados.Container,
+ status chan arvados.Container) {
- for container = range status {
- if (container.State == dispatch.Locked || container.State == dispatch.Running) && container.Priority == 0 {
- log.Printf("Canceling container %s", container.UUID)
+ log.Printf("Monitoring container %v started", container.UUID)
+ defer log.Printf("Monitoring container %v finished", container.UUID)
- err := exec.Command("scancel", "--name="+container.UUID).Run()
- if err != nil {
- log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
- if inQ, err := checkSqueue(container.UUID); err != nil {
- log.Printf("Error running squeue: %v", err)
- continue
- } else if inQ {
- log.Printf("Container %s is still in squeue after scancel.", container.UUID)
- continue
+ monitorDone := false
+ go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
+
+ for container = range status {
+ if container.State == dispatch.Locked || container.State == dispatch.Running {
+ if container.Priority == 0 {
+ log.Printf("Canceling container %s", container.UUID)
+
+				// Hold SlurmLock so the squeue monitor does not take a snapshot while scancel runs.
+ squeueUpdater.SlurmLock.Lock()
+ err := scancelCmd(container).Run()
+ squeueUpdater.SlurmLock.Unlock()
+
+ if err != nil {
+ log.Printf("Error stopping container %s with scancel: %v",
+ container.UUID, err)
+ if squeueUpdater.CheckSqueue(container.UUID) {
+ log.Printf("Container %s is still in squeue after scancel.",
+ container.UUID)
+ continue
+ }
}
- }
- err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ }
}
}
-
- log.Printf("Monitoring container %v finished", uuid)
+ monitorDone = true
}
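
// Note: monitorDone is a plain bool shared with the monitor goroutine and
// written here without synchronization, which the Go race detector would
// flag as a data race. A sketch of the same shutdown signal using a channel
// instead (hypothetical, not part of this change):
//
//	done := make(chan struct{})
//	go monitorSubmitOrCancel(dispatcher, container, done)
//	for container = range status { ... }
//	close(done) // instead of monitorDone = true
//
// with monitorSubmitOrCancel selecting on <-done instead of polling a bool.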