8128: Update crunch-dispatch-local to use new Locked state.
[arvados.git] / services / crunch-dispatch-local / crunch-dispatch-local.go
1 package main
2
3 import (
4         "flag"
5         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
6         "log"
7         "os"
8         "os/exec"
9         "os/signal"
10         "sync"
11         "syscall"
12         "time"
13 )
14
15 func main() {
16         err := doMain()
17         if err != nil {
18                 log.Fatalf("%q", err)
19         }
20 }
21
22 var (
23         arv              arvadosclient.ArvadosClient
24         runningCmds      map[string]*exec.Cmd
25         runningCmdsMutex sync.Mutex
26         waitGroup        sync.WaitGroup
27         doneProcessing   chan bool
28         sigChan          chan os.Signal
29 )
30
31 func doMain() error {
32         flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
33
34         pollInterval := flags.Int(
35                 "poll-interval",
36                 10,
37                 "Interval in seconds to poll for queued containers")
38
39         priorityPollInterval := flags.Int(
40                 "container-priority-poll-interval",
41                 60,
42                 "Interval in seconds to check priority of a dispatched container")
43
44         crunchRunCommand := flags.String(
45                 "crunch-run-command",
46                 "/usr/bin/crunch-run",
47                 "Crunch command to run container")
48
49         // Parse args; omit the first arg which is the command name
50         flags.Parse(os.Args[1:])
51
52         var err error
53         arv, err = arvadosclient.MakeArvadosClient()
54         if err != nil {
55                 return err
56         }
57
58         // Channel to terminate
59         doneProcessing = make(chan bool)
60
61         // Map of running crunch jobs
62         runningCmds = make(map[string]*exec.Cmd)
63
64         // Graceful shutdown
65         sigChan = make(chan os.Signal, 1)
66         signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
67         go func(sig <-chan os.Signal) {
68                 for sig := range sig {
69                         log.Printf("Caught signal: %v", sig)
70                         doneProcessing <- true
71                 }
72         }(sigChan)
73
74         // Run all queued containers
75         runQueuedContainers(time.Duration(*pollInterval)*time.Second, time.Duration(*priorityPollInterval)*time.Second, *crunchRunCommand)
76
77         // Finished dispatching; interrupt any crunch jobs that are still running
78         for _, cmd := range runningCmds {
79                 cmd.Process.Signal(os.Interrupt)
80         }
81
82         // Wait for all running crunch jobs to complete / terminate
83         waitGroup.Wait()
84
85         return nil
86 }
87
88 // Poll for queued containers using pollInterval.
89 // Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
90 //
91 // Any errors encountered are logged but the program would continue to run (not exit).
92 // This is because, once one or more crunch jobs are running,
93 // we would need to wait for them complete.
94 func runQueuedContainers(pollInterval, priorityPollInterval time.Duration, crunchRunCommand string) {
95         ticker := time.NewTicker(pollInterval)
96
97         for {
98                 select {
99                 case <-ticker.C:
100                         dispatchLocal(priorityPollInterval, crunchRunCommand)
101                 case <-doneProcessing:
102                         ticker.Stop()
103                         return
104                 }
105         }
106 }
107
108 // Container data
109 type Container struct {
110         UUID         string `json:"uuid"`
111         State        string `json:"state"`
112         Priority     int    `json:"priority"`
113         LockedByUUID string `json:"locked_by_uuid"`
114 }
115
116 // ContainerList is a list of the containers from api
117 type ContainerList struct {
118         Items []Container `json:"items"`
119 }
120
121 // Get the list of queued containers from API server and invoke run for each container.
122 func dispatchLocal(pollInterval time.Duration, crunchRunCommand string) {
123         params := arvadosclient.Dict{
124                 "filters": [][]string{[]string{"state", "=", "Queued"}},
125         }
126
127         var containers ContainerList
128         err := arv.List("containers", params, &containers)
129         if err != nil {
130                 log.Printf("Error getting list of queued containers: %q", err)
131                 return
132         }
133
134         for i := 0; i < len(containers.Items); i++ {
135                 log.Printf("About to run queued container %v", containers.Items[i].UUID)
136                 // Run the container
137                 go run(containers.Items[i].UUID, crunchRunCommand, pollInterval)
138         }
139 }
140
141 func updateState(uuid, newState string) error {
142         err := arv.Update("containers", uuid,
143                 arvadosclient.Dict{
144                         "container": arvadosclient.Dict{"state": newState}},
145                 nil)
146         if err != nil {
147                 log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
148         }
149         return err
150 }
151
152 // Run queued container:
153 // Set container state to Locked
154 // Run container using the given crunch-run command
155 // Set the container state to Running
156 // If the container priority becomes zero while crunch job is still running, terminate it.
157 func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
158         if err := updateState(uuid, "Locked"); err != nil {
159                 return
160         }
161
162         cmd := exec.Command(crunchRunCommand, uuid)
163         cmd.Stdin = nil
164         cmd.Stderr = os.Stderr
165         cmd.Stdout = os.Stderr
166
167         // Add this crunch job to the list of runningCmds only if we
168         // succeed in starting crunch-run.
169         runningCmdsMutex.Lock()
170         if err := cmd.Start(); err != nil {
171                 log.Printf("Error starting crunch-run for %v: %q", uuid, err)
172                 runningCmdsMutex.Unlock()
173                 updateState(uuid, "Queued")
174                 return
175         }
176         runningCmds[uuid] = cmd
177         runningCmdsMutex.Unlock()
178
179         defer func() {
180                 setFinalState(uuid)
181
182                 // Remove the crunch job from runningCmds
183                 runningCmdsMutex.Lock()
184                 delete(runningCmds, uuid)
185                 runningCmdsMutex.Unlock()
186         }()
187
188         log.Printf("Starting container %v", uuid)
189
190         // Add this crunch job to waitGroup
191         waitGroup.Add(1)
192         defer waitGroup.Done()
193
194         updateState(uuid, "Running")
195
196         cmdExited := make(chan struct{})
197
198         // Kill the child process if container priority changes to zero
199         go func() {
200                 ticker := time.NewTicker(pollInterval)
201                 defer ticker.Stop()
202                 for {
203                         select {
204                         case <-cmdExited:
205                                 return
206                         case <-ticker.C:
207                         }
208                         var container Container
209                         err := arv.Get("containers", uuid, nil, &container)
210                         if err != nil {
211                                 log.Printf("Error getting container %v: %q", uuid, err)
212                                 continue
213                         }
214                         if container.Priority == 0 {
215                                 log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
216                                 cmd.Process.Signal(os.Interrupt)
217                         }
218                 }
219         }()
220
221         // Wait for crunch-run to exit
222         if _, err := cmd.Process.Wait(); err != nil {
223                 log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
224         }
225         close(cmdExited)
226
227         log.Printf("Finished container run for %v", uuid)
228 }
229
230 func setFinalState(uuid string) {
231         // The container state should now be 'Complete' if everything
232         // went well. If it started but crunch-run didn't change its
233         // final state to 'Running', fix that now. If it never even
234         // started, cancel it as unrunnable. (TODO: Requeue instead,
235         // and fix tests so they can tell something happened even if
236         // the final state is Queued.)
237         var container Container
238         err := arv.Get("containers", uuid, nil, &container)
239         if err != nil {
240                 log.Printf("Error getting final container state: %v", err)
241         }
242         fixState := map[string]string{
243                 "Running": "Complete",
244                 "Locked": "Cancelled",
245         }
246         if newState, ok := fixState[container.State]; ok {
247                 log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)
248                 updateState(uuid, newState)
249         }
250 }