Merge branch 'master' into 11454-wb-federated-search
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 // Dispatcher service for Crunch that submits containers to the slurm queue.
8
9 import (
10         "context"
11         "flag"
12         "fmt"
13         "log"
14         "math"
15         "os"
16         "regexp"
17         "strings"
18         "time"
19
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
22         "git.curoverse.com/arvados.git/sdk/go/config"
23         "git.curoverse.com/arvados.git/sdk/go/dispatch"
24         "github.com/coreos/go-systemd/daemon"
25 )
26
27 var version = "dev"
28
29 // Config used by crunch-dispatch-slurm
30 type Config struct {
31         Client arvados.Client
32
33         SbatchArguments []string
34         PollPeriod      arvados.Duration
35
36         // crunch-run command to invoke. The container UUID will be
37         // appended. If nil, []string{"crunch-run"} will be used.
38         //
39         // Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"}
40         CrunchRunCommand []string
41
42         // Minimum time between two attempts to run the same container
43         MinRetryPeriod arvados.Duration
44
45         slurm Slurm
46 }
47
48 func main() {
49         theConfig.slurm = &slurmCLI{}
50         err := doMain()
51         if err != nil {
52                 log.Fatal(err)
53         }
54 }
55
56 var (
57         theConfig Config
58         sqCheck   = &SqueueChecker{}
59 )
60
61 const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
62
63 func doMain() error {
64         flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
65         flags.Usage = func() { usage(flags) }
66
67         configPath := flags.String(
68                 "config",
69                 defaultConfigPath,
70                 "`path` to JSON or YAML configuration file")
71         dumpConfig := flag.Bool(
72                 "dump-config",
73                 false,
74                 "write current configuration to stdout and exit")
75         getVersion := flags.Bool(
76                 "version",
77                 false,
78                 "Print version information and exit.")
79         // Parse args; omit the first arg which is the command name
80         flags.Parse(os.Args[1:])
81
82         // Print version information if requested
83         if *getVersion {
84                 fmt.Printf("crunch-dispatch-slurm %s\n", version)
85                 return nil
86         }
87
88         log.Printf("crunch-dispatch-slurm %s started", version)
89
90         err := readConfig(&theConfig, *configPath)
91         if err != nil {
92                 return err
93         }
94
95         if theConfig.CrunchRunCommand == nil {
96                 theConfig.CrunchRunCommand = []string{"crunch-run"}
97         }
98
99         if theConfig.PollPeriod == 0 {
100                 theConfig.PollPeriod = arvados.Duration(10 * time.Second)
101         }
102
103         if theConfig.Client.APIHost != "" || theConfig.Client.AuthToken != "" {
104                 // Copy real configs into env vars so [a]
105                 // MakeArvadosClient() uses them, and [b] they get
106                 // propagated to crunch-run via SLURM.
107                 os.Setenv("ARVADOS_API_HOST", theConfig.Client.APIHost)
108                 os.Setenv("ARVADOS_API_TOKEN", theConfig.Client.AuthToken)
109                 os.Setenv("ARVADOS_API_HOST_INSECURE", "")
110                 if theConfig.Client.Insecure {
111                         os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
112                 }
113                 os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(theConfig.Client.KeepServiceURIs, " "))
114                 os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
115         } else {
116                 log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
117         }
118
119         if *dumpConfig {
120                 log.Fatal(config.DumpAndExit(theConfig))
121         }
122
123         arv, err := arvadosclient.MakeArvadosClient()
124         if err != nil {
125                 log.Printf("Error making Arvados client: %v", err)
126                 return err
127         }
128         arv.Retries = 25
129
130         sqCheck = &SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
131         defer sqCheck.Stop()
132
133         dispatcher := &dispatch.Dispatcher{
134                 Arv:            arv,
135                 RunContainer:   run,
136                 PollPeriod:     time.Duration(theConfig.PollPeriod),
137                 MinRetryPeriod: time.Duration(theConfig.MinRetryPeriod),
138         }
139
140         if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
141                 log.Printf("Error notifying init daemon: %v", err)
142         }
143
144         go checkSqueueForOrphans(dispatcher, sqCheck)
145
146         return dispatcher.Run(context.Background())
147 }
148
149 var containerUuidPattern = regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
150
151 // Check the next squeue report, and invoke TrackContainer for all the
152 // containers in the report. This gives us a chance to cancel slurm
153 // jobs started by a previous dispatch process that never released
154 // their slurm allocations even though their container states are
155 // Cancelled or Complete. See https://dev.arvados.org/issues/10979
156 func checkSqueueForOrphans(dispatcher *dispatch.Dispatcher, sqCheck *SqueueChecker) {
157         for _, uuid := range sqCheck.All() {
158                 if !containerUuidPattern.MatchString(uuid) {
159                         continue
160                 }
161                 err := dispatcher.TrackContainer(uuid)
162                 if err != nil {
163                         log.Printf("checkSqueueForOrphans: TrackContainer(%s): %s", uuid, err)
164                 }
165         }
166 }
167
168 func niceness(priority int) int {
169         if priority > 1000 {
170                 priority = 1000
171         }
172         if priority < 0 {
173                 priority = 0
174         }
175         // Niceness range 1-10000
176         return (1000 - priority) * 10
177 }
178
179 func sbatchArgs(container arvados.Container) []string {
180         mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
181
182         var disk int64
183         for _, m := range container.Mounts {
184                 if m.Kind == "tmp" {
185                         disk += m.Capacity
186                 }
187         }
188         disk = int64(math.Ceil(float64(disk) / float64(1048576)))
189
190         var sbatchArgs []string
191         sbatchArgs = append(sbatchArgs, theConfig.SbatchArguments...)
192         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
193         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
194         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
195         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
196         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", niceness(container.Priority)))
197         if len(container.SchedulingParameters.Partitions) > 0 {
198                 sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
199         }
200
201         return sbatchArgs
202 }
203
204 func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
205         // append() here avoids modifying crunchRunCommand's
206         // underlying array, which is shared with other goroutines.
207         crArgs := append([]string(nil), crunchRunCommand...)
208         crArgs = append(crArgs, container.UUID)
209         crScript := strings.NewReader(execScript(crArgs))
210
211         sqCheck.L.Lock()
212         defer sqCheck.L.Unlock()
213
214         sbArgs := sbatchArgs(container)
215         log.Printf("running sbatch %+q", sbArgs)
216         return theConfig.slurm.Batch(crScript, sbArgs)
217 }
218
219 // Submit a container to the slurm queue (or resume monitoring if it's
220 // already in the queue).  Cancel the slurm job if the container's
221 // priority changes to zero or its state indicates it's no longer
222 // running.
223 func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
224         ctx, cancel := context.WithCancel(context.Background())
225         defer cancel()
226
227         if ctr.State == dispatch.Locked && !sqCheck.HasUUID(ctr.UUID) {
228                 log.Printf("Submitting container %s to slurm", ctr.UUID)
229                 if err := submit(disp, ctr, theConfig.CrunchRunCommand); err != nil {
230                         text := fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
231                         log.Print(text)
232
233                         lr := arvadosclient.Dict{"log": arvadosclient.Dict{
234                                 "object_uuid": ctr.UUID,
235                                 "event_type":  "dispatch",
236                                 "properties":  map[string]string{"text": text}}}
237                         disp.Arv.Create("logs", lr, nil)
238
239                         disp.Unlock(ctr.UUID)
240                         return
241                 }
242         }
243
244         log.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
245         defer log.Printf("Done monitoring container %s", ctr.UUID)
246
247         // If the container disappears from the slurm queue, there is
248         // no point in waiting for further dispatch updates: just
249         // clean up and return.
250         go func(uuid string) {
251                 for ctx.Err() == nil && sqCheck.HasUUID(uuid) {
252                 }
253                 cancel()
254         }(ctr.UUID)
255
256         for {
257                 select {
258                 case <-ctx.Done():
259                         // Disappeared from squeue
260                         if err := disp.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
261                                 log.Printf("Error getting final container state for %s: %s", ctr.UUID, err)
262                         }
263                         switch ctr.State {
264                         case dispatch.Running:
265                                 disp.UpdateState(ctr.UUID, dispatch.Cancelled)
266                         case dispatch.Locked:
267                                 disp.Unlock(ctr.UUID)
268                         }
269                         return
270                 case updated, ok := <-status:
271                         if !ok {
272                                 log.Printf("Dispatcher says container %s is done: cancel slurm job", ctr.UUID)
273                                 scancel(ctr)
274                         } else if updated.Priority == 0 {
275                                 log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
276                                 scancel(ctr)
277                         } else {
278                                 renice(updated)
279                         }
280                 }
281         }
282 }
283
284 func scancel(ctr arvados.Container) {
285         sqCheck.L.Lock()
286         err := theConfig.slurm.Cancel(ctr.UUID)
287         sqCheck.L.Unlock()
288
289         if err != nil {
290                 log.Printf("scancel: %s", err)
291                 time.Sleep(time.Second)
292         } else if sqCheck.HasUUID(ctr.UUID) {
293                 log.Printf("container %s is still in squeue after scancel", ctr.UUID)
294                 time.Sleep(time.Second)
295         }
296 }
297
298 func renice(ctr arvados.Container) {
299         nice := niceness(ctr.Priority)
300         oldnice := sqCheck.GetNiceness(ctr.UUID)
301         if nice == oldnice || oldnice == -1 {
302                 return
303         }
304         log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
305         sqCheck.L.Lock()
306         err := theConfig.slurm.Renice(ctr.UUID, nice)
307         sqCheck.L.Unlock()
308
309         if err != nil {
310                 log.Printf("renice: %s", err)
311                 time.Sleep(time.Second)
312                 return
313         }
314         if sqCheck.HasUUID(ctr.UUID) {
315                 log.Printf("container %s has arvados priority %d, slurm nice %d",
316                         ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
317         }
318 }
319
320 func readConfig(dst interface{}, path string) error {
321         err := config.LoadFile(dst, path)
322         if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
323                 log.Printf("Config not specified. Continue with default configuration.")
324                 err = nil
325         }
326         return err
327 }