Merge branch '12863-wb-cr-status'
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 // Dispatcher service for Crunch that submits containers to the slurm queue.
8
9 import (
10         "context"
11         "flag"
12         "fmt"
13         "log"
14         "math"
15         "os"
16         "regexp"
17         "strings"
18         "time"
19
20         "git.curoverse.com/arvados.git/lib/dispatchcloud"
21         "git.curoverse.com/arvados.git/sdk/go/arvados"
22         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
23         "git.curoverse.com/arvados.git/sdk/go/config"
24         "git.curoverse.com/arvados.git/sdk/go/dispatch"
25         "github.com/coreos/go-systemd/daemon"
26 )
27
28 var (
29         version           = "dev"
30         defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
31 )
32
33 type Dispatcher struct {
34         *dispatch.Dispatcher
35         cluster *arvados.Cluster
36         sqCheck *SqueueChecker
37         slurm   Slurm
38
39         Client arvados.Client
40
41         SbatchArguments []string
42         PollPeriod      arvados.Duration
43
44         // crunch-run command to invoke. The container UUID will be
45         // appended. If nil, []string{"crunch-run"} will be used.
46         //
47         // Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"}
48         CrunchRunCommand []string
49
50         // Minimum time between two attempts to run the same container
51         MinRetryPeriod arvados.Duration
52 }
53
54 func main() {
55         disp := &Dispatcher{}
56         err := disp.Run(os.Args[0], os.Args[1:])
57         if err != nil {
58                 log.Fatal(err)
59         }
60 }
61
62 func (disp *Dispatcher) Run(prog string, args []string) error {
63         if err := disp.configure(prog, args); err != nil {
64                 return err
65         }
66         disp.setup()
67         return disp.run()
68 }
69
70 // configure() loads config files. Tests skip this.
71 func (disp *Dispatcher) configure(prog string, args []string) error {
72         flags := flag.NewFlagSet(prog, flag.ExitOnError)
73         flags.Usage = func() { usage(flags) }
74
75         configPath := flags.String(
76                 "config",
77                 defaultConfigPath,
78                 "`path` to JSON or YAML configuration file")
79         dumpConfig := flag.Bool(
80                 "dump-config",
81                 false,
82                 "write current configuration to stdout and exit")
83         getVersion := flags.Bool(
84                 "version",
85                 false,
86                 "Print version information and exit.")
87         // Parse args; omit the first arg which is the command name
88         flags.Parse(args)
89
90         // Print version information if requested
91         if *getVersion {
92                 fmt.Printf("crunch-dispatch-slurm %s\n", version)
93                 return nil
94         }
95
96         log.Printf("crunch-dispatch-slurm %s started", version)
97
98         err := disp.readConfig(*configPath)
99         if err != nil {
100                 return err
101         }
102
103         if disp.CrunchRunCommand == nil {
104                 disp.CrunchRunCommand = []string{"crunch-run"}
105         }
106
107         if disp.PollPeriod == 0 {
108                 disp.PollPeriod = arvados.Duration(10 * time.Second)
109         }
110
111         if disp.Client.APIHost != "" || disp.Client.AuthToken != "" {
112                 // Copy real configs into env vars so [a]
113                 // MakeArvadosClient() uses them, and [b] they get
114                 // propagated to crunch-run via SLURM.
115                 os.Setenv("ARVADOS_API_HOST", disp.Client.APIHost)
116                 os.Setenv("ARVADOS_API_TOKEN", disp.Client.AuthToken)
117                 os.Setenv("ARVADOS_API_HOST_INSECURE", "")
118                 if disp.Client.Insecure {
119                         os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
120                 }
121                 os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(disp.Client.KeepServiceURIs, " "))
122                 os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
123         } else {
124                 log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
125         }
126
127         if *dumpConfig {
128                 return config.DumpAndExit(disp)
129         }
130
131         siteConfig, err := arvados.GetConfig(arvados.DefaultConfigFile)
132         if os.IsNotExist(err) {
133                 log.Printf("warning: no cluster config (%s), proceeding with no node types defined", err)
134         } else if err != nil {
135                 return fmt.Errorf("error loading config: %s", err)
136         } else if disp.cluster, err = siteConfig.GetCluster(""); err != nil {
137                 return fmt.Errorf("config error: %s", err)
138         }
139
140         return nil
141 }
142
143 // setup() initializes private fields after configure().
144 func (disp *Dispatcher) setup() {
145         arv, err := arvadosclient.MakeArvadosClient()
146         if err != nil {
147                 log.Fatalf("Error making Arvados client: %v", err)
148         }
149         arv.Retries = 25
150
151         disp.slurm = &slurmCLI{}
152         disp.sqCheck = &SqueueChecker{
153                 Period: time.Duration(disp.PollPeriod),
154                 Slurm:  disp.slurm,
155         }
156         disp.Dispatcher = &dispatch.Dispatcher{
157                 Arv:            arv,
158                 RunContainer:   disp.runContainer,
159                 PollPeriod:     time.Duration(disp.PollPeriod),
160                 MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
161         }
162 }
163
164 func (disp *Dispatcher) run() error {
165         defer disp.sqCheck.Stop()
166
167         if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 {
168                 go dispatchcloud.SlurmNodeTypeFeatureKludge(disp.cluster)
169         }
170
171         if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
172                 log.Printf("Error notifying init daemon: %v", err)
173         }
174         go disp.checkSqueueForOrphans()
175         return disp.Dispatcher.Run(context.Background())
176 }
177
178 var containerUuidPattern = regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
179
180 // Check the next squeue report, and invoke TrackContainer for all the
181 // containers in the report. This gives us a chance to cancel slurm
182 // jobs started by a previous dispatch process that never released
183 // their slurm allocations even though their container states are
184 // Cancelled or Complete. See https://dev.arvados.org/issues/10979
185 func (disp *Dispatcher) checkSqueueForOrphans() {
186         for _, uuid := range disp.sqCheck.All() {
187                 if !containerUuidPattern.MatchString(uuid) {
188                         continue
189                 }
190                 err := disp.TrackContainer(uuid)
191                 if err != nil {
192                         log.Printf("checkSqueueForOrphans: TrackContainer(%s): %s", uuid, err)
193                 }
194         }
195 }
196
197 func (disp *Dispatcher) niceness(priority int) int {
198         if priority > 1000 {
199                 priority = 1000
200         }
201         if priority < 0 {
202                 priority = 0
203         }
204         // Niceness range 1-10000
205         return (1000 - priority) * 10
206 }
207
208 func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
209         mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
210
211         var disk int64
212         for _, m := range container.Mounts {
213                 if m.Kind == "tmp" {
214                         disk += m.Capacity
215                 }
216         }
217         disk = int64(math.Ceil(float64(disk) / float64(1048576)))
218
219         var sbatchArgs []string
220         sbatchArgs = append(sbatchArgs, disp.SbatchArguments...)
221         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
222         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
223         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
224         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
225         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", disp.niceness(container.Priority)))
226         if len(container.SchedulingParameters.Partitions) > 0 {
227                 sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
228         }
229
230         if disp.cluster == nil {
231                 // no instance types configured
232         } else if it, err := dispatchcloud.ChooseInstanceType(disp.cluster, &container); err == dispatchcloud.ErrInstanceTypesNotConfigured {
233                 // ditto
234         } else if err != nil {
235                 return nil, err
236         } else {
237                 sbatchArgs = append(sbatchArgs, "--constraint=instancetype="+it.Name)
238         }
239
240         return sbatchArgs, nil
241 }
242
243 func (disp *Dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
244         // append() here avoids modifying crunchRunCommand's
245         // underlying array, which is shared with other goroutines.
246         crArgs := append([]string(nil), crunchRunCommand...)
247         crArgs = append(crArgs, container.UUID)
248         crScript := strings.NewReader(execScript(crArgs))
249
250         disp.sqCheck.L.Lock()
251         defer disp.sqCheck.L.Unlock()
252
253         sbArgs, err := disp.sbatchArgs(container)
254         if err != nil {
255                 return err
256         }
257         log.Printf("running sbatch %+q", sbArgs)
258         return disp.slurm.Batch(crScript, sbArgs)
259 }
260
261 // Submit a container to the slurm queue (or resume monitoring if it's
262 // already in the queue).  Cancel the slurm job if the container's
263 // priority changes to zero or its state indicates it's no longer
264 // running.
265 func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
266         ctx, cancel := context.WithCancel(context.Background())
267         defer cancel()
268
269         if ctr.State == dispatch.Locked && !disp.sqCheck.HasUUID(ctr.UUID) {
270                 log.Printf("Submitting container %s to slurm", ctr.UUID)
271                 if err := disp.submit(ctr, disp.CrunchRunCommand); err != nil {
272                         var text string
273                         if err == dispatchcloud.ErrConstraintsNotSatisfiable {
274                                 text = fmt.Sprintf("cannot run container %s: %s", ctr.UUID, err)
275                                 disp.UpdateState(ctr.UUID, dispatch.Cancelled)
276                         } else {
277                                 text = fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
278                         }
279                         log.Print(text)
280
281                         lr := arvadosclient.Dict{"log": arvadosclient.Dict{
282                                 "object_uuid": ctr.UUID,
283                                 "event_type":  "dispatch",
284                                 "properties":  map[string]string{"text": text}}}
285                         disp.Arv.Create("logs", lr, nil)
286
287                         disp.Unlock(ctr.UUID)
288                         return
289                 }
290         }
291
292         log.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
293         defer log.Printf("Done monitoring container %s", ctr.UUID)
294
295         // If the container disappears from the slurm queue, there is
296         // no point in waiting for further dispatch updates: just
297         // clean up and return.
298         go func(uuid string) {
299                 for ctx.Err() == nil && disp.sqCheck.HasUUID(uuid) {
300                 }
301                 cancel()
302         }(ctr.UUID)
303
304         for {
305                 select {
306                 case <-ctx.Done():
307                         // Disappeared from squeue
308                         if err := disp.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
309                                 log.Printf("Error getting final container state for %s: %s", ctr.UUID, err)
310                         }
311                         switch ctr.State {
312                         case dispatch.Running:
313                                 disp.UpdateState(ctr.UUID, dispatch.Cancelled)
314                         case dispatch.Locked:
315                                 disp.Unlock(ctr.UUID)
316                         }
317                         return
318                 case updated, ok := <-status:
319                         if !ok {
320                                 log.Printf("container %s is done: cancel slurm job", ctr.UUID)
321                                 disp.scancel(ctr)
322                         } else if updated.Priority == 0 {
323                                 log.Printf("container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
324                                 disp.scancel(ctr)
325                         } else {
326                                 disp.renice(updated)
327                         }
328                 }
329         }
330 }
331
332 func (disp *Dispatcher) scancel(ctr arvados.Container) {
333         disp.sqCheck.L.Lock()
334         err := disp.slurm.Cancel(ctr.UUID)
335         disp.sqCheck.L.Unlock()
336
337         if err != nil {
338                 log.Printf("scancel: %s", err)
339                 time.Sleep(time.Second)
340         } else if disp.sqCheck.HasUUID(ctr.UUID) {
341                 log.Printf("container %s is still in squeue after scancel", ctr.UUID)
342                 time.Sleep(time.Second)
343         }
344 }
345
346 func (disp *Dispatcher) renice(ctr arvados.Container) {
347         nice := disp.niceness(ctr.Priority)
348         oldnice := disp.sqCheck.GetNiceness(ctr.UUID)
349         if nice == oldnice || oldnice == -1 {
350                 return
351         }
352         log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
353         disp.sqCheck.L.Lock()
354         err := disp.slurm.Renice(ctr.UUID, nice)
355         disp.sqCheck.L.Unlock()
356
357         if err != nil {
358                 log.Printf("renice: %s", err)
359                 time.Sleep(time.Second)
360                 return
361         }
362         if disp.sqCheck.HasUUID(ctr.UUID) {
363                 log.Printf("container %s has arvados priority %d, slurm nice %d",
364                         ctr.UUID, ctr.Priority, disp.sqCheck.GetNiceness(ctr.UUID))
365         }
366 }
367
368 func (disp *Dispatcher) readConfig(path string) error {
369         err := config.LoadFile(disp, path)
370         if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
371                 log.Printf("Config not specified. Continue with default configuration.")
372                 err = nil
373         }
374         return err
375 }