import (
	"bufio"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"math"
	"os"
	"os/exec"
	"os/signal"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
)
func main() {
	err := doMain()
	if err != nil {
		log.Fatalf("%q", err)
	}
}

var (
	arv              arvadosclient.ArvadosClient
	runningCmds      map[string]*exec.Cmd
	runningCmdsMutex sync.Mutex
	waitGroup        sync.WaitGroup
	doneProcessing   chan bool
	sigChan          chan os.Signal
)
func doMain() error {
	flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)

	pollInterval := flags.Int(
		"poll-interval",
		10,
		"Interval in seconds to poll for queued containers")

	priorityPollInterval := flags.Int(
		"container-priority-poll-interval",
		60,
		"Interval in seconds to check priority of a dispatched container")

	crunchRunCommand := flags.String(
		"crunch-run-command",
		"/usr/bin/crunch-run",
		"Crunch command to run container")

	finishCommand := flags.String(
		"finish-command",
		"/usr/bin/crunch-finish-slurm.sh",
		"Command to run from strigger when job is finished")

	// Parse args; omit the first arg which is the command name
	flags.Parse(os.Args[1:])
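
	// Example invocation with every flag spelled out (values shown are the
	// defaults above, for illustration only):
	//
	//   crunch-dispatch-slurm \
	//     -poll-interval=10 \
	//     -container-priority-poll-interval=60 \
	//     -crunch-run-command=/usr/bin/crunch-run \
	//     -finish-command=/usr/bin/crunch-finish-slurm.sh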

	var err error
	arv, err = arvadosclient.MakeArvadosClient()
	if err != nil {
		log.Printf("Error making Arvados client: %v", err)
		return err
	}

	// Channel to terminate
	doneProcessing = make(chan bool)

	// Graceful shutdown on signal
	sigChan = make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	go func(sig <-chan os.Signal) {
		for s := range sig {
			log.Printf("Caught signal: %v", s)
			doneProcessing <- true
		}
	}(sigChan)

	// Run all queued containers
	runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand, *finishCommand)

	// Wait for all running crunch jobs to complete / terminate
	waitGroup.Wait()

	return nil
}

type apiClientAuthorization struct {
	UUID     string `json:"uuid"`
	APIToken string `json:"api_token"`
}

type apiClientAuthorizationList struct {
	Items []apiClientAuthorization `json:"items"`
}

// Poll for queued containers using pollInterval.
// Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
//
// Any errors encountered are logged, but the program continues to run (it
// does not exit): once one or more crunch jobs are running, we need to wait
// for them to complete.
func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
	var auth apiClientAuthorization
	err := arv.Call("GET", "api_client_authorizations", "", "current", nil, &auth)
	if err != nil {
		log.Printf("Error getting my token UUID: %v", err)
		return
	}

	ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
	for {
		select {
		case <-ticker.C:
			dispatchSlurm(auth, time.Duration(priorityPollInterval)*time.Second, crunchRunCommand, finishCommand)
		case <-doneProcessing:
			ticker.Stop()
			return
		}
	}
}

// Container is the subset of container record fields this dispatcher uses.
type Container struct {
	UUID               string           `json:"uuid"`
	State              string           `json:"state"`
	Priority           int              `json:"priority"`
	RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
	LockedByUUID       string           `json:"locked_by_uuid"`
}

// ContainerList is a list of containers returned by the API server.
type ContainerList struct {
	Items []Container `json:"items"`
}
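
// For reference, a containers list response decodes from JSON shaped roughly
// like this (values are made up for illustration):
//
//   {
//     "items": [
//       {
//         "uuid": "zzzzz-dz642-xxxxxxxxxxxxxxx",
//         "state": "Queued",
//         "priority": 1,
//         "runtime_constraints": {"ram": 8589934592, "vcpus": 4},
//         "locked_by_uuid": ""
//       }
//     ]
//   }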

// Get the list of queued containers from the API server and invoke run
// for each container.
func dispatchSlurm(auth apiClientAuthorization, pollInterval time.Duration, crunchRunCommand, finishCommand string) {
	params := arvadosclient.Dict{
		"filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}}},
	}

	var containers ContainerList
	err := arv.List("containers", params, &containers)
	if err != nil {
		log.Printf("Error getting list of queued containers: %q", err)
		return
	}

	for _, container := range containers.Items {
		if container.State == "Locked" {
			if container.LockedByUUID != auth.UUID {
				// Locked by a different dispatcher
				continue
			} else if checkMine(container.UUID) {
				// I already have a goroutine running
				// for this container: it just hasn't
				// gotten past Locked state yet.
				continue
			}
			log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
				"Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
				container.UUID, auth.UUID)
			setMine(container.UUID, true)
			// Pass the container by value so the goroutine does not
			// share the loop variable across iterations.
			go func(c Container) {
				waitContainer(c, pollInterval)
				setMine(c.UUID, false)
			}(container)
			continue
		}
		go run(container, crunchRunCommand, finishCommand, pollInterval)
	}
}

// sbatchFunc builds the sbatch command line used to submit a container.
func sbatchFunc(container Container) *exec.Cmd {
	// Convert the RAM constraint (bytes) into MiB per requested CPU core.
	memPerCPU := math.Ceil(float64(container.RuntimeConstraints["ram"]) / float64(container.RuntimeConstraints["vcpus"]*1048576))
	return exec.Command("sbatch", "--share", "--parsable",
		"--job-name="+container.UUID,
		"--mem-per-cpu="+strconv.Itoa(int(memPerCPU)),
		"--cpus-per-task="+strconv.Itoa(int(container.RuntimeConstraints["vcpus"])))
}
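
// As a worked example with hypothetical constraints: ram=8589934592 (8 GiB)
// and vcpus=4 give memPerCPU = ceil(8589934592 / (4 * 1048576)) = 2048, so
// the generated command is roughly:
//
//   sbatch --share --parsable --job-name=<container UUID> \
//     --mem-per-cpu=2048 --cpus-per-task=4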

// sbatchCmd is a package-level var (rather than a direct call to sbatchFunc)
// so tests can substitute a stub command builder.
var sbatchCmd = sbatchFunc

// striggerFunc builds the strigger command that registers finishCommand to
// run on the slurm controller when the job finishes.
func striggerFunc(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
	return exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
		fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
}

// striggerCmd, like sbatchCmd, is a var so tests can substitute a stub.
var striggerCmd = striggerFunc
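
// For illustration, with a hypothetical job ID 12345 this builds the
// equivalent of the shell command:
//
//   strigger --set --jobid=12345 --fini \
//     '--program=<finishCommand> <apiHost> <apiToken> <apiInsecure> <containerUUID>'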

// Submit job to slurm using sbatch.
func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
	submitErr = nil

	defer func() {
		// If we didn't get as far as submitting a slurm job,
		// unlock the container and return it to the queue.
		if submitErr == nil {
			// OK, no cleanup needed
			return
		}
		err := arv.Update("containers", container.UUID,
			arvadosclient.Dict{
				"container": arvadosclient.Dict{"state": "Queued"}},
			nil)
		if err != nil {
			log.Printf("Error unlocking container %s: %v", container.UUID, err)
		}
	}()

	// Create the command and attach to stdin/stdout
	cmd := sbatchCmd(container)
	stdinWriter, stdinerr := cmd.StdinPipe()
	if stdinerr != nil {
		submitErr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
		return
	}

	stdoutReader, stdoutErr := cmd.StdoutPipe()
	if stdoutErr != nil {
		submitErr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdoutErr)
		return
	}

	stderrReader, stderrErr := cmd.StderrPipe()
	if stderrErr != nil {
		submitErr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrErr)
		return
	}

	err := cmd.Start()
	if err != nil {
		submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
		return
	}

	// Drain stdout and stderr asynchronously so a full pipe buffer
	// cannot block the child process.
	stdoutChan := make(chan []byte)
	go func() {
		b, _ := ioutil.ReadAll(stdoutReader)
		stdoutChan <- b
	}()

	stderrChan := make(chan []byte)
	go func() {
		b, _ := ioutil.ReadAll(stderrReader)
		stderrChan <- b
	}()

	// Send a tiny script on stdin to execute the crunch-run command;
	// slurm enforces that this must be a #! script.
	fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, container.UUID)
	stdinWriter.Close()

	err = cmd.Wait()

	stdoutMsg := <-stdoutChan
	stderrMsg := <-stderrChan

	if err != nil {
		submitErr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrMsg)
		return
	}

	// If everything worked out, the job ID arrived on stdout; trim the
	// trailing newline before returning it.
	jobid = strings.TrimSpace(string(stdoutMsg))

	return
}
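
// Note: with --parsable, sbatch prints just the job ID (plus ";clustername"
// on multi-cluster configurations) followed by a newline, e.g. "12345\n",
// which is why the value is trimmed above before being handed to strigger.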

// finalizeRecordOnFinish uses the 'strigger' command to register a script
// that will run on the slurm controller when the job finishes.
func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
	cmd := striggerCmd(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr

	err := cmd.Run()
	if err != nil {
		log.Printf("While setting up strigger: %v", err)
		// BUG: we drop the error here and forget about it. A
		// human has to notice the container is stuck in
		// Running state, and fix it manually.
	}
}

// Run a queued container: [1] Set container state to Locked. [2]
// Execute crunch-run as a slurm batch job. [3] waitContainer().
func run(container Container, crunchRunCommand, finishCommand string, pollInterval time.Duration) {
	setMine(container.UUID, true)
	defer setMine(container.UUID, false)

	// Update container status to Locked. This will fail if
	// another dispatcher (token) has already locked it. It will
	// succeed if *this* dispatcher has already locked it.
	err := arv.Update("containers", container.UUID,
		arvadosclient.Dict{
			"container": arvadosclient.Dict{"state": "Locked"}},
		nil)
	if err != nil {
		log.Printf("Error updating container state to 'Locked' for %v: %q", container.UUID, err)
		return
	}

	log.Printf("About to submit queued container %v", container.UUID)

	jobid, err := submit(container, crunchRunCommand)
	if err != nil {
		log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
		return
	}

	insecure := "0"
	if arv.ApiInsecure {
		insecure = "1"
	}
	finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)

	// Update container status to Running. This will fail if
	// another dispatcher (token) has already locked it. It will
	// succeed if *this* dispatcher has already locked it.
	err = arv.Update("containers", container.UUID,
		arvadosclient.Dict{
			"container": arvadosclient.Dict{"state": "Running"}},
		nil)
	if err != nil {
		log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
	}
	log.Printf("Submitted container %v to slurm", container.UUID)
	waitContainer(container, pollInterval)
}
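
// A container owned by this dispatcher thus moves Queued -> Locked ->
// Running, and waitContainer() below watches for it to reach Complete or
// Cancelled, cancelling the slurm job itself if the priority drops to zero.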

// Wait for a container to finish. Cancel the slurm job if the
// container priority changes to zero before it ends.
func waitContainer(container Container, pollInterval time.Duration) {
	log.Printf("Monitoring container %v started", container.UUID)
	defer log.Printf("Monitoring container %v finished", container.UUID)

	pollTicker := time.NewTicker(pollInterval)
	defer pollTicker.Stop()
	for range pollTicker.C {
		var updated Container
		err := arv.Get("containers", container.UUID, nil, &updated)
		if err != nil {
			log.Printf("Error getting container %s: %q", container.UUID, err)
			continue
		}
		if updated.State == "Complete" || updated.State == "Cancelled" {
			return
		}
		if updated.Priority != 0 {
			continue
		}

		// Priority is zero, but state is Running or Locked
		log.Printf("Canceling container %s", container.UUID)

		err = exec.Command("scancel", "--name="+container.UUID).Run()
		if err != nil {
			log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
			if inQ, err := checkSqueue(container.UUID); err != nil {
				log.Printf("Error running squeue: %v", err)
				continue
			} else if inQ {
				log.Printf("Container %s is still in squeue; will retry", container.UUID)
				continue
			}
		}

		err = arv.Update("containers", container.UUID,
			arvadosclient.Dict{
				"container": arvadosclient.Dict{"state": "Cancelled"}},
			nil)
		if err != nil {
			log.Printf("Error updating state for container %s: %s", container.UUID, err)
			continue
		}

		return
	}
}

// checkSqueue reports whether a job named uuid appears in the slurm queue.
func checkSqueue(uuid string) (bool, error) {
	cmd := exec.Command("squeue", "--format=%j")
	sq, err := cmd.StdoutPipe()
	if err != nil {
		return false, err
	}
	if err := cmd.Start(); err != nil {
		return false, err
	}
	defer cmd.Wait()
	scanner := bufio.NewScanner(sq)
	found := false
	for scanner.Scan() {
		if scanner.Text() == uuid {
			found = true
		}
	}
	if err := scanner.Err(); err != nil {
		return false, err
	}
	return found, nil
}
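
// squeue --format=%j prints one job name per line (after a header row), so
// with our hypothetical container queued the output looks like:
//
//   NAME
//   zzzzz-dz642-xxxxxxxxxxxxxxx
//
// and the scan above matches the UUID line exactly.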

var mineMutex sync.RWMutex
var mineMap = make(map[string]bool)

// Goroutine-safely add/remove uuid to the set of "my" containers,
// i.e., ones for which this process has a goroutine running.
func setMine(uuid string, t bool) {
	mineMutex.Lock()
	defer mineMutex.Unlock()
	if t {
		mineMap[uuid] = true
	} else {
		delete(mineMap, uuid)
	}
}

// Check whether there is already a goroutine running for this
// container.
func checkMine(uuid string) bool {
	mineMutex.RLock()
	defer mineMutex.RUnlock()
	return mineMap[uuid]
}
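
// Typical usage, as in run() and dispatchSlurm() above:
//
//   setMine(uuid, true)
//   defer setMine(uuid, false)
//   ...
//   if checkMine(uuid) { /* another goroutine already owns this container */ }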