1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
7 // Dispatcher service for Crunch that runs containers locally.
24 "git.arvados.org/arvados.git/lib/cmd"
25 "git.arvados.org/arvados.git/lib/config"
26 "git.arvados.org/arvados.git/sdk/go/arvados"
27 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
28 "git.arvados.org/arvados.git/sdk/go/dispatch"
29 "github.com/pbnjay/memory"
30 "github.com/sirupsen/logrus"
// runningCmds maps a container UUID to the crunch-run command started for
// it. run() adds the entry only after the command has been started
// successfully and deletes it after crunch-run finishes; main interrupts
// every command still listed when dispatching ends. Access it only while
// holding runningCmdsMutex.
36 runningCmds map[string]*exec.Cmd
37 runningCmdsMutex sync.Mutex
// waitGroup tracks per-container work in run(), which defers
// waitGroup.Done(); the matching Add is not visible here. Presumably main
// waits on it before exiting.
38 waitGroup sync.WaitGroup
// crunchRunCommand is the crunch-run executable to launch. It is bound to a
// command-line flag and, when left empty, defaults to
// cluster.Containers.CrunchRunCommand from the cluster config.
39 crunchRunCommand string
// Program entry point (presumably the body of main; its opening line is not
// part of this excerpt). It configures logging, parses flags, loads the
// cluster config, exports API credentials to the environment, starts the
// resource throttle and the dispatch loop, and after the loop returns
// interrupts any crunch-run processes that are still running.
//
// Log as JSON with nanosecond-precision timestamps; any non-empty DEBUG
// environment variable enables debug-level logging.
43 baseLogger := logrus.StandardLogger()
44 if os.Getenv("DEBUG") != "" {
45 baseLogger.SetLevel(logrus.DebugLevel)
47 baseLogger.Formatter = &logrus.JSONFormatter{
48 TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
// Command-line flags.
51 flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
53 pollInterval := flags.Int(
56 "Interval in seconds to poll for queued containers")
58 flags.StringVar(&crunchRunCommand,
61 "Crunch command to run container")
63 getVersion := flags.Bool(
66 "Print version information and exit.")
68 if ok, code := cmd.ParseFlags(flags, os.Args[0], os.Args[1:], "", os.Stderr); !ok {
72 // Print version information if requested
74 fmt.Printf("crunch-dispatch-local %s\n", version)
// Load the cluster configuration; failures are reported on stderr.
78 loader := config.NewLoader(nil, baseLogger)
79 cfg, err := loader.Load()
81 fmt.Fprintf(os.Stderr, "error loading config: %s\n", err)
84 cluster, err := cfg.GetCluster("")
86 fmt.Fprintf(os.Stderr, "config error: %s\n", err)
// A crunch-run command given on the command line takes precedence over the
// one in the cluster config.
90 if crunchRunCommand == "" {
91 crunchRunCommand = cluster.Containers.CrunchRunCommand
94 logger := baseLogger.WithField("ClusterID", cluster.ClusterID)
95 logger.Printf("crunch-dispatch-local %s started", version)
// Must exist before the dispatcher starts calling run(), which inserts into it.
97 runningCmds = make(map[string]*exec.Cmd)
// API client settings taken from the cluster config.
99 var client arvados.Client
100 client.APIHost = cluster.Services.Controller.ExternalURL.Host
101 client.AuthToken = cluster.SystemRootToken
102 client.Insecure = cluster.TLS.Insecure
104 if client.APIHost != "" || client.AuthToken != "" {
105 // Copy real configs into env vars so [a]
106 // MakeArvadosClient() uses them, and [b] they get
107 // propagated to crunch-run (run() copies them into its environment).
108 os.Setenv("ARVADOS_API_HOST", client.APIHost)
109 os.Setenv("ARVADOS_API_TOKEN", client.AuthToken)
// Clear any inherited ARVADOS_API_HOST_INSECURE first.
// NOTE(review): the "1" below is presumably set only when client.Insecure
// is true; the guarding line is not visible in this excerpt.
110 os.Setenv("ARVADOS_API_HOST_INSECURE", "")
112 os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
// Presumably the branch where the config has no credentials: rely on
// ARVADOS_API_* variables already present in the environment.
115 logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
118 arv, err := arvadosclient.MakeArvadosClient()
120 logger.Errorf("error making Arvados client: %v", err)
// ctx is shared by the throttle goroutine and the dispatcher; cancelling it
// is the shutdown signal for both.
125 ctx, cancel := context.WithCancel(context.Background())
// Positional struct literal: the value order must match LocalRun's field order.
127 localRun := LocalRun{startFunc, make(chan ResourceRequest), make(chan ResourceAlloc), ctx, cluster}
129 go localRun.throttle(logger)
// localRun.run is invoked by the dispatcher for each container it handles.
131 dispatcher := dispatch.Dispatcher{
134 RunContainer: localRun.run,
135 PollPeriod: time.Duration(*pollInterval) * time.Second,
138 err = dispatcher.Run(ctx)
// Catch SIGINT/SIGTERM/SIGQUIT and log them.
// NOTE(review): the handler presumably calls cancel() next; that line is not
// visible in this excerpt.
144 c := make(chan os.Signal, 1)
145 signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
147 logger.Printf("Received %s, shutting down", sig)
152 runningCmdsMutex.Lock()
153 // Finished dispatching; interrupt any crunch jobs that are still running
// Every entry has a started Process, because run() only inserts after
// startCmd succeeds. The loop variable shadows the imported cmd package
// inside this loop only. Signal errors are ignored (best effort).
154 for _, cmd := range runningCmds {
155 cmd.Process.Signal(os.Interrupt)
157 runningCmdsMutex.Unlock()
159 // Wait for all running crunch jobs to complete / terminate
// startFunc is the LocalRun.startCmd hook that main installs in production.
// NOTE(review): its body is not visible in this excerpt. It presumably just
// starts cmd without waiting, since run() waits on cmd.Process afterwards;
// confirm.
163 func startFunc(container arvados.Container, cmd *exec.Cmd) error {
// ResourceAlloc is the grant the throttle goroutine sends back in reply to a
// ResourceRequest. run() later hands it back on LocalRun.releaseResources so
// the resources return to the pool. The zero value (vcpus == 0) means the
// request can never be satisfied on this host.
// Fields used elsewhere: uuid, vcpus, ram, gpuStack, and gpus (the device
// IDs assigned to the container).
167 type ResourceAlloc struct {
// ResourceRequest is what run() sends on LocalRun.requestResources to ask the
// throttle goroutine for CPUs, RAM and GPUs for one container. Fields used
// elsewhere: uuid, vcpus, ram (container RAM + Keep cache RAM + the cluster's
// ReserveExtraRAM, compared against total system memory), gpus (a device
// count) and gpuStack.
175 type ResourceRequest struct {
// ready carries the throttle's reply: the granted ResourceAlloc, or the zero
// ResourceAlloc if the request can never be fulfilled. run() creates it
// unbuffered and reads one reply from it.
181 ready chan ResourceAlloc
// LocalRun holds the state shared between the dispatcher callback (run) and
// the resource-throttling goroutine (throttle).
184 type LocalRun struct {
// startCmd starts a prepared crunch-run command; main passes startFunc.
185 startCmd func(container arvados.Container, cmd *exec.Cmd) error
// requestResources: run() sends ResourceRequests here; throttle receives them.
186 requestResources chan ResourceRequest
// releaseResources: run() hands allocations back here; throttle returns them
// to the pool.
187 releaseResources chan ResourceAlloc
// cluster supplies the crunch-run settings (RuntimeEngine,
// CrunchRunArgumentsList), ReserveExtraRAM and SystemRootToken.
189 cluster *arvados.Cluster
// throttle owns this host's pool of CPUs, RAM and GPU device IDs.
//
// run() sends ResourceRequests on lr.requestResources. throttle appends them
// to a queue and grants the request at the front once enough resources are
// free; a request that does not fit yet waits for the next event. A request
// that can never fit on this host is answered with a zero ResourceAlloc.
// Allocations sent back on lr.releaseResources are returned to the pool.
// It also watches lr.ctx.Done(), presumably to stop at shutdown.
//
// Capacity comes from runtime.NumCPU(), memory.TotalMemory(), and the device
// IDs listed in CUDA_VISIBLE_DEVICES or, failing that, AMD_VISIBLE_DEVICES.
192 func (lr *LocalRun) throttle(logger logrus.FieldLogger) {
193 maxVcpus := runtime.NumCPU()
194 var maxRam int64 = int64(memory.TotalMemory())
196 logger.Infof("AMD_VISIBLE_DEVICES=%v", os.Getenv("AMD_VISIBLE_DEVICES"))
197 logger.Infof("CUDA_VISIBLE_DEVICES=%v", os.Getenv("CUDA_VISIBLE_DEVICES"))
199 availableCUDAGpus := strings.Split(os.Getenv("CUDA_VISIBLE_DEVICES"), ",")
200 availableROCmGpus := strings.Split(os.Getenv("AMD_VISIBLE_DEVICES"), ",")
204 availableGpus := []string{}
// strings.Split never returns an empty slice: an unset or empty variable
// yields [""]. Checking the length alone would always pick the CUDA branch,
// treating an unset CUDA_VISIBLE_DEVICES as one device named "" and making
// the AMD_VISIBLE_DEVICES branch unreachable.
206 if maxGpus = len(availableCUDAGpus); maxGpus > 0 && availableCUDAGpus[0] != "" {
208 availableGpus = availableCUDAGpus
209 } else if maxGpus = len(availableROCmGpus); maxGpus > 0 && availableROCmGpus[0] != "" {
211 availableGpus = availableROCmGpus
214 availableVcpus := maxVcpus
215 availableRam := maxRam
// Queue of unanswered requests: appended at the back, removed from the
// front once granted.
217 pending := []ResourceRequest{}
222 case rr := <-lr.requestResources:
223 pending = append(pending, rr)
// A finished container hands its allocation back to the pool.
225 case rr := <-lr.releaseResources:
226 availableVcpus += rr.vcpus
227 availableRam += rr.ram
228 for _, gpu := range rr.gpus {
229 availableGpus = append(availableGpus, gpu)
// The format has eight verbs ("gpus: %v %v" prints the GPU stack and then
// the device IDs), so it needs eight arguments.
232 logger.Infof("%v released allocation (cpus: %v ram: %v gpus: %v %v); now available (cpus: %v ram: %v gpus: %v)",
233 rr.uuid, rr.vcpus, rr.ram, rr.gpuStack, rr.gpus,
234 availableVcpus, availableRam, availableGpus)
236 case <-lr.ctx.Done():
240 for len(pending) > 0 {
242 if rr.vcpus < 1 || rr.vcpus > maxVcpus || rr.ram < 1 || rr.ram > maxRam || rr.gpus > maxGpus || (rr.gpus > 0 && rr.gpuStack != gpuStack) {
243 // resource request can never be fulfilled,
244 // return a zero struct
245 rr.ready <- ResourceAlloc{}
249 if rr.vcpus > availableVcpus || rr.ram > availableRam || rr.gpus > len(availableGpus) {
250 logger.Infof("Insufficient resources to start %v, waiting for next event", rr.uuid)
251 // can't be scheduled yet, go up to
252 // the top and wait for the next event
256 alloc := ResourceAlloc{uuid: rr.uuid, vcpus: rr.vcpus, ram: rr.ram}
258 availableVcpus -= rr.vcpus
259 availableRam -= rr.ram
// Take GPU device IDs from the end of the available list.
261 for i := 0; i < rr.gpus; i++ {
// run() picks CUDA_VISIBLE_DEVICES or AMD_VISIBLE_DEVICES from
// alloc.gpuStack, so the grant must carry the requested stack.
262 alloc.gpuStack = rr.gpuStack
263 alloc.gpus = append(alloc.gpus, availableGpus[len(availableGpus)-1])
264 availableGpus = availableGpus[0 : len(availableGpus)-1]
// Same eight-verb format as the release message. Here rr.gpus is the
// requested device count.
268 logger.Infof("%v added allocation (cpus: %v ram: %v gpus: %v %v); now available (cpus: %v ram: %v gpus: %v)",
269 rr.uuid, rr.vcpus, rr.ram, rr.gpuStack, rr.gpus,
270 availableVcpus, availableRam, availableGpus)
// Remove the granted request from the front of the queue.
273 for i := 0; i < len(pending)-1; i++ {
274 pending[i] = pending[i+1]
276 pending = pending[0 : len(pending)-1]
// run is the dispatch.Dispatcher RunContainer callback (installed by main)
// and handles one container.
284 // If the container is Locked, start a new crunch-run process and wait until
285 // crunch-run completes. If the priority is set to zero, send an interrupt
286 // signal to the crunch-run process.
288 // If the container is in any other state, or is not Complete/Cancelled after
289 // crunch-run terminates, mark the container as Cancelled.
290 func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
291 container arvados.Container,
292 status <-chan arvados.Container) error {
294 uuid := container.UUID
296 if container.State == dispatch.Locked {
298 gpuStack := container.RuntimeConstraints.GPU.Stack
299 gpus := container.RuntimeConstraints.GPU.DeviceCount
// RAM reserved = container RAM + Keep cache RAM + the cluster's
// ReserveExtraRAM headroom.
301 resourceRequest := ResourceRequest{
302 uuid: container.UUID,
303 vcpus: container.RuntimeConstraints.VCPUs,
304 ram: (container.RuntimeConstraints.RAM +
305 container.RuntimeConstraints.KeepCacheRAM +
306 int64(lr.cluster.Containers.ReserveExtraRAM)),
309 ready: make(chan ResourceAlloc)}
// Hand the request to the throttle goroutine, unless lr.ctx is cancelled first.
312 case lr.requestResources <- resourceRequest:
314 case <-lr.ctx.Done():
// Wait for the grant (or give up if lr.ctx is cancelled).
318 var resourceAlloc ResourceAlloc
320 case resourceAlloc = <-resourceRequest.ready:
321 case <-lr.ctx.Done():
// A zero ResourceAlloc means the throttle decided the request can never fit
// on this host, so the container is cancelled.
325 if resourceAlloc.vcpus == 0 {
326 dispatcher.Logger.Warnf("Container resource request %v cannot be fulfilled.", uuid)
327 dispatcher.UpdateState(uuid, dispatch.Cancelled)
// Return the allocation to the throttle's pool.
// NOTE(review): releaseResources is unbuffered; if the throttle goroutine
// has already stopped after lr.ctx was cancelled, this send would block.
// Confirm.
332 lr.releaseResources <- resourceAlloc
337 // Check for state updates after possibly
338 // waiting to be ready-to-run
// Lets main's final wait cover this container (the matching Add is not
// visible here).
347 defer waitGroup.Done()
// crunch-run arguments: runtime engine, configured extra arguments, then
// the container UUID.
349 args := []string{"--runtime-engine=" + lr.cluster.Containers.RuntimeEngine}
350 args = append(args, lr.cluster.Containers.CrunchRunArgumentsList...)
351 args = append(args, uuid)
353 cmd := exec.Command(crunchRunCommand, args...)
// Both of crunch-run's output streams go to the dispatcher's stderr.
355 cmd.Stderr = os.Stderr
356 cmd.Stdout = os.Stderr
// cmd.Env starts nil, so once it is populated crunch-run sees only the
// variables appended here rather than the dispatcher's whole environment.
358 cmd.Env = append(cmd.Env, fmt.Sprintf("PATH=%v", os.Getenv("PATH")))
359 cmd.Env = append(cmd.Env, fmt.Sprintf("TMPDIR=%v", os.Getenv("TMPDIR")))
360 cmd.Env = append(cmd.Env, fmt.Sprintf("ARVADOS_API_HOST=%v", os.Getenv("ARVADOS_API_HOST")))
361 cmd.Env = append(cmd.Env, fmt.Sprintf("ARVADOS_API_TOKEN=%v", os.Getenv("ARVADOS_API_TOKEN")))
// GatewayAuthSecret = hex(HMAC-SHA256(key=SystemRootToken, msg=container UUID)).
363 h := hmac.New(sha256.New, []byte(lr.cluster.SystemRootToken))
364 fmt.Fprint(h, container.UUID)
365 cmd.Env = append(cmd.Env, fmt.Sprintf("GatewayAuthSecret=%x", h.Sum(nil)))
// Restrict crunch-run to the GPU device IDs granted by the throttle; the
// variable used depends on the allocation's GPU stack.
367 if resourceAlloc.gpuStack == "rocm" {
368 cmd.Env = append(cmd.Env, fmt.Sprintf("AMD_VISIBLE_DEVICES=%v", strings.Join(resourceAlloc.gpus, ",")))
370 if resourceAlloc.gpuStack == "cuda" {
371 cmd.Env = append(cmd.Env, fmt.Sprintf("CUDA_VISIBLE_DEVICES=%v", strings.Join(resourceAlloc.gpus, ",")))
374 dispatcher.Logger.Printf("starting container %v", uuid)
376 // Add this crunch job to the list of runningCmds only if we
377 // succeed in starting crunch-run.
// Holding the mutex across startCmd and the insert means main's shutdown
// loop never sees an entry whose Process has not been started.
379 runningCmdsMutex.Lock()
380 if err := lr.startCmd(container, cmd); err != nil {
381 runningCmdsMutex.Unlock()
382 dispatcher.Logger.Warnf("error starting %q for %s: %s", crunchRunCommand, uuid, err)
383 dispatcher.UpdateState(uuid, dispatch.Cancelled)
385 runningCmds[uuid] = cmd
386 runningCmdsMutex.Unlock()
388 // Need to wait for crunch-run to exit
// done presumably gets closed by the waiting code below once Wait returns.
389 done := make(chan struct{})
392 if _, err := cmd.Process.Wait(); err != nil {
393 dispatcher.Logger.Warnf("error while waiting for crunch job to finish for %v: %q", uuid, err)
395 dispatcher.Logger.Debugf("sending done")
405 // Interrupt the child process if priority changes to 0
406 if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
407 dispatcher.Logger.Printf("sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
408 cmd.Process.Signal(os.Interrupt)
414 dispatcher.Logger.Printf("finished container run for %v", uuid)
416 // Remove the crunch job from runningCmds
417 runningCmdsMutex.Lock()
418 delete(runningCmds, uuid)
419 runningCmdsMutex.Unlock()
425 // If the container is not finalized, then change it to "Cancelled".
// Note: this refreshes the container parameter in place with the API's
// current record.
426 err := dispatcher.Arv.Get("containers", uuid, nil, &container)
428 dispatcher.Logger.Warnf("error getting final container state: %v", err)
430 if container.State == dispatch.Locked || container.State == dispatch.Running {
431 dispatcher.Logger.Warnf("after %q process termination, container state for %v is %q; updating it to %q",
432 crunchRunCommand, uuid, container.State, dispatch.Cancelled)
433 dispatcher.UpdateState(uuid, dispatch.Cancelled)
436 // drain any subsequent status changes
440 dispatcher.Logger.Printf("finalized container %v", uuid)