6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
25 arv arvadosclient.ArvadosClient
26 runningCmds map[string]*exec.Cmd
27 runningCmdsMutex sync.Mutex
28 waitGroup sync.WaitGroup
29 doneProcessing chan bool
30 sigChan chan os.Signal
34 flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
36 pollInterval := flags.Int(
39 "Interval in seconds to poll for queued containers")
41 priorityPollInterval := flags.Int(
42 "container-priority-poll-interval",
44 "Interval in seconds to check priority of a dispatched container")
46 crunchRunCommand := flags.String(
48 "/usr/bin/crunch-run",
49 "Crunch command to run container")
51 finishCommand := flags.String(
53 "/usr/bin/crunch-finish-slurm.sh",
54 "Command to run from strigger when job is finished")
56 // Parse args; omit the first arg which is the command name
57 flags.Parse(os.Args[1:])
60 arv, err = arvadosclient.MakeArvadosClient()
65 // Channel used to signal that processing should terminate
66 doneProcessing = make(chan bool)
69 sigChan = make(chan os.Signal, 1)
70 signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
71 go func(sig <-chan os.Signal) {
72 for sig := range sig {
73 log.Printf("Caught signal: %v", sig)
74 doneProcessing <- true
78 // Run all queued containers
79 runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand, *finishCommand)
81 // Wait for all running crunch jobs to complete / terminate
87 // Poll for queued containers using pollInterval.
88 // Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
90 // Any errors encountered are logged but the program would continue to run (not exit).
91 // This is because, once one or more crunch jobs are running,
92 // we would need to wait for them to complete.
93 func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
94 ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
99 dispatchSlurm(priorityPollInterval, crunchRunCommand, finishCommand)
100 case <-doneProcessing:
108 type Container struct {
109 UUID string `json:"uuid"`
110 State string `json:"state"`
111 Priority int `json:"priority"`
114 // ContainerList is a list of the containers from the API server
115 type ContainerList struct {
116 Items []Container `json:"items"`
119 // Get the list of queued containers from API server and invoke run for each container.
120 func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand string) {
121 params := arvadosclient.Dict{
122 "filters": [][]string{[]string{"state", "=", "Queued"}},
125 var containers ContainerList
126 err := arv.List("containers", params, &containers)
128 log.Printf("Error getting list of queued containers: %q", err)
132 for i := 0; i < len(containers.Items); i++ {
133 log.Printf("About to submit queued container %v", containers.Items[i].UUID)
135 go run(containers.Items[i], crunchRunCommand, finishCommand, priorityPollInterval)
139 func submit(container Container, crunchRunCommand string) (jobid string, submiterr error) {
143 if submiterr != nil {
144 // This really should be an "Error" state, see #8018
145 updateErr := arv.Update("containers", container.UUID,
147 "container": arvadosclient.Dict{"state": "Complete"}},
149 if updateErr != nil {
150 log.Printf("Error updating container state to 'Complete' for %v: %q", container.UUID, updateErr)
155 cmd := exec.Command("sbatch", "--job-name="+container.UUID, "--share", "--parsable")
156 stdinWriter, stdinerr := cmd.StdinPipe()
158 submiterr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
162 stdoutReader, stdouterr := cmd.StdoutPipe()
163 if stdouterr != nil {
164 submiterr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdouterr)
168 stderrReader, stderrerr := cmd.StderrPipe()
169 if stderrerr != nil {
170 submiterr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrerr)
176 submiterr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
180 stdoutchan := make(chan []byte)
182 b, _ := ioutil.ReadAll(stdoutReader)
187 stderrchan := make(chan []byte)
189 b, _ := ioutil.ReadAll(stderrReader)
194 fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, container.UUID)
199 stdoutmsg := <-stdoutchan
200 stderrmsg := <-stderrchan
203 submiterr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
207 jobid = string(stdoutmsg)
212 func strigger(jobid, containerUUID, finishCommand string) {
213 cmd := exec.Command("strigger", "--set", "--jobid="+jobid, "--fini", fmt.Sprintf("--program=%s", finishCommand))
214 cmd.Stdout = os.Stdout
215 cmd.Stderr = os.Stderr
218 log.Printf("While setting up strigger: %v", err)
222 // Run queued container:
223 // Set container state to locked (TBD)
224 // Run container using the given crunch-run command
225 // Set the container state to Running
226 // If the container priority becomes zero while crunch job is still running, terminate it.
227 func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) {
229 jobid, err := submit(container, crunchRunCommand)
231 log.Printf("Error queuing container run: %v", err)
235 strigger(jobid, container.UUID, finishCommand)
237 // Update container status to Running
238 err = arv.Update("containers", container.UUID,
240 "container": arvadosclient.Dict{"state": "Running"}},
243 log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
246 log.Printf("Submitted container run for %v", container.UUID)
248 containerUUID := container.UUID
250 // A goroutine to terminate the runner if container priority becomes zero
251 priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
253 for _ = range priorityTicker.C {
254 var container Container
255 err := arv.Get("containers", containerUUID, nil, &container)
257 log.Printf("Error getting container info for %v: %q", container.UUID, err)
259 if container.Priority == 0 {
260 log.Printf("Canceling container %v", container.UUID)
261 priorityTicker.Stop()
262 cancelcmd := exec.Command("scancel", "--name="+container.UUID)
265 if container.State == "Complete" {
266 priorityTicker.Stop()