11925: Explicitly join actor thread after stopping.
[arvados.git] / services / crunch-dispatch-local / crunch-dispatch-local.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 // Dispatcher service for Crunch that runs containers locally.
8
9 import (
10         "context"
11         "flag"
12         "log"
13         "os"
14         "os/exec"
15         "os/signal"
16         "sync"
17         "syscall"
18         "time"
19
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
22         "git.curoverse.com/arvados.git/sdk/go/dispatch"
23 )
24
25 func main() {
26         err := doMain()
27         if err != nil {
28                 log.Fatalf("%q", err)
29         }
30 }
31
// Package-level state shared by doMain and run:
//
//   - runningCmds holds the crunch-run process started for each container,
//     keyed by container UUID. run adds an entry after a successful start
//     and deletes it once the process has exited.
//   - runningCmdsMutex guards every access to runningCmds.
//   - waitGroup counts run invocations that are handling a Locked
//     container, so doMain can wait for them during shutdown.
//   - crunchRunCommand is the crunch-run executable to invoke; doMain sets
//     it from the -crunch-run-command flag.
var (
	runningCmds      map[string]*exec.Cmd
	runningCmdsMutex sync.Mutex
	waitGroup        sync.WaitGroup
	crunchRunCommand *string
)
38
39 func doMain() error {
40         flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
41
42         pollInterval := flags.Int(
43                 "poll-interval",
44                 10,
45                 "Interval in seconds to poll for queued containers")
46
47         crunchRunCommand = flags.String(
48                 "crunch-run-command",
49                 "/usr/bin/crunch-run",
50                 "Crunch command to run container")
51
52         // Parse args; omit the first arg which is the command name
53         flags.Parse(os.Args[1:])
54
55         runningCmds = make(map[string]*exec.Cmd)
56
57         arv, err := arvadosclient.MakeArvadosClient()
58         if err != nil {
59                 log.Printf("Error making Arvados client: %v", err)
60                 return err
61         }
62         arv.Retries = 25
63
64         dispatcher := dispatch.Dispatcher{
65                 Arv:          arv,
66                 RunContainer: run,
67                 PollPeriod:   time.Duration(*pollInterval) * time.Second,
68         }
69
70         ctx, cancel := context.WithCancel(context.Background())
71         err = dispatcher.Run(ctx)
72         if err != nil {
73                 return err
74         }
75
76         c := make(chan os.Signal, 1)
77         signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
78         sig := <-c
79         log.Printf("Received %s, shutting down", sig)
80         signal.Stop(c)
81
82         cancel()
83
84         runningCmdsMutex.Lock()
85         // Finished dispatching; interrupt any crunch jobs that are still running
86         for _, cmd := range runningCmds {
87                 cmd.Process.Signal(os.Interrupt)
88         }
89         runningCmdsMutex.Unlock()
90
91         // Wait for all running crunch jobs to complete / terminate
92         waitGroup.Wait()
93
94         return nil
95 }
96
97 func startFunc(container arvados.Container, cmd *exec.Cmd) error {
98         return cmd.Start()
99 }
100
// startCmd is the function run calls to launch crunch-run; it defaults
// to startFunc.
// NOTE(review): presumably a variable so tests can substitute their own
// launcher — confirm against the test suite.
var startCmd = startFunc
102
103 // Run a container.
104 //
105 // If the container is Locked, start a new crunch-run process and wait until
106 // crunch-run completes.  If the priority is set to zero, set an interrupt
107 // signal to the crunch-run process.
108 //
109 // If the container is in any other state, or is not Complete/Cancelled after
110 // crunch-run terminates, mark the container as Cancelled.
111 func run(dispatcher *dispatch.Dispatcher,
112         container arvados.Container,
113         status <-chan arvados.Container) {
114
115         uuid := container.UUID
116
117         if container.State == dispatch.Locked {
118                 waitGroup.Add(1)
119
120                 cmd := exec.Command(*crunchRunCommand, uuid)
121                 cmd.Stdin = nil
122                 cmd.Stderr = os.Stderr
123                 cmd.Stdout = os.Stderr
124
125                 log.Printf("Starting container %v", uuid)
126
127                 // Add this crunch job to the list of runningCmds only if we
128                 // succeed in starting crunch-run.
129
130                 runningCmdsMutex.Lock()
131                 if err := startCmd(container, cmd); err != nil {
132                         runningCmdsMutex.Unlock()
133                         log.Printf("Error starting %v for %v: %q", *crunchRunCommand, uuid, err)
134                         dispatcher.UpdateState(uuid, dispatch.Cancelled)
135                 } else {
136                         runningCmds[uuid] = cmd
137                         runningCmdsMutex.Unlock()
138
139                         // Need to wait for crunch-run to exit
140                         done := make(chan struct{})
141
142                         go func() {
143                                 if _, err := cmd.Process.Wait(); err != nil {
144                                         log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
145                                 }
146                                 log.Printf("sending done")
147                                 done <- struct{}{}
148                         }()
149
150                 Loop:
151                         for {
152                                 select {
153                                 case <-done:
154                                         break Loop
155                                 case c := <-status:
156                                         // Interrupt the child process if priority changes to 0
157                                         if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
158                                                 log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
159                                                 cmd.Process.Signal(os.Interrupt)
160                                         }
161                                 }
162                         }
163                         close(done)
164
165                         log.Printf("Finished container run for %v", uuid)
166
167                         // Remove the crunch job from runningCmds
168                         runningCmdsMutex.Lock()
169                         delete(runningCmds, uuid)
170                         runningCmdsMutex.Unlock()
171                 }
172                 waitGroup.Done()
173         }
174
175         // If the container is not finalized, then change it to "Cancelled".
176         err := dispatcher.Arv.Get("containers", uuid, nil, &container)
177         if err != nil {
178                 log.Printf("Error getting final container state: %v", err)
179         }
180         if container.State == dispatch.Locked || container.State == dispatch.Running {
181                 log.Printf("After %s process termination, container state for %v is %q.  Updating it to %q",
182                         *crunchRunCommand, container.State, uuid, dispatch.Cancelled)
183                 dispatcher.UpdateState(uuid, dispatch.Cancelled)
184         }
185
186         // drain any subsequent status changes
187         for range status {
188         }
189
190         log.Printf("Finalized container %v", uuid)
191 }