6518: strigger works
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
1 package main
2
3 import (
4         "flag"
5         "fmt"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "io/ioutil"
8         "log"
9         "os"
10         "os/exec"
11         "os/signal"
12         "sync"
13         "syscall"
14         "time"
15 )
16
17 func main() {
18         err := doMain()
19         if err != nil {
20                 log.Fatalf("%q", err)
21         }
22 }
23
24 var (
25         arv              arvadosclient.ArvadosClient
26         runningCmds      map[string]*exec.Cmd
27         runningCmdsMutex sync.Mutex
28         waitGroup        sync.WaitGroup
29         doneProcessing   chan bool
30         sigChan          chan os.Signal
31 )
32
33 func doMain() error {
34         flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
35
36         pollInterval := flags.Int(
37                 "poll-interval",
38                 10,
39                 "Interval in seconds to poll for queued containers")
40
41         priorityPollInterval := flags.Int(
42                 "container-priority-poll-interval",
43                 60,
44                 "Interval in seconds to check priority of a dispatched container")
45
46         crunchRunCommand := flags.String(
47                 "crunch-run-command",
48                 "/usr/bin/crunch-run",
49                 "Crunch command to run container")
50
51         finishCommand := flags.String(
52                 "finish-command",
53                 "/usr/bin/crunch-finish-slurm.sh",
54                 "Command to run from strigger when job is finished")
55
56         // Parse args; omit the first arg which is the command name
57         flags.Parse(os.Args[1:])
58
59         var err error
60         arv, err = arvadosclient.MakeArvadosClient()
61         if err != nil {
62                 return err
63         }
64
65         // Channel to terminate
66         doneProcessing = make(chan bool)
67
68         // Graceful shutdown
69         sigChan = make(chan os.Signal, 1)
70         signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
71         go func(sig <-chan os.Signal) {
72                 for sig := range sig {
73                         log.Printf("Caught signal: %v", sig)
74                         doneProcessing <- true
75                 }
76         }(sigChan)
77
78         // Run all queued containers
79         runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand, *finishCommand)
80
81         // Wait for all running crunch jobs to complete / terminate
82         waitGroup.Wait()
83
84         return nil
85 }
86
87 // Poll for queued containers using pollInterval.
88 // Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
89 //
90 // Any errors encountered are logged but the program would continue to run (not exit).
91 // This is because, once one or more crunch jobs are running,
92 // we would need to wait for them complete.
93 func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
94         ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
95
96         for {
97                 select {
98                 case <-ticker.C:
99                         dispatchSlurm(priorityPollInterval, crunchRunCommand, finishCommand)
100                 case <-doneProcessing:
101                         ticker.Stop()
102                         return
103                 }
104         }
105 }
106
107 // Container data
108 type Container struct {
109         UUID     string `json:"uuid"`
110         State    string `json:"state"`
111         Priority int    `json:"priority"`
112 }
113
114 // ContainerList is a list of the containers from api
115 type ContainerList struct {
116         Items []Container `json:"items"`
117 }
118
119 // Get the list of queued containers from API server and invoke run for each container.
120 func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand string) {
121         params := arvadosclient.Dict{
122                 "filters": [][]string{[]string{"state", "=", "Queued"}},
123         }
124
125         var containers ContainerList
126         err := arv.List("containers", params, &containers)
127         if err != nil {
128                 log.Printf("Error getting list of queued containers: %q", err)
129                 return
130         }
131
132         for i := 0; i < len(containers.Items); i++ {
133                 log.Printf("About to submit queued container %v", containers.Items[i].UUID)
134                 // Run the container
135                 go run(containers.Items[i], crunchRunCommand, finishCommand, priorityPollInterval)
136         }
137 }
138
139 func submit(container Container, crunchRunCommand string) (jobid string, submiterr error) {
140         submiterr = nil
141
142         defer func() {
143                 if submiterr != nil {
144                         // This really should be an "Error" state, see #8018
145                         updateErr := arv.Update("containers", container.UUID,
146                                 arvadosclient.Dict{
147                                         "container": arvadosclient.Dict{"state": "Complete"}},
148                                 nil)
149                         if updateErr != nil {
150                                 log.Printf("Error updating container state to 'Complete' for %v: %q", container.UUID, updateErr)
151                         }
152                 }
153         }()
154
155         cmd := exec.Command("sbatch", "--job-name="+container.UUID, "--share", "--parsable")
156         stdinWriter, stdinerr := cmd.StdinPipe()
157         if stdinerr != nil {
158                 submiterr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
159                 return
160         }
161
162         stdoutReader, stdouterr := cmd.StdoutPipe()
163         if stdouterr != nil {
164                 submiterr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdouterr)
165                 return
166         }
167
168         stderrReader, stderrerr := cmd.StderrPipe()
169         if stderrerr != nil {
170                 submiterr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrerr)
171                 return
172         }
173
174         err := cmd.Start()
175         if err != nil {
176                 submiterr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
177                 return
178         }
179
180         stdoutchan := make(chan []byte)
181         go func() {
182                 b, _ := ioutil.ReadAll(stdoutReader)
183                 stdoutchan <- b
184                 close(stdoutchan)
185         }()
186
187         stderrchan := make(chan []byte)
188         go func() {
189                 b, _ := ioutil.ReadAll(stderrReader)
190                 stderrchan <- b
191                 close(stderrchan)
192         }()
193
194         fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, container.UUID)
195         stdinWriter.Close()
196
197         err = cmd.Wait()
198
199         stdoutmsg := <-stdoutchan
200         stderrmsg := <-stderrchan
201
202         if err != nil {
203                 submiterr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
204                 return
205         }
206
207         jobid = string(stdoutmsg)
208
209         return
210 }
211
212 func strigger(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
213         cmd := exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
214                 fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
215         cmd.Stdout = os.Stdout
216         cmd.Stderr = os.Stderr
217         err := cmd.Run()
218         if err != nil {
219                 log.Printf("While setting up strigger: %v", err)
220         }
221 }
222
223 // Run queued container:
224 // Set container state to locked (TBD)
225 // Run container using the given crunch-run command
226 // Set the container state to Running
227 // If the container priority becomes zero while crunch job is still running, terminate it.
228 func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) {
229
230         jobid, err := submit(container, crunchRunCommand)
231         if err != nil {
232                 log.Printf("Error queuing container run: %v", err)
233                 return
234         }
235
236         insecure := "0"
237         if arv.ApiInsecure {
238                 insecure = "1"
239         }
240         strigger(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
241
242         // Update container status to Running
243         err = arv.Update("containers", container.UUID,
244                 arvadosclient.Dict{
245                         "container": arvadosclient.Dict{"state": "Running"}},
246                 nil)
247         if err != nil {
248                 log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
249         }
250
251         log.Printf("Submitted container run for %v", container.UUID)
252
253         containerUUID := container.UUID
254
255         // A goroutine to terminate the runner if container priority becomes zero
256         priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
257         go func() {
258                 for _ = range priorityTicker.C {
259                         var container Container
260                         err := arv.Get("containers", containerUUID, nil, &container)
261                         if err != nil {
262                                 log.Printf("Error getting container info for %v: %q", container.UUID, err)
263                         } else {
264                                 if container.Priority == 0 {
265                                         log.Printf("Canceling container %v", container.UUID)
266                                         priorityTicker.Stop()
267                                         cancelcmd := exec.Command("scancel", "--name="+container.UUID)
268                                         cancelcmd.Run()
269                                 }
270                                 if container.State == "Complete" {
271                                         priorityTicker.Stop()
272                                 }
273                         }
274                 }
275         }()
276
277 }