Merge branch '10312-nodemanager-quotas' refs #10312
[arvados.git] / services / crunch-dispatch-local / crunch-dispatch-local.go
1 package main
2
3 // Dispatcher service for Crunch that runs containers locally.
4
5 import (
6         "context"
7         "flag"
8         "log"
9         "os"
10         "os/exec"
11         "os/signal"
12         "sync"
13         "syscall"
14         "time"
15
16         "git.curoverse.com/arvados.git/sdk/go/arvados"
17         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
18         "git.curoverse.com/arvados.git/sdk/go/dispatch"
19 )
20
21 func main() {
22         err := doMain()
23         if err != nil {
24                 log.Fatalf("%q", err)
25         }
26 }
27
// runningCmds maps a container UUID to the crunch-run process started for
// it. Reads and writes are done while holding runningCmdsMutex.
var runningCmds map[string]*exec.Cmd

// runningCmdsMutex guards runningCmds.
var runningCmdsMutex sync.Mutex

// waitGroup tracks run callbacks that are managing a crunch-run process so
// that doMain can wait for them to finish during shutdown.
var waitGroup sync.WaitGroup

// crunchRunCommand points at the value of the -crunch-run-command flag once
// doMain has registered it.
var crunchRunCommand *string
34
35 func doMain() error {
36         flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
37
38         pollInterval := flags.Int(
39                 "poll-interval",
40                 10,
41                 "Interval in seconds to poll for queued containers")
42
43         crunchRunCommand = flags.String(
44                 "crunch-run-command",
45                 "/usr/bin/crunch-run",
46                 "Crunch command to run container")
47
48         // Parse args; omit the first arg which is the command name
49         flags.Parse(os.Args[1:])
50
51         runningCmds = make(map[string]*exec.Cmd)
52
53         arv, err := arvadosclient.MakeArvadosClient()
54         if err != nil {
55                 log.Printf("Error making Arvados client: %v", err)
56                 return err
57         }
58         arv.Retries = 25
59
60         dispatcher := dispatch.Dispatcher{
61                 Arv:          arv,
62                 RunContainer: run,
63                 PollPeriod:   time.Duration(*pollInterval) * time.Second,
64         }
65
66         ctx, cancel := context.WithCancel(context.Background())
67         err = dispatcher.Run(ctx)
68         if err != nil {
69                 return err
70         }
71
72         c := make(chan os.Signal, 1)
73         signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
74         sig := <-c
75         log.Printf("Received %s, shutting down", sig)
76         signal.Stop(c)
77
78         cancel()
79
80         runningCmdsMutex.Lock()
81         // Finished dispatching; interrupt any crunch jobs that are still running
82         for _, cmd := range runningCmds {
83                 cmd.Process.Signal(os.Interrupt)
84         }
85         runningCmdsMutex.Unlock()
86
87         // Wait for all running crunch jobs to complete / terminate
88         waitGroup.Wait()
89
90         return nil
91 }
92
93 func startFunc(container arvados.Container, cmd *exec.Cmd) error {
94         return cmd.Start()
95 }
96
97 var startCmd = startFunc
98
99 // Run a container.
100 //
101 // If the container is Locked, start a new crunch-run process and wait until
102 // crunch-run completes.  If the priority is set to zero, set an interrupt
103 // signal to the crunch-run process.
104 //
105 // If the container is in any other state, or is not Complete/Cancelled after
106 // crunch-run terminates, mark the container as Cancelled.
107 func run(dispatcher *dispatch.Dispatcher,
108         container arvados.Container,
109         status <-chan arvados.Container) {
110
111         uuid := container.UUID
112
113         if container.State == dispatch.Locked {
114                 waitGroup.Add(1)
115
116                 cmd := exec.Command(*crunchRunCommand, uuid)
117                 cmd.Stdin = nil
118                 cmd.Stderr = os.Stderr
119                 cmd.Stdout = os.Stderr
120
121                 log.Printf("Starting container %v", uuid)
122
123                 // Add this crunch job to the list of runningCmds only if we
124                 // succeed in starting crunch-run.
125
126                 runningCmdsMutex.Lock()
127                 if err := startCmd(container, cmd); err != nil {
128                         runningCmdsMutex.Unlock()
129                         log.Printf("Error starting %v for %v: %q", *crunchRunCommand, uuid, err)
130                         dispatcher.UpdateState(uuid, dispatch.Cancelled)
131                 } else {
132                         runningCmds[uuid] = cmd
133                         runningCmdsMutex.Unlock()
134
135                         // Need to wait for crunch-run to exit
136                         done := make(chan struct{})
137
138                         go func() {
139                                 if _, err := cmd.Process.Wait(); err != nil {
140                                         log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
141                                 }
142                                 log.Printf("sending done")
143                                 done <- struct{}{}
144                         }()
145
146                 Loop:
147                         for {
148                                 select {
149                                 case <-done:
150                                         break Loop
151                                 case c := <-status:
152                                         // Interrupt the child process if priority changes to 0
153                                         if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
154                                                 log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
155                                                 cmd.Process.Signal(os.Interrupt)
156                                         }
157                                 }
158                         }
159                         close(done)
160
161                         log.Printf("Finished container run for %v", uuid)
162
163                         // Remove the crunch job from runningCmds
164                         runningCmdsMutex.Lock()
165                         delete(runningCmds, uuid)
166                         runningCmdsMutex.Unlock()
167                 }
168                 waitGroup.Done()
169         }
170
171         // If the container is not finalized, then change it to "Cancelled".
172         err := dispatcher.Arv.Get("containers", uuid, nil, &container)
173         if err != nil {
174                 log.Printf("Error getting final container state: %v", err)
175         }
176         if container.State == dispatch.Locked || container.State == dispatch.Running {
177                 log.Printf("After %s process termination, container state for %v is %q.  Updating it to %q",
178                         *crunchRunCommand, container.State, uuid, dispatch.Cancelled)
179                 dispatcher.UpdateState(uuid, dispatch.Cancelled)
180         }
181
182         // drain any subsequent status changes
183         for range status {
184         }
185
186         log.Printf("Finalized container %v", uuid)
187 }