]> git.arvados.org - arvados.git/blob - services/crunch-dispatch-local/crunch-dispatch-local.go
22127: Merge branch 'main' into 22127-wb2-optimization
[arvados.git] / services / crunch-dispatch-local / crunch-dispatch-local.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 // Dispatcher service for Crunch that runs containers locally.
8
9 import (
10         "context"
11         "crypto/hmac"
12         "crypto/sha256"
13         "flag"
14         "fmt"
15         "os"
16         "os/exec"
17         "os/signal"
18         "runtime"
19         "strings"
20         "sync"
21         "syscall"
22         "time"
23
24         "git.arvados.org/arvados.git/lib/cmd"
25         "git.arvados.org/arvados.git/lib/config"
26         "git.arvados.org/arvados.git/sdk/go/arvados"
27         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
28         "git.arvados.org/arvados.git/sdk/go/dispatch"
29         "github.com/pbnjay/memory"
30         "github.com/sirupsen/logrus"
31 )
32
// version is "dev" by default; presumably overridden at build/release
// time — TODO confirm build flags.
var version = "dev"

var (
	runningCmds      map[string]*exec.Cmd // crunch-run child processes currently running, keyed by container UUID
	runningCmdsMutex sync.Mutex           // guards runningCmds
	waitGroup        sync.WaitGroup       // tracks in-flight crunch-run processes so main can wait at shutdown
	crunchRunCommand string               // path to crunch-run; from -crunch-run-command flag or cluster config
)
41
42 func main() {
43         baseLogger := logrus.StandardLogger()
44         if os.Getenv("DEBUG") != "" {
45                 baseLogger.SetLevel(logrus.DebugLevel)
46         }
47         baseLogger.Formatter = &logrus.JSONFormatter{
48                 TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
49         }
50
51         flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
52
53         pollInterval := flags.Int(
54                 "poll-interval",
55                 10,
56                 "Interval in seconds to poll for queued containers")
57
58         flags.StringVar(&crunchRunCommand,
59                 "crunch-run-command",
60                 "",
61                 "Crunch command to run container")
62
63         getVersion := flags.Bool(
64                 "version",
65                 false,
66                 "Print version information and exit.")
67
68         if ok, code := cmd.ParseFlags(flags, os.Args[0], os.Args[1:], "", os.Stderr); !ok {
69                 os.Exit(code)
70         }
71
72         // Print version information if requested
73         if *getVersion {
74                 fmt.Printf("crunch-dispatch-local %s\n", version)
75                 return
76         }
77
78         loader := config.NewLoader(nil, baseLogger)
79         cfg, err := loader.Load()
80         if err != nil {
81                 fmt.Fprintf(os.Stderr, "error loading config: %s\n", err)
82                 os.Exit(1)
83         }
84         cluster, err := cfg.GetCluster("")
85         if err != nil {
86                 fmt.Fprintf(os.Stderr, "config error: %s\n", err)
87                 os.Exit(1)
88         }
89
90         if crunchRunCommand == "" {
91                 crunchRunCommand = cluster.Containers.CrunchRunCommand
92         }
93
94         logger := baseLogger.WithField("ClusterID", cluster.ClusterID)
95         logger.Printf("crunch-dispatch-local %s started", version)
96
97         runningCmds = make(map[string]*exec.Cmd)
98
99         var client arvados.Client
100         client.APIHost = cluster.Services.Controller.ExternalURL.Host
101         client.AuthToken = cluster.SystemRootToken
102         client.Insecure = cluster.TLS.Insecure
103
104         if client.APIHost != "" || client.AuthToken != "" {
105                 // Copy real configs into env vars so [a]
106                 // MakeArvadosClient() uses them, and [b] they get
107                 // propagated to crunch-run via SLURM.
108                 os.Setenv("ARVADOS_API_HOST", client.APIHost)
109                 os.Setenv("ARVADOS_API_TOKEN", client.AuthToken)
110                 os.Setenv("ARVADOS_API_HOST_INSECURE", "")
111                 if client.Insecure {
112                         os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
113                 }
114         } else {
115                 logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
116         }
117
118         arv, err := arvadosclient.MakeArvadosClient()
119         if err != nil {
120                 logger.Errorf("error making Arvados client: %v", err)
121                 os.Exit(1)
122         }
123         arv.Retries = 25
124
125         ctx, cancel := context.WithCancel(context.Background())
126
127         localRun := LocalRun{startFunc, make(chan ResourceRequest), make(chan ResourceAlloc), ctx, cluster}
128
129         go localRun.throttle(logger)
130
131         dispatcher := dispatch.Dispatcher{
132                 Logger:       logger,
133                 Arv:          arv,
134                 RunContainer: localRun.run,
135                 PollPeriod:   time.Duration(*pollInterval) * time.Second,
136         }
137
138         err = dispatcher.Run(ctx)
139         if err != nil {
140                 logger.Error(err)
141                 return
142         }
143
144         c := make(chan os.Signal, 1)
145         signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
146         sig := <-c
147         logger.Printf("Received %s, shutting down", sig)
148         signal.Stop(c)
149
150         cancel()
151
152         runningCmdsMutex.Lock()
153         // Finished dispatching; interrupt any crunch jobs that are still running
154         for _, cmd := range runningCmds {
155                 cmd.Process.Signal(os.Interrupt)
156         }
157         runningCmdsMutex.Unlock()
158
159         // Wait for all running crunch jobs to complete / terminate
160         waitGroup.Wait()
161 }
162
163 func startFunc(container arvados.Container, cmd *exec.Cmd) error {
164         return cmd.Start()
165 }
166
// ResourceAlloc describes the resources the throttle goroutine has
// reserved for one container.  A zero-valued ResourceAlloc (vcpus == 0)
// is the throttle's reply when a request can never be fulfilled on this
// node.
type ResourceAlloc struct {
	uuid     string   // container UUID the allocation belongs to
	vcpus    int      // CPU cores reserved
	ram      int64    // bytes of RAM reserved
	gpuStack string   // "cuda" or "rocm"; empty when no GPUs were requested
	gpus     []string // device IDs taken from the host's visible-devices list
}
174
// ResourceRequest is sent to the throttle goroutine to reserve
// resources before starting a container.  The resulting allocation (or
// a zero ResourceAlloc if the request is impossible) is delivered on
// the ready channel.
type ResourceRequest struct {
	uuid     string             // container UUID making the request
	vcpus    int                // CPU cores wanted
	ram      int64              // bytes of RAM wanted (container + Keep cache + reserve)
	gpuStack string             // required GPU stack, "cuda" or "rocm"
	gpus     int                // number of GPU devices wanted
	ready    chan ResourceAlloc // receives the allocation when granted
}
183
// LocalRun couples the dispatcher's per-container run callback with the
// node-local resource throttle.  NOTE: field order matters — main()
// initializes this struct with a positional composite literal.
type LocalRun struct {
	startCmd         func(container arvados.Container, cmd *exec.Cmd) error // hook that starts the child process (startFunc in production; stubbed in tests)
	requestResources chan ResourceRequest                                   // run() -> throttle: ask for resources
	releaseResources chan ResourceAlloc                                     // run() -> throttle: return resources when a container finishes
	ctx              context.Context                                        // cancelled at shutdown; unblocks run() and stops throttle()
	cluster          *arvados.Cluster                                       // cluster config (crunch-run args, RAM reserve, runtime engine, ...)
}
191
192 func (lr *LocalRun) throttle(logger logrus.FieldLogger) {
193         maxVcpus := runtime.NumCPU()
194         var maxRam int64 = int64(memory.TotalMemory())
195
196         logger.Infof("AMD_VISIBLE_DEVICES=%v", os.Getenv("AMD_VISIBLE_DEVICES"))
197         logger.Infof("CUDA_VISIBLE_DEVICES=%v", os.Getenv("CUDA_VISIBLE_DEVICES"))
198
199         availableCUDAGpus := strings.Split(os.Getenv("CUDA_VISIBLE_DEVICES"), ",")
200         availableROCmGpus := strings.Split(os.Getenv("AMD_VISIBLE_DEVICES"), ",")
201
202         gpuStack := ""
203         maxGpus := 0
204         availableGpus := []string{}
205
206         if maxGpus = len(availableCUDAGpus); maxGpus > 0 && availableCUDAGpus[0] != "" {
207                 gpuStack = "cuda"
208                 availableGpus = availableCUDAGpus
209         } else if maxGpus = len(availableROCmGpus); maxGpus > 0 && availableROCmGpus[0] != "" {
210                 gpuStack = "rocm"
211                 availableGpus = availableROCmGpus
212         }
213
214         availableVcpus := maxVcpus
215         availableRam := maxRam
216
217         pending := []ResourceRequest{}
218
219 NextEvent:
220         for {
221                 select {
222                 case rr := <-lr.requestResources:
223                         pending = append(pending, rr)
224
225                 case rr := <-lr.releaseResources:
226                         availableVcpus += rr.vcpus
227                         availableRam += rr.ram
228                         for _, gpu := range rr.gpus {
229                                 availableGpus = append(availableGpus, gpu)
230                         }
231
232                         logger.Infof("%v released allocation (cpus: %v ram: %v gpus: %v); now available (cpus: %v ram: %v gpus: %v)",
233                                 rr.uuid, rr.vcpus, rr.ram, rr.gpus,
234                                 availableVcpus, availableRam, availableGpus)
235
236                 case <-lr.ctx.Done():
237                         return
238                 }
239
240                 for len(pending) > 0 {
241                         rr := pending[0]
242                         if rr.vcpus < 1 || rr.vcpus > maxVcpus {
243                                 logger.Infof("%v requested vcpus %v but maxVcpus is %v", rr.uuid, rr.vcpus, maxVcpus)
244                                 // resource request can never be fulfilled,
245                                 // return a zero struct
246                                 rr.ready <- ResourceAlloc{}
247                                 continue
248                         }
249                         if rr.ram < 1 || rr.ram > maxRam {
250                                 logger.Infof("%v requested ram %v but maxRam is %v", rr.uuid, rr.ram, maxRam)
251                                 // resource request can never be fulfilled,
252                                 // return a zero struct
253                                 rr.ready <- ResourceAlloc{}
254                                 continue
255                         }
256                         if rr.gpus > maxGpus || (rr.gpus > 0 && rr.gpuStack != gpuStack) {
257                                 logger.Infof("%v requested %v gpus with stack %v but maxGpus is %v and gpuStack is %q", rr.uuid, rr.gpus, rr.gpuStack, maxGpus, gpuStack)
258                                 // resource request can never be fulfilled,
259                                 // return a zero struct
260                                 rr.ready <- ResourceAlloc{}
261                                 continue
262                         }
263
264                         if rr.vcpus > availableVcpus || rr.ram > availableRam || rr.gpus > len(availableGpus) {
265                                 logger.Infof("Insufficient resources to start %v, waiting for next event", rr.uuid)
266                                 // can't be scheduled yet, go up to
267                                 // the top and wait for the next event
268                                 continue NextEvent
269                         }
270
271                         alloc := ResourceAlloc{uuid: rr.uuid, vcpus: rr.vcpus, ram: rr.ram}
272
273                         availableVcpus -= rr.vcpus
274                         availableRam -= rr.ram
275                         alloc.gpuStack = rr.gpuStack
276
277                         for i := 0; i < rr.gpus; i++ {
278                                 alloc.gpus = append(alloc.gpus, availableGpus[len(availableGpus)-1])
279                                 availableGpus = availableGpus[0 : len(availableGpus)-1]
280                         }
281                         rr.ready <- alloc
282
283                         logger.Infof("%v added allocation (cpus: %v ram: %v gpus: %v); now available (cpus: %v ram: %v gpus: %v)",
284                                 rr.uuid, rr.vcpus, rr.ram, rr.gpus,
285                                 availableVcpus, availableRam, availableGpus)
286
287                         // shift array down
288                         for i := 0; i < len(pending)-1; i++ {
289                                 pending[i] = pending[i+1]
290                         }
291                         pending = pending[0 : len(pending)-1]
292                 }
293
294         }
295 }
296
// Run a container.
//
// If the container is Locked, request node resources (vcpus, RAM, GPUs)
// from the throttle goroutine, then start a new crunch-run process and
// wait until crunch-run completes.  If the priority is set to zero, send
// an interrupt signal to the crunch-run process.
//
// If the container is in any other state, or is not Complete/Cancelled after
// crunch-run terminates, mark the container as Cancelled.
func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
	container arvados.Container,
	status <-chan arvados.Container) error {

	uuid := container.UUID

	if container.State == dispatch.Locked {

		gpuStack := container.RuntimeConstraints.GPU.Stack
		gpus := container.RuntimeConstraints.GPU.DeviceCount

		// The RAM request covers the container itself, its Keep
		// cache, and the cluster-configured extra reservation.
		resourceRequest := ResourceRequest{
			uuid:  container.UUID,
			vcpus: container.RuntimeConstraints.VCPUs,
			ram: (container.RuntimeConstraints.RAM +
				container.RuntimeConstraints.KeepCacheRAM +
				int64(lr.cluster.Containers.ReserveExtraRAM)),
			gpuStack: gpuStack,
			gpus:     gpus,
			ready:    make(chan ResourceAlloc)}

		// Queue the request with the throttle goroutine, unless we
		// are shutting down.
		select {
		case lr.requestResources <- resourceRequest:
			break
		case <-lr.ctx.Done():
			return lr.ctx.Err()
		}

		// Block until resources are allocated (or shutdown).
		var resourceAlloc ResourceAlloc
		select {
		case resourceAlloc = <-resourceRequest.ready:
		case <-lr.ctx.Done():
			return lr.ctx.Err()
		}

		// A zero allocation is the throttle's signal that this
		// request can never be satisfied on this node.
		if resourceAlloc.vcpus == 0 {
			dispatcher.Logger.Warnf("Container resource request %v cannot be fulfilled.", uuid)
			dispatcher.UpdateState(uuid, dispatch.Cancelled)
			return nil
		}

		// Return the allocation to the throttle on every exit path
		// below.
		defer func() {
			lr.releaseResources <- resourceAlloc
		}()

		select {
		case c := <-status:
			// Check for state updates after possibly
			// waiting to be ready-to-run
			if c.Priority == 0 {
				goto Finish
			}
		default:
			break
		}

		// Let main() wait for this child process at shutdown.
		waitGroup.Add(1)
		defer waitGroup.Done()

		args := []string{"--runtime-engine=" + lr.cluster.Containers.RuntimeEngine}
		args = append(args, lr.cluster.Containers.CrunchRunArgumentsList...)
		args = append(args, uuid)

		cmd := exec.Command(crunchRunCommand, args...)
		cmd.Stdin = nil
		cmd.Stderr = os.Stderr
		cmd.Stdout = os.Stderr

		// cmd.Env starts nil, so the child gets only the variables
		// explicitly passed through here.
		cmd.Env = append(cmd.Env, fmt.Sprintf("PATH=%v", os.Getenv("PATH")))
		cmd.Env = append(cmd.Env, fmt.Sprintf("TMPDIR=%v", os.Getenv("TMPDIR")))
		cmd.Env = append(cmd.Env, fmt.Sprintf("ARVADOS_API_HOST=%v", os.Getenv("ARVADOS_API_HOST")))
		cmd.Env = append(cmd.Env, fmt.Sprintf("ARVADOS_API_TOKEN=%v", os.Getenv("ARVADOS_API_TOKEN")))

		// Per-container secret: HMAC-SHA256 of the container UUID
		// keyed by SystemRootToken.  NOTE(review): presumably
		// crunch-run uses this to authenticate its gateway service
		// — confirm against crunch-run's handling of
		// GatewayAuthSecret.
		h := hmac.New(sha256.New, []byte(lr.cluster.SystemRootToken))
		fmt.Fprint(h, container.UUID)
		cmd.Env = append(cmd.Env, fmt.Sprintf("GatewayAuthSecret=%x", h.Sum(nil)))

		// Restrict the child to exactly the GPU devices allocated
		// by the throttle.
		if resourceAlloc.gpuStack == "rocm" {
			cmd.Env = append(cmd.Env, fmt.Sprintf("AMD_VISIBLE_DEVICES=%v", strings.Join(resourceAlloc.gpus, ",")))
		}
		if resourceAlloc.gpuStack == "cuda" {
			cmd.Env = append(cmd.Env, fmt.Sprintf("CUDA_VISIBLE_DEVICES=%v", strings.Join(resourceAlloc.gpus, ",")))
		}

		dispatcher.Logger.Printf("starting container %v", uuid)

		// Add this crunch job to the list of runningCmds only if we
		// succeed in starting crunch-run.

		runningCmdsMutex.Lock()
		if err := lr.startCmd(container, cmd); err != nil {
			runningCmdsMutex.Unlock()
			dispatcher.Logger.Warnf("error starting %q for %s: %s", crunchRunCommand, uuid, err)
			dispatcher.UpdateState(uuid, dispatch.Cancelled)
		} else {
			runningCmds[uuid] = cmd
			runningCmdsMutex.Unlock()

			// Need to wait for crunch-run to exit
			done := make(chan struct{})

			go func() {
				if _, err := cmd.Process.Wait(); err != nil {
					dispatcher.Logger.Warnf("error while waiting for crunch job to finish for %v: %q", uuid, err)
				}
				dispatcher.Logger.Debugf("sending done")
				done <- struct{}{}
			}()

			// Watch status updates while the child runs.
		Loop:
			for {
				select {
				case <-done:
					break Loop
				case c := <-status:
					// Interrupt the child process if priority changes to 0
					if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
						dispatcher.Logger.Printf("sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
						cmd.Process.Signal(os.Interrupt)
					}
				}
			}
			close(done)

			dispatcher.Logger.Printf("finished container run for %v", uuid)

			// Remove the crunch job from runningCmds
			runningCmdsMutex.Lock()
			delete(runningCmds, uuid)
			runningCmdsMutex.Unlock()
		}
	}

Finish:

	// If the container is not finalized, then change it to "Cancelled".
	err := dispatcher.Arv.Get("containers", uuid, nil, &container)
	if err != nil {
		dispatcher.Logger.Warnf("error getting final container state: %v", err)
	}
	if container.State == dispatch.Locked || container.State == dispatch.Running {
		dispatcher.Logger.Warnf("after %q process termination, container state for %v is %q; updating it to %q",
			crunchRunCommand, uuid, container.State, dispatch.Cancelled)
		dispatcher.UpdateState(uuid, dispatch.Cancelled)
	}

	// drain any subsequent status changes
	for range status {
	}

	dispatcher.Logger.Printf("finalized container %v", uuid)
	return nil
}