17816: Add --runtime-engine to crunch-dispatch-local and crunch-dispatch-slurm
[arvados.git] / services / crunch-dispatch-local / crunch-dispatch-local.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 // Dispatcher service for Crunch that runs containers locally.
8
9 import (
10         "context"
11         "flag"
12         "fmt"
13         "os"
14         "os/exec"
15         "os/signal"
16         "sync"
17         "syscall"
18         "time"
19
20         "git.arvados.org/arvados.git/lib/config"
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
23         "git.arvados.org/arvados.git/sdk/go/dispatch"
24         "github.com/sirupsen/logrus"
25 )
26
// version is the program version string: it is printed by the -version flag
// and included in the startup log message. It defaults to "dev".
// NOTE(review): presumably overridden at build time via -ldflags — confirm
// against the build scripts.
var version = "dev"
28
29 func main() {
30         err := doMain()
31         if err != nil {
32                 logrus.Fatalf("%q", err)
33         }
34 }
35
// Package-level state shared between doMain and (*LocalRun).run.
var (
	// runningCmds maps a container UUID to the crunch-run command that was
	// started for it. Entries are added after a successful start and
	// removed once the process has exited. Guarded by runningCmdsMutex.
	runningCmds      map[string]*exec.Cmd
	// runningCmdsMutex protects runningCmds.
	runningCmdsMutex sync.Mutex
	// waitGroup counts run invocations that have reached the point of
	// launching crunch-run; doMain waits on it during shutdown.
	waitGroup        sync.WaitGroup
	// crunchRunCommand points to the value of the -crunch-run-command
	// flag. It is nil until doMain has defined the flag.
	crunchRunCommand *string
)
42
43 func doMain() error {
44         logger := logrus.StandardLogger()
45         if os.Getenv("DEBUG") != "" {
46                 logger.SetLevel(logrus.DebugLevel)
47         }
48         logger.Formatter = &logrus.JSONFormatter{
49                 TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
50         }
51
52         flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
53
54         pollInterval := flags.Int(
55                 "poll-interval",
56                 10,
57                 "Interval in seconds to poll for queued containers")
58
59         crunchRunCommand = flags.String(
60                 "crunch-run-command",
61                 "/usr/bin/crunch-run",
62                 "Crunch command to run container")
63
64         getVersion := flags.Bool(
65                 "version",
66                 false,
67                 "Print version information and exit.")
68
69         // Parse args; omit the first arg which is the command name
70         flags.Parse(os.Args[1:])
71
72         // Print version information if requested
73         if *getVersion {
74                 fmt.Printf("crunch-dispatch-local %s\n", version)
75                 return nil
76         }
77
78         loader := config.NewLoader(nil, logger)
79         cfg, err := loader.Load()
80         cluster, err := cfg.GetCluster("")
81         if err != nil {
82                 return fmt.Errorf("config error: %s", err)
83         }
84
85         logger.Printf("crunch-dispatch-local %s started", version)
86
87         runningCmds = make(map[string]*exec.Cmd)
88
89         arv, err := arvadosclient.MakeArvadosClient()
90         if err != nil {
91                 logger.Errorf("error making Arvados client: %v", err)
92                 return err
93         }
94         arv.Retries = 25
95
96         ctx, cancel := context.WithCancel(context.Background())
97
98         dispatcher := dispatch.Dispatcher{
99                 Logger:       logger,
100                 Arv:          arv,
101                 RunContainer: (&LocalRun{startFunc, make(chan bool, 8), ctx, cluster}).run,
102                 PollPeriod:   time.Duration(*pollInterval) * time.Second,
103         }
104
105         err = dispatcher.Run(ctx)
106         if err != nil {
107                 return err
108         }
109
110         c := make(chan os.Signal, 1)
111         signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
112         sig := <-c
113         logger.Printf("Received %s, shutting down", sig)
114         signal.Stop(c)
115
116         cancel()
117
118         runningCmdsMutex.Lock()
119         // Finished dispatching; interrupt any crunch jobs that are still running
120         for _, cmd := range runningCmds {
121                 cmd.Process.Signal(os.Interrupt)
122         }
123         runningCmdsMutex.Unlock()
124
125         // Wait for all running crunch jobs to complete / terminate
126         waitGroup.Wait()
127
128         return nil
129 }
130
131 func startFunc(container arvados.Container, cmd *exec.Cmd) error {
132         return cmd.Start()
133 }
134
// LocalRun holds the state needed by run to execute containers as local
// crunch-run processes.
//
// doMain builds this struct with an unkeyed (positional) literal, so the
// order of the fields must not be changed.
type LocalRun struct {
	// startCmd is called by run to start the prepared crunch-run command;
	// doMain supplies startFunc.
	startCmd         func(container arvados.Container, cmd *exec.Cmd) error
	// concurrencyLimit is used as a counting semaphore: run sends a value
	// before starting a container and receives one when it returns, so
	// the channel capacity bounds the number of concurrent containers.
	concurrencyLimit chan bool
	// ctx aborts run while it is waiting for a concurrencyLimit slot.
	ctx              context.Context
	// cluster supplies Containers.RuntimeEngine, which run passes to
	// crunch-run as --runtime-engine.
	cluster          *arvados.Cluster
}
141
142 // Run a container.
143 //
144 // If the container is Locked, start a new crunch-run process and wait until
145 // crunch-run completes.  If the priority is set to zero, set an interrupt
146 // signal to the crunch-run process.
147 //
148 // If the container is in any other state, or is not Complete/Cancelled after
149 // crunch-run terminates, mark the container as Cancelled.
150 func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
151         container arvados.Container,
152         status <-chan arvados.Container) {
153
154         uuid := container.UUID
155
156         if container.State == dispatch.Locked {
157
158                 select {
159                 case lr.concurrencyLimit <- true:
160                         break
161                 case <-lr.ctx.Done():
162                         return
163                 }
164
165                 defer func() { <-lr.concurrencyLimit }()
166
167                 select {
168                 case c := <-status:
169                         // Check for state updates after possibly
170                         // waiting to be ready-to-run
171                         if c.Priority == 0 {
172                                 goto Finish
173                         }
174                 default:
175                         break
176                 }
177
178                 waitGroup.Add(1)
179                 defer waitGroup.Done()
180
181                 cmd := exec.Command(*crunchRunCommand, "--runtime-engine="+lr.cluster.Containers.RuntimeEngine, uuid)
182                 cmd.Stdin = nil
183                 cmd.Stderr = os.Stderr
184                 cmd.Stdout = os.Stderr
185
186                 dispatcher.Logger.Printf("starting container %v", uuid)
187
188                 // Add this crunch job to the list of runningCmds only if we
189                 // succeed in starting crunch-run.
190
191                 runningCmdsMutex.Lock()
192                 if err := lr.startCmd(container, cmd); err != nil {
193                         runningCmdsMutex.Unlock()
194                         dispatcher.Logger.Warnf("error starting %q for %s: %s", *crunchRunCommand, uuid, err)
195                         dispatcher.UpdateState(uuid, dispatch.Cancelled)
196                 } else {
197                         runningCmds[uuid] = cmd
198                         runningCmdsMutex.Unlock()
199
200                         // Need to wait for crunch-run to exit
201                         done := make(chan struct{})
202
203                         go func() {
204                                 if _, err := cmd.Process.Wait(); err != nil {
205                                         dispatcher.Logger.Warnf("error while waiting for crunch job to finish for %v: %q", uuid, err)
206                                 }
207                                 dispatcher.Logger.Debugf("sending done")
208                                 done <- struct{}{}
209                         }()
210
211                 Loop:
212                         for {
213                                 select {
214                                 case <-done:
215                                         break Loop
216                                 case c := <-status:
217                                         // Interrupt the child process if priority changes to 0
218                                         if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
219                                                 dispatcher.Logger.Printf("sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
220                                                 cmd.Process.Signal(os.Interrupt)
221                                         }
222                                 }
223                         }
224                         close(done)
225
226                         dispatcher.Logger.Printf("finished container run for %v", uuid)
227
228                         // Remove the crunch job from runningCmds
229                         runningCmdsMutex.Lock()
230                         delete(runningCmds, uuid)
231                         runningCmdsMutex.Unlock()
232                 }
233         }
234
235 Finish:
236
237         // If the container is not finalized, then change it to "Cancelled".
238         err := dispatcher.Arv.Get("containers", uuid, nil, &container)
239         if err != nil {
240                 dispatcher.Logger.Warnf("error getting final container state: %v", err)
241         }
242         if container.State == dispatch.Locked || container.State == dispatch.Running {
243                 dispatcher.Logger.Warnf("after %q process termination, container state for %v is %q; updating it to %q",
244                         *crunchRunCommand, uuid, container.State, dispatch.Cancelled)
245                 dispatcher.UpdateState(uuid, dispatch.Cancelled)
246         }
247
248         // drain any subsequent status changes
249         for range status {
250         }
251
252         dispatcher.Logger.Printf("finalized container %v", uuid)
253 }