10700: Rename PollInterval to PollPeriod in library to match commands and config...
[arvados.git] / services / crunch-dispatch-local / crunch-dispatch-local.go
1 package main
2
3 // Dispatcher service for Crunch that runs containers locally.
4
5 import (
6         "flag"
7         "git.curoverse.com/arvados.git/sdk/go/arvados"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/dispatch"
10         "log"
11         "os"
12         "os/exec"
13         "os/signal"
14         "sync"
15         "syscall"
16         "time"
17 )
18
19 func main() {
20         err := doMain()
21         if err != nil {
22                 log.Fatalf("%q", err)
23         }
24 }
25
// runningCmds maps a container UUID to the crunch-run command started for
// it. Access is guarded by runningCmdsMutex.
var runningCmds map[string]*exec.Cmd

// runningCmdsMutex guards runningCmds.
var runningCmdsMutex sync.Mutex

// waitGroup counts run() calls that are currently handling a Locked
// container; doMain waits on it during shutdown.
var waitGroup sync.WaitGroup

// crunchRunCommand is the value of the -crunch-run-command flag.
var crunchRunCommand *string
32
33 func doMain() error {
34         flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
35
36         pollInterval := flags.Int(
37                 "poll-interval",
38                 10,
39                 "Interval in seconds to poll for queued containers")
40
41         crunchRunCommand = flags.String(
42                 "crunch-run-command",
43                 "/usr/bin/crunch-run",
44                 "Crunch command to run container")
45
46         // Parse args; omit the first arg which is the command name
47         flags.Parse(os.Args[1:])
48
49         runningCmds = make(map[string]*exec.Cmd)
50
51         arv, err := arvadosclient.MakeArvadosClient()
52         if err != nil {
53                 log.Printf("Error making Arvados client: %v", err)
54                 return err
55         }
56         arv.Retries = 25
57
58         dispatcher := dispatch.Dispatcher{
59                 Arv:          arv,
60                 RunContainer: run,
61                 PollPeriod:   time.Duration(*pollInterval) * time.Second,
62         }
63
64         err = dispatcher.Run()
65         if err != nil {
66                 return err
67         }
68
69         c := make(chan os.Signal, 1)
70         signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
71         sig := <-c
72         log.Printf("Received %s, shutting down", sig)
73         signal.Stop(c)
74
75         dispatcher.Stop()
76
77         runningCmdsMutex.Lock()
78         // Finished dispatching; interrupt any crunch jobs that are still running
79         for _, cmd := range runningCmds {
80                 cmd.Process.Signal(os.Interrupt)
81         }
82         runningCmdsMutex.Unlock()
83
84         // Wait for all running crunch jobs to complete / terminate
85         waitGroup.Wait()
86
87         return nil
88 }
89
90 func startFunc(container arvados.Container, cmd *exec.Cmd) error {
91         return cmd.Start()
92 }
93
94 var startCmd = startFunc
95
96 // Run a container.
97 //
98 // If the container is Locked, start a new crunch-run process and wait until
99 // crunch-run completes.  If the priority is set to zero, set an interrupt
100 // signal to the crunch-run process.
101 //
102 // If the container is in any other state, or is not Complete/Cancelled after
103 // crunch-run terminates, mark the container as Cancelled.
104 func run(dispatcher *dispatch.Dispatcher,
105         container arvados.Container,
106         status chan arvados.Container) {
107
108         uuid := container.UUID
109
110         if container.State == dispatch.Locked {
111                 waitGroup.Add(1)
112
113                 cmd := exec.Command(*crunchRunCommand, uuid)
114                 cmd.Stdin = nil
115                 cmd.Stderr = os.Stderr
116                 cmd.Stdout = os.Stderr
117
118                 log.Printf("Starting container %v", uuid)
119
120                 // Add this crunch job to the list of runningCmds only if we
121                 // succeed in starting crunch-run.
122
123                 runningCmdsMutex.Lock()
124                 if err := startCmd(container, cmd); err != nil {
125                         runningCmdsMutex.Unlock()
126                         log.Printf("Error starting %v for %v: %q", *crunchRunCommand, uuid, err)
127                         dispatcher.UpdateState(uuid, dispatch.Cancelled)
128                 } else {
129                         runningCmds[uuid] = cmd
130                         runningCmdsMutex.Unlock()
131
132                         // Need to wait for crunch-run to exit
133                         done := make(chan struct{})
134
135                         go func() {
136                                 if _, err := cmd.Process.Wait(); err != nil {
137                                         log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
138                                 }
139                                 log.Printf("sending done")
140                                 done <- struct{}{}
141                         }()
142
143                 Loop:
144                         for {
145                                 select {
146                                 case <-done:
147                                         break Loop
148                                 case c := <-status:
149                                         // Interrupt the child process if priority changes to 0
150                                         if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
151                                                 log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
152                                                 cmd.Process.Signal(os.Interrupt)
153                                         }
154                                 }
155                         }
156                         close(done)
157
158                         log.Printf("Finished container run for %v", uuid)
159
160                         // Remove the crunch job from runningCmds
161                         runningCmdsMutex.Lock()
162                         delete(runningCmds, uuid)
163                         runningCmdsMutex.Unlock()
164                 }
165                 waitGroup.Done()
166         }
167
168         // If the container is not finalized, then change it to "Cancelled".
169         err := dispatcher.Arv.Get("containers", uuid, nil, &container)
170         if err != nil {
171                 log.Printf("Error getting final container state: %v", err)
172         }
173         if container.LockedByUUID == dispatcher.Auth.UUID &&
174                 (container.State == dispatch.Locked || container.State == dispatch.Running) {
175                 log.Printf("After %s process termination, container state for %v is %q.  Updating it to %q",
176                         *crunchRunCommand, container.State, uuid, dispatch.Cancelled)
177                 dispatcher.UpdateState(uuid, dispatch.Cancelled)
178         }
179
180         // drain any subsequent status changes
181         for range status {
182         }
183
184         log.Printf("Finalized container %v", uuid)
185 }