// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package main

// Dispatcher service for Crunch that runs containers locally.

import (
	"context"
	"crypto/hmac"
	"crypto/sha256"
	"flag"
	"fmt"
	"os"
	"os/exec"
	"os/signal"
	"runtime"
	"strings"
	"sync"
	"syscall"
	"time"

	"git.arvados.org/arvados.git/lib/cmd"
	"git.arvados.org/arvados.git/lib/config"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
	"git.arvados.org/arvados.git/sdk/go/dispatch"
	"github.com/pbnjay/memory"
	"github.com/sirupsen/logrus"
)

var version = "dev"

var (
	runningCmds      map[string]*exec.Cmd
	runningCmdsMutex sync.Mutex
	waitGroup        sync.WaitGroup
	crunchRunCommand string
)

func main() {
	baseLogger := logrus.StandardLogger()
	if os.Getenv("DEBUG") != "" {
		baseLogger.SetLevel(logrus.DebugLevel)
	}
	baseLogger.Formatter = &logrus.JSONFormatter{
		TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
	}

	flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)

	pollInterval := flags.Int(
		"poll-interval",
		10,
		"Interval in seconds to poll for queued containers")

	flags.StringVar(&crunchRunCommand,
		"crunch-run-command",
		"",
		"Crunch command to run container")

	getVersion := flags.Bool(
		"version",
		false,
		"Print version information and exit.")

	if ok, code := cmd.ParseFlags(flags, os.Args[0], os.Args[1:], "", os.Stderr); !ok {
		os.Exit(code)
	}

	// Print version information if requested
	if *getVersion {
		fmt.Printf("crunch-dispatch-local %s\n", version)
		return
	}

	loader := config.NewLoader(nil, baseLogger)
	cfg, err := loader.Load()
	if err != nil {
		fmt.Fprintf(os.Stderr, "error loading config: %s\n", err)
		os.Exit(1)
	}
	cluster, err := cfg.GetCluster("")
	if err != nil {
		fmt.Fprintf(os.Stderr, "config error: %s\n", err)
		os.Exit(1)
	}
	if crunchRunCommand == "" {
		crunchRunCommand = cluster.Containers.CrunchRunCommand
	}

	logger := baseLogger.WithField("ClusterID", cluster.ClusterID)
	logger.Printf("crunch-dispatch-local %s started", version)

	runningCmds = make(map[string]*exec.Cmd)

	var client arvados.Client
	client.APIHost = cluster.Services.Controller.ExternalURL.Host
	client.AuthToken = cluster.SystemRootToken
	client.Insecure = cluster.TLS.Insecure

	if client.APIHost != "" || client.AuthToken != "" {
		// Copy real configs into env vars so [a]
		// MakeArvadosClient() uses them, and [b] they get
		// propagated to crunch-run.
		os.Setenv("ARVADOS_API_HOST", client.APIHost)
		os.Setenv("ARVADOS_API_TOKEN", client.AuthToken)
		os.Setenv("ARVADOS_API_HOST_INSECURE", "")
		if client.Insecure {
			os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
		}
	} else {
		logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
	}

	arv, err := arvadosclient.MakeArvadosClient()
	if err != nil {
		logger.Errorf("error making Arvados client: %v", err)
		os.Exit(1)
	}
	arv.Retries = 25

	ctx, cancel := context.WithCancel(context.Background())

	localRun := LocalRun{startFunc, make(chan ResourceRequest), make(chan ResourceAlloc), ctx, cluster}

	go localRun.throttle(logger)

	dispatcher := dispatch.Dispatcher{
		Logger:       logger,
		Arv:          arv,
		RunContainer: localRun.run,
		PollPeriod:   time.Duration(*pollInterval) * time.Second,
	}

	err = dispatcher.Run(ctx)
	if err != nil {
		logger.Fatal(err)
	}

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
	sig := <-c
	logger.Printf("Received %s, shutting down", sig)
	signal.Stop(c)

	cancel()

	runningCmdsMutex.Lock()
	// Finished dispatching; interrupt any crunch jobs that are still running
	for _, cmd := range runningCmds {
		cmd.Process.Signal(os.Interrupt)
	}
	runningCmdsMutex.Unlock()

	// Wait for all running crunch jobs to complete / terminate
	waitGroup.Wait()
}
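
// startFunc is the default startCmd hook for LocalRun: it simply starts
// the given exec.Cmd. The indirection makes it easy to swap in a stub
// (for example, in tests).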
func startFunc(container arvados.Container, cmd *exec.Cmd) error {
	return cmd.Start()
}
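
// The types below implement a simple admission-control protocol:
// run() sends a ResourceRequest to the throttle loop, which replies on
// the request's ready channel with a ResourceAlloc once enough local
// CPU, RAM, and GPU capacity is free (or with a zero ResourceAlloc if
// the request can never be satisfied on this node).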

type ResourceAlloc struct {
	uuid     string
	vcpus    int
	ram      int64
	gpuStack string
	gpus     []string
}

type ResourceRequest struct {
	uuid     string
	vcpus    int
	ram      int64
	gpuStack string
	gpus     int
	ready    chan ResourceAlloc
}

type LocalRun struct {
	startCmd         func(container arvados.Container, cmd *exec.Cmd) error
	requestResources chan ResourceRequest
	releaseResources chan ResourceAlloc
	ctx              context.Context
	cluster          *arvados.Cluster
}
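
// throttle is the scheduler loop. It tracks the CPUs, RAM, and GPUs
// available on this node, queues incoming ResourceRequests, and grants,
// rejects, or defers them as capacity is claimed and released.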
func (lr *LocalRun) throttle(logger logrus.FieldLogger) {
	maxVcpus := runtime.NumCPU()
	var maxRam int64 = int64(memory.TotalMemory())

	logger.Infof("AMD_VISIBLE_DEVICES=%v", os.Getenv("AMD_VISIBLE_DEVICES"))
	logger.Infof("CUDA_VISIBLE_DEVICES=%v", os.Getenv("CUDA_VISIBLE_DEVICES"))

	availableCUDAGpus := strings.Split(os.Getenv("CUDA_VISIBLE_DEVICES"), ",")
	availableROCmGpus := strings.Split(os.Getenv("AMD_VISIBLE_DEVICES"), ",")

	gpuStack := ""
	maxGpus := 0
	availableGpus := []string{}

	if maxGpus = len(availableCUDAGpus); maxGpus > 0 && availableCUDAGpus[0] != "" {
		gpuStack = "cuda"
		availableGpus = availableCUDAGpus
	} else if maxGpus = len(availableROCmGpus); maxGpus > 0 && availableROCmGpus[0] != "" {
		gpuStack = "rocm"
		availableGpus = availableROCmGpus
	}

	availableVcpus := maxVcpus
	availableRam := maxRam

	pending := []ResourceRequest{}

	for {
		select {
		case rr := <-lr.requestResources:
			pending = append(pending, rr)

		case rr := <-lr.releaseResources:
			availableVcpus += rr.vcpus
			availableRam += rr.ram
			for _, gpu := range rr.gpus {
				availableGpus = append(availableGpus, gpu)
			}

			logger.Infof("%v released allocation (cpus: %v ram: %v gpus: %v); now available (cpus: %v ram: %v gpus: %v)",
				rr.uuid, rr.vcpus, rr.ram, rr.gpus,
				availableVcpus, availableRam, availableGpus)

		case <-lr.ctx.Done():
			return
		}
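
		// After each event, try to satisfy pending requests in FIFO
		// order; stop at the first one that has to keep waiting.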
		for len(pending) > 0 {
			rr := pending[0]

			if rr.vcpus < 1 || rr.vcpus > maxVcpus {
				logger.Infof("%v requested vcpus %v but maxVcpus is %v", rr.uuid, rr.vcpus, maxVcpus)
				// resource request can never be fulfilled,
				// return a zero struct
				rr.ready <- ResourceAlloc{}
			} else if rr.ram < 1 || rr.ram > maxRam {
				logger.Infof("%v requested ram %v but maxRam is %v", rr.uuid, rr.ram, maxRam)
				// resource request can never be fulfilled,
				// return a zero struct
				rr.ready <- ResourceAlloc{}
			} else if rr.gpus > maxGpus || (rr.gpus > 0 && rr.gpuStack != gpuStack) {
				logger.Infof("%v requested %v gpus with stack %v but maxGpus is %v and gpuStack is %q", rr.uuid, rr.gpus, rr.gpuStack, maxGpus, gpuStack)
				// resource request can never be fulfilled,
				// return a zero struct
				rr.ready <- ResourceAlloc{}
			} else if rr.vcpus > availableVcpus || rr.ram > availableRam || rr.gpus > len(availableGpus) {
				logger.Infof("Insufficient resources to start %v, waiting for next event", rr.uuid)
				// can't be scheduled yet, go up to
				// the top and wait for the next event
				break
			} else {
				alloc := ResourceAlloc{uuid: rr.uuid, vcpus: rr.vcpus, ram: rr.ram}

				availableVcpus -= rr.vcpus
				availableRam -= rr.ram
				alloc.gpuStack = rr.gpuStack

				for i := 0; i < rr.gpus; i++ {
					alloc.gpus = append(alloc.gpus, availableGpus[len(availableGpus)-1])
					availableGpus = availableGpus[0 : len(availableGpus)-1]
				}

				rr.ready <- alloc

				logger.Infof("%v added allocation (cpus: %v ram: %v gpus: %v); now available (cpus: %v ram: %v gpus: %v)",
					rr.uuid, rr.vcpus, rr.ram, rr.gpus,
					availableVcpus, availableRam, availableGpus)
			}

			// Shift the request we just handled off the head of
			// the pending queue.
			for i := 0; i < len(pending)-1; i++ {
				pending[i] = pending[i+1]
			}
			pending = pending[0 : len(pending)-1]
		}
	}
}

// Run a container.
//
// If the container is Locked, start a new crunch-run process and wait until
// crunch-run completes. If the priority is set to zero, send an interrupt
// signal to the crunch-run process.
//
// If the container is in any other state, or is not Complete/Cancelled after
// crunch-run terminates, mark the container as Cancelled.
func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
	container arvados.Container,
	status <-chan arvados.Container) error {
	uuid := container.UUID

	if container.State == dispatch.Locked {
		gpuStack := container.RuntimeConstraints.GPU.Stack
		gpus := container.RuntimeConstraints.GPU.DeviceCount

		resourceRequest := ResourceRequest{
			uuid:  container.UUID,
			vcpus: container.RuntimeConstraints.VCPUs,
			ram: (container.RuntimeConstraints.RAM +
				container.RuntimeConstraints.KeepCacheRAM +
				int64(lr.cluster.Containers.ReserveExtraRAM)),
			gpuStack: gpuStack,
			gpus:     gpus,
			ready:    make(chan ResourceAlloc)}

		select {
		case lr.requestResources <- resourceRequest:
		case <-lr.ctx.Done():
			return lr.ctx.Err()
		}

		var resourceAlloc ResourceAlloc
		select {
		case resourceAlloc = <-resourceRequest.ready:
		case <-lr.ctx.Done():
			return lr.ctx.Err()
		}

		if resourceAlloc.vcpus == 0 {
			dispatcher.Logger.Warnf("Container resource request %v cannot be fulfilled.", uuid)
			dispatcher.UpdateState(uuid, dispatch.Cancelled)
			return nil
		}

		// Give the allocation back to the throttle loop when this
		// container is done.
		defer func() {
			lr.releaseResources <- resourceAlloc
		}()

		// Check for state updates after possibly
		// waiting to be ready-to-run
		select {
		case c := <-status:
			container = c
		default:
		}

		waitGroup.Add(1)
		defer waitGroup.Done()

		args := []string{"--runtime-engine=" + lr.cluster.Containers.RuntimeEngine}
		args = append(args, lr.cluster.Containers.CrunchRunArgumentsList...)
		args = append(args, uuid)

		cmd := exec.Command(crunchRunCommand, args...)
		cmd.Stderr = os.Stderr
		cmd.Stdout = os.Stderr

		cmd.Env = append(cmd.Env, fmt.Sprintf("PATH=%v", os.Getenv("PATH")))
		cmd.Env = append(cmd.Env, fmt.Sprintf("TMPDIR=%v", os.Getenv("TMPDIR")))
		cmd.Env = append(cmd.Env, fmt.Sprintf("ARVADOS_API_HOST=%v", os.Getenv("ARVADOS_API_HOST")))
		cmd.Env = append(cmd.Env, fmt.Sprintf("ARVADOS_API_TOKEN=%v", os.Getenv("ARVADOS_API_TOKEN")))

		h := hmac.New(sha256.New, []byte(lr.cluster.SystemRootToken))
		fmt.Fprint(h, container.UUID)
		cmd.Env = append(cmd.Env, fmt.Sprintf("GatewayAuthSecret=%x", h.Sum(nil)))

		if resourceAlloc.gpuStack == "rocm" {
			cmd.Env = append(cmd.Env, fmt.Sprintf("AMD_VISIBLE_DEVICES=%v", strings.Join(resourceAlloc.gpus, ",")))
		}
		if resourceAlloc.gpuStack == "cuda" {
			cmd.Env = append(cmd.Env, fmt.Sprintf("CUDA_VISIBLE_DEVICES=%v", strings.Join(resourceAlloc.gpus, ",")))
		}

		dispatcher.Logger.Printf("starting container %v", uuid)

		// Add this crunch job to the list of runningCmds only if we
		// succeed in starting crunch-run.

		runningCmdsMutex.Lock()
		if err := lr.startCmd(container, cmd); err != nil {
			runningCmdsMutex.Unlock()
			dispatcher.Logger.Warnf("error starting %q for %s: %s", crunchRunCommand, uuid, err)
			dispatcher.UpdateState(uuid, dispatch.Cancelled)
		} else {
			runningCmds[uuid] = cmd
			runningCmdsMutex.Unlock()

			// Need to wait for crunch-run to exit
			done := make(chan struct{})

			go func() {
				if _, err := cmd.Process.Wait(); err != nil {
					dispatcher.Logger.Warnf("error while waiting for crunch job to finish for %v: %q", uuid, err)
				}
				dispatcher.Logger.Debugf("sending done")
				done <- struct{}{}
			}()

		Loop:
			for {
				select {
				case <-done:
					break Loop
				case c := <-status:
					// Interrupt the child process if priority changes to 0
					if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
						dispatcher.Logger.Printf("sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
						cmd.Process.Signal(os.Interrupt)
					}
				}
			}
			close(done)

			dispatcher.Logger.Printf("finished container run for %v", uuid)

			// Remove the crunch job from runningCmds
			runningCmdsMutex.Lock()
			delete(runningCmds, uuid)
			runningCmdsMutex.Unlock()
		}
	}

	// If the container is not finalized, then change it to "Cancelled".
	err := dispatcher.Arv.Get("containers", uuid, nil, &container)
	if err != nil {
		dispatcher.Logger.Warnf("error getting final container state: %v", err)
	}
	if container.State == dispatch.Locked || container.State == dispatch.Running {
		dispatcher.Logger.Warnf("after %q process termination, container state for %v is %q; updating it to %q",
			crunchRunCommand, uuid, container.State, dispatch.Cancelled)
		dispatcher.UpdateState(uuid, dispatch.Cancelled)
	}

	// drain any subsequent status changes
	for range status {
	}

	dispatcher.Logger.Printf("finalized container %v", uuid)
	return nil
}