6518: Working on using strigger to update job records when crunch-run cannot.
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
1 package main
2
3 import (
4         "flag"
5         "fmt"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "io/ioutil"
8         "log"
9         "os"
10         "os/exec"
11         "os/signal"
12         "sync"
13         "syscall"
14         "time"
15 )
16
17 func main() {
18         err := doMain()
19         if err != nil {
20                 log.Fatalf("%q", err)
21         }
22 }
23
24 var (
25         arv              arvadosclient.ArvadosClient
26         runningCmds      map[string]*exec.Cmd
27         runningCmdsMutex sync.Mutex
28         waitGroup        sync.WaitGroup
29         doneProcessing   chan bool
30         sigChan          chan os.Signal
31 )
32
33 func doMain() error {
34         flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
35
36         pollInterval := flags.Int(
37                 "poll-interval",
38                 10,
39                 "Interval in seconds to poll for queued containers")
40
41         priorityPollInterval := flags.Int(
42                 "container-priority-poll-interval",
43                 60,
44                 "Interval in seconds to check priority of a dispatched container")
45
46         crunchRunCommand := flags.String(
47                 "crunch-run-command",
48                 "/usr/bin/crunch-run",
49                 "Crunch command to run container")
50
51         finishCommand := flags.String(
52                 "finish-command",
53                 "/usr/bin/crunch-finish-slurm.sh",
54                 "Command to run from strigger when job is finished")
55
56         // Parse args; omit the first arg which is the command name
57         flags.Parse(os.Args[1:])
58
59         var err error
60         arv, err = arvadosclient.MakeArvadosClient()
61         if err != nil {
62                 return err
63         }
64
65         // Channel to terminate
66         doneProcessing = make(chan bool)
67
68         // Graceful shutdown
69         sigChan = make(chan os.Signal, 1)
70         signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
71         go func(sig <-chan os.Signal) {
72                 for sig := range sig {
73                         log.Printf("Caught signal: %v", sig)
74                         doneProcessing <- true
75                 }
76         }(sigChan)
77
78         // Run all queued containers
79         runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand, *finishCommand)
80
81         // Wait for all running crunch jobs to complete / terminate
82         waitGroup.Wait()
83
84         return nil
85 }
86
87 // Poll for queued containers using pollInterval.
88 // Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
89 //
90 // Any errors encountered are logged but the program would continue to run (not exit).
91 // This is because, once one or more crunch jobs are running,
92 // we would need to wait for them complete.
93 func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
94         ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
95
96         for {
97                 select {
98                 case <-ticker.C:
99                         dispatchSlurm(priorityPollInterval, crunchRunCommand, finishCommand)
100                 case <-doneProcessing:
101                         ticker.Stop()
102                         return
103                 }
104         }
105 }
106
107 // Container data
108 type Container struct {
109         UUID     string `json:"uuid"`
110         State    string `json:"state"`
111         Priority int    `json:"priority"`
112 }
113
114 // ContainerList is a list of the containers from api
115 type ContainerList struct {
116         Items []Container `json:"items"`
117 }
118
119 // Get the list of queued containers from API server and invoke run for each container.
120 func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand string) {
121         params := arvadosclient.Dict{
122                 "filters": [][]string{[]string{"state", "=", "Queued"}},
123         }
124
125         var containers ContainerList
126         err := arv.List("containers", params, &containers)
127         if err != nil {
128                 log.Printf("Error getting list of queued containers: %q", err)
129                 return
130         }
131
132         for i := 0; i < len(containers.Items); i++ {
133                 log.Printf("About to submit queued container %v", containers.Items[i].UUID)
134                 // Run the container
135                 go run(containers.Items[i], crunchRunCommand, finishCommand, priorityPollInterval)
136         }
137 }
138
139 func submit(container Container, crunchRunCommand string) (jobid string, submiterr error) {
140         submiterr = nil
141
142         defer func() {
143                 if submiterr != nil {
144                         // This really should be an "Error" state, see #8018
145                         updateErr := arv.Update("containers", container.UUID,
146                                 arvadosclient.Dict{
147                                         "container": arvadosclient.Dict{"state": "Complete"}},
148                                 nil)
149                         if updateErr != nil {
150                                 log.Printf("Error updating container state to 'Complete' for %v: %q", container.UUID, updateErr)
151                         }
152                 }
153         }()
154
155         cmd := exec.Command("sbatch", "--job-name="+container.UUID, "--share", "--parsable")
156         stdinWriter, stdinerr := cmd.StdinPipe()
157         if stdinerr != nil {
158                 submiterr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
159                 return
160         }
161
162         stdoutReader, stdouterr := cmd.StdoutPipe()
163         if stdouterr != nil {
164                 submiterr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdouterr)
165                 return
166         }
167
168         stderrReader, stderrerr := cmd.StderrPipe()
169         if stderrerr != nil {
170                 submiterr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrerr)
171                 return
172         }
173
174         err := cmd.Start()
175         if err != nil {
176                 submiterr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
177                 return
178         }
179
180         stdoutchan := make(chan []byte)
181         go func() {
182                 b, _ := ioutil.ReadAll(stdoutReader)
183                 stdoutchan <- b
184                 close(stdoutchan)
185         }()
186
187         stderrchan := make(chan []byte)
188         go func() {
189                 b, _ := ioutil.ReadAll(stderrReader)
190                 stderrchan <- b
191                 close(stderrchan)
192         }()
193
194         fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, container.UUID)
195         stdinWriter.Close()
196
197         err = cmd.Wait()
198
199         stdoutmsg := <-stdoutchan
200         stderrmsg := <-stderrchan
201
202         if err != nil {
203                 submiterr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
204                 return
205         }
206
207         jobid = string(stdoutmsg)
208
209         return
210 }
211
212 func strigger(jobid, containerUUID, finishCommand string) {
213         cmd := exec.Command("strigger", "--set", "--jobid="+jobid, "--fini", fmt.Sprintf("--program=%s", finishCommand))
214         cmd.Stdout = os.Stdout
215         cmd.Stderr = os.Stderr
216         err := cmd.Run()
217         if err != nil {
218                 log.Printf("While setting up strigger: %v", err)
219         }
220 }
221
222 // Run queued container:
223 // Set container state to locked (TBD)
224 // Run container using the given crunch-run command
225 // Set the container state to Running
226 // If the container priority becomes zero while crunch job is still running, terminate it.
227 func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) {
228
229         jobid, err := submit(container, crunchRunCommand)
230         if err != nil {
231                 log.Printf("Error queuing container run: %v", err)
232                 return
233         }
234
235         strigger(jobid, container.UUID, finishCommand)
236
237         // Update container status to Running
238         err = arv.Update("containers", container.UUID,
239                 arvadosclient.Dict{
240                         "container": arvadosclient.Dict{"state": "Running"}},
241                 nil)
242         if err != nil {
243                 log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
244         }
245
246         log.Printf("Submitted container run for %v", container.UUID)
247
248         containerUUID := container.UUID
249
250         // A goroutine to terminate the runner if container priority becomes zero
251         priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
252         go func() {
253                 for _ = range priorityTicker.C {
254                         var container Container
255                         err := arv.Get("containers", containerUUID, nil, &container)
256                         if err != nil {
257                                 log.Printf("Error getting container info for %v: %q", container.UUID, err)
258                         } else {
259                                 if container.Priority == 0 {
260                                         log.Printf("Canceling container %v", container.UUID)
261                                         priorityTicker.Stop()
262                                         cancelcmd := exec.Command("scancel", "--name="+container.UUID)
263                                         cancelcmd.Run()
264                                 }
265                                 if container.State == "Complete" {
266                                         priorityTicker.Stop()
267                                 }
268                         }
269                 }
270         }()
271
272 }