]> git.arvados.org - arvados.git/blob - services/crunch-dispatch-local/crunch-dispatch-local.go
22314: Correctly use CrunchRunCommand and CrunchRunArgumentsList
[arvados.git] / services / crunch-dispatch-local / crunch-dispatch-local.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 // Dispatcher service for Crunch that runs containers locally.
8
9 import (
10         "context"
11         "flag"
12         "fmt"
13         "os"
14         "os/exec"
15         "os/signal"
16         "runtime"
17         "sync"
18         "syscall"
19         "time"
20
21         "git.arvados.org/arvados.git/lib/cmd"
22         "git.arvados.org/arvados.git/lib/config"
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
25         "git.arvados.org/arvados.git/sdk/go/dispatch"
26         "github.com/pbnjay/memory"
27         "github.com/sirupsen/logrus"
28 )
29
// version is reported by the -version flag; "dev" by default
// (presumably overridden at release build time — TODO confirm build flags).
var version = "dev"

var (
	// runningCmds maps container UUID to its running crunch-run
	// process so the shutdown path can signal them; all access is
	// guarded by runningCmdsMutex.
	runningCmds      map[string]*exec.Cmd
	runningCmdsMutex sync.Mutex
	// waitGroup counts in-flight crunch-run processes so main can
	// wait for them all to exit during shutdown.
	waitGroup sync.WaitGroup
	// crunchRunCommand is the crunch-run executable to invoke, from
	// the -crunch-run-command flag or the cluster config.
	crunchRunCommand string
)
38
// main wires up and runs the local dispatcher: parse flags, load the
// cluster config, build the Arvados API client, start the resource
// throttle goroutine, run the dispatch loop, and on a termination
// signal interrupt any still-running crunch-run children and wait for
// them to exit.
func main() {
	baseLogger := logrus.StandardLogger()
	if os.Getenv("DEBUG") != "" {
		baseLogger.SetLevel(logrus.DebugLevel)
	}
	// Emit JSON log records with nanosecond-resolution timestamps.
	baseLogger.Formatter = &logrus.JSONFormatter{
		TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
	}

	flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)

	pollInterval := flags.Int(
		"poll-interval",
		10,
		"Interval in seconds to poll for queued containers")

	flags.StringVar(&crunchRunCommand,
		"crunch-run-command",
		"",
		"Crunch command to run container")

	getVersion := flags.Bool(
		"version",
		false,
		"Print version information and exit.")

	if ok, code := cmd.ParseFlags(flags, os.Args[0], os.Args[1:], "", os.Stderr); !ok {
		os.Exit(code)
	}

	// Print version information if requested
	if *getVersion {
		fmt.Printf("crunch-dispatch-local %s\n", version)
		return
	}

	loader := config.NewLoader(nil, baseLogger)
	cfg, err := loader.Load()
	if err != nil {
		fmt.Fprintf(os.Stderr, "error loading config: %s\n", err)
		os.Exit(1)
	}
	cluster, err := cfg.GetCluster("")
	if err != nil {
		fmt.Fprintf(os.Stderr, "config error: %s\n", err)
		os.Exit(1)
	}

	// Command-line flag takes precedence; otherwise fall back to
	// the cluster config's CrunchRunCommand.
	if crunchRunCommand == "" {
		crunchRunCommand = cluster.Containers.CrunchRunCommand
	}

	logger := baseLogger.WithField("ClusterID", cluster.ClusterID)
	logger.Printf("crunch-dispatch-local %s started", version)

	runningCmds = make(map[string]*exec.Cmd)

	var client arvados.Client
	client.APIHost = cluster.Services.Controller.ExternalURL.Host
	client.AuthToken = cluster.SystemRootToken
	client.Insecure = cluster.TLS.Insecure

	// If either credential is present in the config, export both to
	// the environment so [a] MakeArvadosClient() below uses them,
	// and [b] they are inherited by the crunch-run child processes.
	// (NOTE(review): the original comment said "propagated ... via
	// SLURM" — apparently copied from the slurm dispatcher; this
	// dispatcher spawns crunch-run directly.)
	if client.APIHost != "" || client.AuthToken != "" {
		os.Setenv("ARVADOS_API_HOST", client.APIHost)
		os.Setenv("ARVADOS_API_TOKEN", client.AuthToken)
		os.Setenv("ARVADOS_API_HOST_INSECURE", "")
		if client.Insecure {
			os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
		}
	} else {
		logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
	}

	arv, err := arvadosclient.MakeArvadosClient()
	if err != nil {
		logger.Errorf("error making Arvados client: %v", err)
		os.Exit(1)
	}
	// Retry API calls aggressively so the dispatcher survives
	// transient API server outages.
	arv.Retries = 25

	ctx, cancel := context.WithCancel(context.Background())

	// The throttle goroutine gates container starts on locally
	// available CPU/RAM/GPU resources.
	localRun := LocalRun{startFunc, make(chan ResourceRequest), ctx, cluster}

	go localRun.throttle(logger)

	dispatcher := dispatch.Dispatcher{
		Logger:       logger,
		Arv:          arv,
		RunContainer: localRun.run,
		PollPeriod:   time.Duration(*pollInterval) * time.Second,
	}

	err = dispatcher.Run(ctx)
	if err != nil {
		logger.Error(err)
		return
	}

	// Block until a termination signal arrives, then shut down.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
	sig := <-c
	logger.Printf("Received %s, shutting down", sig)
	signal.Stop(c)

	cancel()

	runningCmdsMutex.Lock()
	// Finished dispatching; interrupt any crunch jobs that are still running
	for _, cmd := range runningCmds {
		cmd.Process.Signal(os.Interrupt)
	}
	runningCmdsMutex.Unlock()

	// Wait for all running crunch jobs to complete / terminate
	waitGroup.Wait()
}
159
160 func startFunc(container arvados.Container, cmd *exec.Cmd) error {
161         return cmd.Start()
162 }
163
// ResourceRequest is a message sent to LocalRun.throttle to allocate
// (positive values) or release (negative values) node resources on
// behalf of one container.
type ResourceRequest struct {
	uuid string // container UUID, used in throttle log messages
	// Requested resources; negated copies of the same struct are
	// sent back to throttle to release the allocation.
	vcpus int
	ram   int64 // bytes
	gpus  int
	// ready receives true once the resources are allocated, or
	// false if the request can never be satisfied on this node.
	ready chan bool
}
171
// LocalRun bundles the state used to run containers on the local host.
type LocalRun struct {
	// startCmd starts the prepared crunch-run command; main wires
	// this to startFunc.
	startCmd func(container arvados.Container, cmd *exec.Cmd) error
	// concurrencyLimit carries ResourceRequest messages to the
	// throttle goroutine (allocations and releases).
	concurrencyLimit chan ResourceRequest
	// ctx is cancelled when the dispatcher shuts down.
	ctx context.Context
	// cluster supplies configuration: crunch-run args, runtime
	// engine, and extra RAM reservation.
	cluster *arvados.Cluster
}
178
179 func (lr *LocalRun) throttle(logger logrus.FieldLogger) {
180         maxVcpus := runtime.NumCPU()
181         var maxRam int64 = int64(memory.TotalMemory())
182
183         // treat all GPUs as a single resource for now.
184         maxGpus := 1
185
186         var allocVcpus int
187         var allocRam int64
188         var allocGpus int
189
190         pending := []ResourceRequest{}
191
192 NextEvent:
193         for {
194                 rr := <-lr.concurrencyLimit
195
196                 if rr.vcpus > 0 {
197                         // allocating resources
198                         pending = append(pending, rr)
199                 } else {
200                         // releasing resources (these should be
201                         // negative numbers)
202                         allocVcpus += rr.vcpus
203                         allocRam += rr.ram
204                         allocGpus += rr.gpus
205
206                         logger.Infof("%v removed allocation (cpus: %v ram: %v gpus: %v); total allocated (cpus: %v ram: %v gpus: %v)",
207                                 rr.uuid, rr.vcpus, rr.ram, rr.gpus,
208                                 allocVcpus, allocRam, allocGpus)
209                 }
210
211                 for len(pending) > 0 {
212                         rr := pending[0]
213                         if rr.vcpus > maxVcpus || rr.ram > maxRam || rr.gpus > maxGpus {
214                                 // resource request can never be fulfilled
215                                 rr.ready <- false
216                                 continue
217                         }
218
219                         if (allocVcpus+rr.vcpus) > maxVcpus || (allocRam+rr.ram) > maxRam || (allocGpus+rr.gpus) > maxGpus {
220                                 logger.Info("Insufficient resources to start %v, waiting for next event", rr.uuid)
221                                 // can't be scheduled yet, go up to
222                                 // the top and wait for the next event
223                                 continue NextEvent
224                         }
225
226                         allocVcpus += rr.vcpus
227                         allocRam += rr.ram
228                         allocGpus += rr.gpus
229                         rr.ready <- true
230
231                         logger.Infof("%v added allocation (cpus: %v ram: %v gpus: %v); total allocated (cpus: %v ram: %v gpus: %v)",
232                                 rr.uuid, rr.vcpus, rr.ram, rr.gpus,
233                                 allocVcpus, allocRam, allocGpus)
234
235                         // shift up
236                         for i := 0; i < len(pending)-1; i++ {
237                                 pending[i] = pending[i+1]
238                         }
239                         pending = pending[0 : len(pending)-1]
240                 }
241
242         }
243 }
244
// Run a container.
//
// If the container is Locked, start a new crunch-run process and wait until
// crunch-run completes.  If the priority is set to zero, set an interrupt
// signal to the crunch-run process.
//
// If the container is in any other state, or is not Complete/Cancelled after
// crunch-run terminates, mark the container as Cancelled.
func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
	container arvados.Container,
	status <-chan arvados.Container) error {

	uuid := container.UUID

	if container.State == dispatch.Locked {

		// Ask the throttle goroutine for the container's CPU,
		// RAM (runtime + keep cache + configured reserve) and
		// GPU requirements.
		resourceRequest := ResourceRequest{
			uuid:  container.UUID,
			vcpus: container.RuntimeConstraints.VCPUs,
			ram: (container.RuntimeConstraints.RAM +
				container.RuntimeConstraints.KeepCacheRAM +
				int64(lr.cluster.Containers.ReserveExtraRAM)),
			gpus:  container.RuntimeConstraints.CUDA.DeviceCount,
			ready: make(chan bool)}

		// Submit the request, bailing out if the dispatcher
		// is shutting down.
		select {
		case lr.concurrencyLimit <- resourceRequest:
			break
		case <-lr.ctx.Done():
			return lr.ctx.Err()
		}

		// Block until resources are allocated (true) or the
		// request is impossible to satisfy on this node (false).
		canRun := <-resourceRequest.ready

		if !canRun {
			dispatcher.Logger.Warnf("Container resource request %v cannot be fulfilled.", uuid)
			dispatcher.UpdateState(uuid, dispatch.Cancelled)
			return nil
		}

		// Release the allocation on exit: negating the values
		// marks the message as a release for throttle.
		defer func() {
			resourceRequest.vcpus = -resourceRequest.vcpus
			resourceRequest.ram = -resourceRequest.ram
			resourceRequest.gpus = -resourceRequest.gpus
			lr.concurrencyLimit <- resourceRequest
		}()

		// Non-blocking peek at status updates that arrived
		// while we were waiting for resources; priority 0
		// means the container was cancelled in the meantime.
		select {
		case c := <-status:
			// Check for state updates after possibly
			// waiting to be ready-to-run
			if c.Priority == 0 {
				goto Finish
			}
		default:
			break
		}

		waitGroup.Add(1)
		defer waitGroup.Done()

		// Build the crunch-run command line: runtime engine,
		// cluster-configured extra arguments, then the
		// container UUID.
		args := []string{"--runtime-engine=" + lr.cluster.Containers.RuntimeEngine}
		args = append(args, lr.cluster.Containers.CrunchRunArgumentsList...)
		args = append(args, uuid)

		// Child's stdout and stderr both go to our stderr.
		cmd := exec.Command(crunchRunCommand, args...)
		cmd.Stdin = nil
		cmd.Stderr = os.Stderr
		cmd.Stdout = os.Stderr

		dispatcher.Logger.Printf("starting container %v", uuid)

		// Add this crunch job to the list of runningCmds only if we
		// succeed in starting crunch-run.

		runningCmdsMutex.Lock()
		if err := lr.startCmd(container, cmd); err != nil {
			runningCmdsMutex.Unlock()
			dispatcher.Logger.Warnf("error starting %q for %s: %s", crunchRunCommand, uuid, err)
			dispatcher.UpdateState(uuid, dispatch.Cancelled)
		} else {
			runningCmds[uuid] = cmd
			runningCmdsMutex.Unlock()

			// Need to wait for crunch-run to exit
			done := make(chan struct{})

			go func() {
				// NOTE(review): uses cmd.Process.Wait
				// rather than cmd.Wait — presumably
				// safe here because stdout/stderr are
				// *os.File, so cmd has no copier
				// goroutines to reap; confirm before
				// redirecting output elsewhere.
				if _, err := cmd.Process.Wait(); err != nil {
					dispatcher.Logger.Warnf("error while waiting for crunch job to finish for %v: %q", uuid, err)
				}
				dispatcher.Logger.Debugf("sending done")
				done <- struct{}{}
			}()

			// Watch status updates until crunch-run exits.
		Loop:
			for {
				select {
				case <-done:
					break Loop
				case c := <-status:
					// Interrupt the child process if priority changes to 0
					if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
						dispatcher.Logger.Printf("sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
						cmd.Process.Signal(os.Interrupt)
					}
				}
			}
			close(done)

			dispatcher.Logger.Printf("finished container run for %v", uuid)

			// Remove the crunch job from runningCmds
			runningCmdsMutex.Lock()
			delete(runningCmds, uuid)
			runningCmdsMutex.Unlock()
		}
	}

Finish:

	// If the container is not finalized, then change it to "Cancelled".
	err := dispatcher.Arv.Get("containers", uuid, nil, &container)
	if err != nil {
		dispatcher.Logger.Warnf("error getting final container state: %v", err)
	}
	if container.State == dispatch.Locked || container.State == dispatch.Running {
		dispatcher.Logger.Warnf("after %q process termination, container state for %v is %q; updating it to %q",
			crunchRunCommand, uuid, container.State, dispatch.Cancelled)
		dispatcher.UpdateState(uuid, dispatch.Cancelled)
	}

	// drain any subsequent status changes
	for range status {
	}

	dispatcher.Logger.Printf("finalized container %v", uuid)
	return nil
}