]> git.arvados.org - arvados.git/blob - services/crunch-dispatch-local/crunch-dispatch-local.go
21926: Fix tests and migrate BsubCUDAArguments to BsubGPUArguments
[arvados.git] / services / crunch-dispatch-local / crunch-dispatch-local.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 // Dispatcher service for Crunch that runs containers locally.
8
9 import (
10         "context"
11         "crypto/hmac"
12         "crypto/sha256"
13         "flag"
14         "fmt"
15         "os"
16         "os/exec"
17         "os/signal"
18         "runtime"
19         "strings"
20         "sync"
21         "syscall"
22         "time"
23
24         "git.arvados.org/arvados.git/lib/cmd"
25         "git.arvados.org/arvados.git/lib/config"
26         "git.arvados.org/arvados.git/sdk/go/arvados"
27         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
28         "git.arvados.org/arvados.git/sdk/go/dispatch"
29         "github.com/pbnjay/memory"
30         "github.com/sirupsen/logrus"
31 )
32
// version is reported by --version and in startup logs; "dev" unless
// overridden (e.g. at build/link time — confirm against build scripts).
var version = "dev"

var (
	runningCmds      map[string]*exec.Cmd // currently-running crunch-run processes, keyed by container UUID
	runningCmdsMutex sync.Mutex           // guards runningCmds
	waitGroup        sync.WaitGroup       // tracks in-flight crunch-run processes so shutdown can wait for them
	crunchRunCommand string               // crunch-run executable path, from -crunch-run-command flag or cluster config
)
41
// main loads the cluster configuration, connects to the Arvados API,
// starts the resource-throttle goroutine, and runs the dispatch loop
// that launches crunch-run processes locally for queued containers.
func main() {
	baseLogger := logrus.StandardLogger()
	if os.Getenv("DEBUG") != "" {
		baseLogger.SetLevel(logrus.DebugLevel)
	}
	baseLogger.Formatter = &logrus.JSONFormatter{
		TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
	}

	flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)

	pollInterval := flags.Int(
		"poll-interval",
		10,
		"Interval in seconds to poll for queued containers")

	flags.StringVar(&crunchRunCommand,
		"crunch-run-command",
		"",
		"Crunch command to run container")

	getVersion := flags.Bool(
		"version",
		false,
		"Print version information and exit.")

	if ok, code := cmd.ParseFlags(flags, os.Args[0], os.Args[1:], "", os.Stderr); !ok {
		os.Exit(code)
	}

	// Print version information if requested
	if *getVersion {
		fmt.Printf("crunch-dispatch-local %s\n", version)
		return
	}

	loader := config.NewLoader(nil, baseLogger)
	cfg, err := loader.Load()
	if err != nil {
		fmt.Fprintf(os.Stderr, "error loading config: %s\n", err)
		os.Exit(1)
	}
	cluster, err := cfg.GetCluster("")
	if err != nil {
		fmt.Fprintf(os.Stderr, "config error: %s\n", err)
		os.Exit(1)
	}

	// The -crunch-run-command flag takes precedence; fall back to
	// the cluster configuration when the flag was not given.
	if crunchRunCommand == "" {
		crunchRunCommand = cluster.Containers.CrunchRunCommand
	}

	logger := baseLogger.WithField("ClusterID", cluster.ClusterID)
	logger.Printf("crunch-dispatch-local %s started", version)

	runningCmds = make(map[string]*exec.Cmd)

	var client arvados.Client
	client.APIHost = cluster.Services.Controller.ExternalURL.Host
	client.AuthToken = cluster.SystemRootToken
	client.Insecure = cluster.TLS.Insecure

	if client.APIHost != "" || client.AuthToken != "" {
		// Copy real configs into env vars so [a]
		// MakeArvadosClient() uses them, and [b] they get
		// propagated to crunch-run via SLURM.
		os.Setenv("ARVADOS_API_HOST", client.APIHost)
		os.Setenv("ARVADOS_API_TOKEN", client.AuthToken)
		os.Setenv("ARVADOS_API_HOST_INSECURE", "")
		if client.Insecure {
			os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
		}
	} else {
		logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
	}

	arv, err := arvadosclient.MakeArvadosClient()
	if err != nil {
		logger.Errorf("error making Arvados client: %v", err)
		os.Exit(1)
	}
	arv.Retries = 25

	ctx, cancel := context.WithCancel(context.Background())

	// localRun owns the throttle channels; its throttle goroutine
	// grants and reclaims CPU/RAM/GPU allocations for containers.
	localRun := LocalRun{startFunc, make(chan ResourceRequest), make(chan ResourceAlloc), ctx, cluster}

	go localRun.throttle(logger)

	dispatcher := dispatch.Dispatcher{
		Logger:       logger,
		Arv:          arv,
		RunContainer: localRun.run,
		PollPeriod:   time.Duration(*pollInterval) * time.Second,
	}

	err = dispatcher.Run(ctx)
	if err != nil {
		logger.Error(err)
		return
	}

	// NOTE(review): the signal handler is installed only after
	// dispatcher.Run returns — confirm Run returns promptly rather
	// than blocking until ctx is cancelled, otherwise signals
	// delivered during dispatching are not handled below.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
	sig := <-c
	logger.Printf("Received %s, shutting down", sig)
	signal.Stop(c)

	cancel()

	runningCmdsMutex.Lock()
	// Finished dispatching; interrupt any crunch jobs that are still running
	for _, cmd := range runningCmds {
		cmd.Process.Signal(os.Interrupt)
	}
	runningCmdsMutex.Unlock()

	// Wait for all running crunch jobs to complete / terminate
	waitGroup.Wait()
}
162
163 func startFunc(container arvados.Container, cmd *exec.Cmd) error {
164         return cmd.Start()
165 }
166
// ResourceAlloc is a concrete resource grant issued by the throttle
// goroutine.  A zero-valued ResourceAlloc means the corresponding
// request could not (and never can) be fulfilled on this node.
type ResourceAlloc struct {
	uuid     string   // container UUID this allocation belongs to
	vcpus    int      // number of CPU cores reserved
	ram      int64    // bytes of RAM reserved
	gpuStack string   // GPU stack in use ("cuda" or "rocm"; empty when no GPUs)
	gpus     []string // specific GPU device IDs reserved for this container
}
174
// ResourceRequest asks the throttle goroutine to reserve resources
// for a container.  The throttle replies on the ready channel with a
// ResourceAlloc (zero-valued if the request is unfulfillable).
type ResourceRequest struct {
	uuid     string             // container UUID making the request
	vcpus    int                // CPU cores required
	ram      int64              // bytes of RAM required (incl. keep cache and reserve)
	gpuStack string             // required GPU stack ("cuda"/"rocm"); ignored when gpus == 0
	gpus     int                // number of GPU devices required
	ready    chan ResourceAlloc // reply channel; receives exactly one allocation
}
183
// LocalRun bundles the state shared between the dispatcher's
// RunContainer callback (run) and the resource broker (throttle).
type LocalRun struct {
	startCmd         func(container arvados.Container, cmd *exec.Cmd) error // starts crunch-run; startFunc in production, stubbed in tests
	requestResources chan ResourceRequest                                   // run -> throttle: ask for an allocation
	releaseResources chan ResourceAlloc                                     // run -> throttle: return an allocation
	ctx              context.Context                                        // cancelled at dispatcher shutdown
	cluster          *arvados.Cluster                                       // cluster configuration (RAM reserve, crunch-run args, ...)
}
191
192 func (lr *LocalRun) throttle(logger logrus.FieldLogger) {
193         maxVcpus := runtime.NumCPU()
194         var maxRam int64 = int64(memory.TotalMemory())
195
196         logger.Infof("AMD_VISIBLE_DEVICES=%v", os.Getenv("AMD_VISIBLE_DEVICES"))
197         logger.Infof("CUDA_VISIBLE_DEVICES=%v", os.Getenv("CUDA_VISIBLE_DEVICES"))
198
199         availableCUDAGpus := strings.Split(os.Getenv("CUDA_VISIBLE_DEVICES"), ",")
200         availableROCmGpus := strings.Split(os.Getenv("AMD_VISIBLE_DEVICES"), ",")
201
202         gpuStack := ""
203         maxGpus := 0
204         availableGpus := []string{}
205
206         if maxGpus = len(availableCUDAGpus); maxGpus > 0 {
207                 gpuStack = "cuda"
208                 availableGpus = availableCUDAGpus
209         } else if maxGpus = len(availableROCmGpus); maxGpus > 0 {
210                 gpuStack = "rocm"
211                 availableGpus = availableROCmGpus
212         }
213
214         availableVcpus := maxVcpus
215         availableRam := maxRam
216
217         pending := []ResourceRequest{}
218
219 NextEvent:
220         for {
221                 select {
222                 case rr := <-lr.requestResources:
223                         pending = append(pending, rr)
224
225                 case rr := <-lr.releaseResources:
226                         availableVcpus += rr.vcpus
227                         availableRam += rr.ram
228                         for _, gpu := range rr.gpus {
229                                 availableGpus = append(availableGpus, gpu)
230                         }
231
232                         logger.Infof("%v released allocation (cpus: %v ram: %v gpus: %v %v); now available (cpus: %v ram: %v gpus: %v)",
233                                 rr.uuid, rr.vcpus, rr.ram, rr.gpus,
234                                 availableVcpus, availableRam, availableGpus)
235
236                 case <-lr.ctx.Done():
237                         return
238                 }
239
240                 for len(pending) > 0 {
241                         rr := pending[0]
242                         if rr.vcpus < 1 || rr.vcpus > maxVcpus || rr.ram < 1 || rr.ram > maxRam || rr.gpus > maxGpus || (rr.gpus > 0 && rr.gpuStack != gpuStack) {
243                                 // resource request can never be fulfilled,
244                                 // return a zero struct
245                                 rr.ready <- ResourceAlloc{}
246                                 continue
247                         }
248
249                         if rr.vcpus > availableVcpus || rr.ram > availableRam || rr.gpus > len(availableGpus) {
250                                 logger.Infof("Insufficient resources to start %v, waiting for next event", rr.uuid)
251                                 // can't be scheduled yet, go up to
252                                 // the top and wait for the next event
253                                 continue NextEvent
254                         }
255
256                         alloc := ResourceAlloc{uuid: rr.uuid, vcpus: rr.vcpus, ram: rr.ram}
257
258                         availableVcpus -= rr.vcpus
259                         availableRam -= rr.ram
260
261                         for i := 0; i < rr.gpus; i++ {
262                                 alloc.gpuStack = alloc.gpuStack
263                                 alloc.gpus = append(alloc.gpus, availableGpus[len(availableGpus)-1])
264                                 availableGpus = availableGpus[0 : len(availableGpus)-1]
265                         }
266                         rr.ready <- alloc
267
268                         logger.Infof("%v added allocation (cpus: %v ram: %v gpus: %v %v); now available (cpus: %v ram: %v gpus: %v)",
269                                 rr.uuid, rr.vcpus, rr.ram, rr.gpus,
270                                 availableVcpus, availableRam, availableGpus)
271
272                         // shift array down
273                         for i := 0; i < len(pending)-1; i++ {
274                                 pending[i] = pending[i+1]
275                         }
276                         pending = pending[0 : len(pending)-1]
277                 }
278
279         }
280 }
281
// Run a container.
//
// If the container is Locked, start a new crunch-run process and wait until
// crunch-run completes.  If the priority is set to zero, set an interrupt
// signal to the crunch-run process.
//
// If the container is in any other state, or is not Complete/Cancelled after
// crunch-run terminates, mark the container as Cancelled.
func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
	container arvados.Container,
	status <-chan arvados.Container) error {

	uuid := container.UUID

	if container.State == dispatch.Locked {

		gpuStack := container.RuntimeConstraints.GPU.Stack
		gpus := container.RuntimeConstraints.GPU.DeviceCount

		// Ask the throttle goroutine for CPU/RAM/GPU headroom.
		// RAM includes the keep cache plus the configured extra
		// reserve.
		resourceRequest := ResourceRequest{
			uuid:  container.UUID,
			vcpus: container.RuntimeConstraints.VCPUs,
			ram: (container.RuntimeConstraints.RAM +
				container.RuntimeConstraints.KeepCacheRAM +
				int64(lr.cluster.Containers.ReserveExtraRAM)),
			gpuStack: gpuStack,
			gpus:     gpus,
			ready:    make(chan ResourceAlloc)}

		select {
		case lr.requestResources <- resourceRequest:
			break
		case <-lr.ctx.Done():
			return lr.ctx.Err()
		}

		// Block until the throttle answers (or shutdown).
		var resourceAlloc ResourceAlloc
		select {
		case resourceAlloc = <-resourceRequest.ready:
		case <-lr.ctx.Done():
			return lr.ctx.Err()
		}

		// A zero-valued alloc means this node can never satisfy
		// the request; cancel the container instead of retrying.
		if resourceAlloc.vcpus == 0 {
			dispatcher.Logger.Warnf("Container resource request %v cannot be fulfilled.", uuid)
			dispatcher.UpdateState(uuid, dispatch.Cancelled)
			return nil
		}

		// Return the allocation to the throttle no matter how we
		// exit from here on.
		defer func() {
			lr.releaseResources <- resourceAlloc
		}()

		select {
		case c := <-status:
			// Check for state updates after possibly
			// waiting to be ready-to-run
			if c.Priority == 0 {
				goto Finish
			}
		default:
			break
		}

		waitGroup.Add(1)
		defer waitGroup.Done()

		args := []string{"--runtime-engine=" + lr.cluster.Containers.RuntimeEngine}
		args = append(args, lr.cluster.Containers.CrunchRunArgumentsList...)
		args = append(args, uuid)

		cmd := exec.Command(crunchRunCommand, args...)
		cmd.Stdin = nil
		cmd.Stderr = os.Stderr
		cmd.Stdout = os.Stderr

		// Build a minimal child environment instead of inheriting
		// the dispatcher's full environment.
		cmd.Env = append(cmd.Env, fmt.Sprintf("PATH=%v", os.Getenv("PATH")))
		cmd.Env = append(cmd.Env, fmt.Sprintf("TMPDIR=%v", os.Getenv("TMPDIR")))
		cmd.Env = append(cmd.Env, fmt.Sprintf("ARVADOS_API_HOST=%v", os.Getenv("ARVADOS_API_HOST")))
		cmd.Env = append(cmd.Env, fmt.Sprintf("ARVADOS_API_TOKEN=%v", os.Getenv("ARVADOS_API_TOKEN")))

		// Per-container gateway auth secret: HMAC-SHA256 of the
		// container UUID keyed by the cluster's system root token.
		h := hmac.New(sha256.New, []byte(lr.cluster.SystemRootToken))
		fmt.Fprint(h, container.UUID)
		cmd.Env = append(cmd.Env, fmt.Sprintf("GatewayAuthSecret=%x", h.Sum(nil)))

		// Expose only the GPU devices granted by the throttle.
		if resourceAlloc.gpuStack == "rocm" {
			cmd.Env = append(cmd.Env, fmt.Sprintf("AMD_VISIBLE_DEVICES=%v", strings.Join(resourceAlloc.gpus, ",")))
		}
		if resourceAlloc.gpuStack == "cuda" {
			cmd.Env = append(cmd.Env, fmt.Sprintf("CUDA_VISIBLE_DEVICES=%v", strings.Join(resourceAlloc.gpus, ",")))
		}

		dispatcher.Logger.Printf("starting container %v", uuid)

		// Add this crunch job to the list of runningCmds only if we
		// succeed in starting crunch-run.

		runningCmdsMutex.Lock()
		if err := lr.startCmd(container, cmd); err != nil {
			runningCmdsMutex.Unlock()
			dispatcher.Logger.Warnf("error starting %q for %s: %s", crunchRunCommand, uuid, err)
			dispatcher.UpdateState(uuid, dispatch.Cancelled)
		} else {
			runningCmds[uuid] = cmd
			runningCmdsMutex.Unlock()

			// Need to wait for crunch-run to exit
			done := make(chan struct{})

			go func() {
				if _, err := cmd.Process.Wait(); err != nil {
					dispatcher.Logger.Warnf("error while waiting for crunch job to finish for %v: %q", uuid, err)
				}
				dispatcher.Logger.Debugf("sending done")
				done <- struct{}{}
			}()

			// Watch for status updates until the child exits.
		Loop:
			for {
				select {
				case <-done:
					break Loop
				case c := <-status:
					// Interrupt the child process if priority changes to 0
					if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
						dispatcher.Logger.Printf("sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
						cmd.Process.Signal(os.Interrupt)
					}
				}
			}
			close(done)

			dispatcher.Logger.Printf("finished container run for %v", uuid)

			// Remove the crunch job from runningCmds
			runningCmdsMutex.Lock()
			delete(runningCmds, uuid)
			runningCmdsMutex.Unlock()
		}
	}

Finish:

	// If the container is not finalized, then change it to "Cancelled".
	err := dispatcher.Arv.Get("containers", uuid, nil, &container)
	if err != nil {
		dispatcher.Logger.Warnf("error getting final container state: %v", err)
	}
	if container.State == dispatch.Locked || container.State == dispatch.Running {
		dispatcher.Logger.Warnf("after %q process termination, container state for %v is %q; updating it to %q",
			crunchRunCommand, uuid, container.State, dispatch.Cancelled)
		dispatcher.UpdateState(uuid, dispatch.Cancelled)
	}

	// drain any subsequent status changes
	for range status {
	}

	dispatcher.Logger.Printf("finalized container %v", uuid)
	return nil
}