Merge branch '8231-publish-arvbox' closes #8231
[arvados.git] / services / crunch-dispatch-local / crunch-dispatch-local.go
1 package main
2
3 import (
4         "flag"
5         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
6         "log"
7         "os"
8         "os/exec"
9         "os/signal"
10         "sync"
11         "syscall"
12         "time"
13 )
14
15 func main() {
16         err := doMain()
17         if err != nil {
18                 log.Fatalf("%q", err)
19         }
20 }
21
22 var (
23         arv              arvadosclient.ArvadosClient
24         runningCmds      map[string]*exec.Cmd
25         runningCmdsMutex sync.Mutex
26         waitGroup        sync.WaitGroup
27         doneProcessing   chan bool
28         sigChan          chan os.Signal
29 )
30
31 func doMain() error {
32         flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
33
34         pollInterval := flags.Int(
35                 "poll-interval",
36                 10,
37                 "Interval in seconds to poll for queued containers")
38
39         priorityPollInterval := flags.Int(
40                 "container-priority-poll-interval",
41                 60,
42                 "Interval in seconds to check priority of a dispatched container")
43
44         crunchRunCommand := flags.String(
45                 "crunch-run-command",
46                 "/usr/bin/crunch-run",
47                 "Crunch command to run container")
48
49         // Parse args; omit the first arg which is the command name
50         flags.Parse(os.Args[1:])
51
52         var err error
53         arv, err = arvadosclient.MakeArvadosClient()
54         if err != nil {
55                 return err
56         }
57
58         // Channel to terminate
59         doneProcessing = make(chan bool)
60
61         // Map of running crunch jobs
62         runningCmds = make(map[string]*exec.Cmd)
63
64         // Graceful shutdown
65         sigChan = make(chan os.Signal, 1)
66         signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
67         go func(sig <-chan os.Signal) {
68                 for sig := range sig {
69                         log.Printf("Caught signal: %v", sig)
70                         doneProcessing <- true
71                 }
72         }(sigChan)
73
74         // Run all queued containers
75         runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
76
77         // Finished dispatching; interrupt any crunch jobs that are still running
78         for _, cmd := range runningCmds {
79                 cmd.Process.Signal(os.Interrupt)
80         }
81
82         // Wait for all running crunch jobs to complete / terminate
83         waitGroup.Wait()
84
85         return nil
86 }
87
88 // Poll for queued containers using pollInterval.
89 // Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
90 //
91 // Any errors encountered are logged but the program would continue to run (not exit).
92 // This is because, once one or more crunch jobs are running,
93 // we would need to wait for them complete.
94 func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
95         ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
96
97         for {
98                 select {
99                 case <-ticker.C:
100                         dispatchLocal(priorityPollInterval, crunchRunCommand)
101                 case <-doneProcessing:
102                         ticker.Stop()
103                         return
104                 }
105         }
106 }
107
108 // Container data
109 type Container struct {
110         UUID     string `json:"uuid"`
111         State    string `json:"state"`
112         Priority int    `json:"priority"`
113 }
114
115 // ContainerList is a list of the containers from api
116 type ContainerList struct {
117         Items []Container `json:"items"`
118 }
119
120 // Get the list of queued containers from API server and invoke run for each container.
121 func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
122         params := arvadosclient.Dict{
123                 "filters": [][]string{[]string{"state", "=", "Queued"}},
124         }
125
126         var containers ContainerList
127         err := arv.List("containers", params, &containers)
128         if err != nil {
129                 log.Printf("Error getting list of queued containers: %q", err)
130                 return
131         }
132
133         for i := 0; i < len(containers.Items); i++ {
134                 log.Printf("About to run queued container %v", containers.Items[i].UUID)
135                 // Run the container
136                 go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
137         }
138 }
139
140 // Run queued container:
141 // Set container state to locked (TBD)
142 // Run container using the given crunch-run command
143 // Set the container state to Running
144 // If the container priority becomes zero while crunch job is still running, terminate it.
145 func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
146         cmd := exec.Command(crunchRunCommand, uuid)
147
148         cmd.Stdin = nil
149         cmd.Stderr = os.Stderr
150         cmd.Stdout = os.Stderr
151         if err := cmd.Start(); err != nil {
152                 log.Printf("Error running container for %v: %q", uuid, err)
153                 return
154         }
155
156         // Add this crunch job to the list of runningCmds
157         runningCmdsMutex.Lock()
158         runningCmds[uuid] = cmd
159         runningCmdsMutex.Unlock()
160
161         log.Printf("Started container run for %v", uuid)
162
163         // Add this crunch job to waitGroup
164         waitGroup.Add(1)
165         defer waitGroup.Done()
166
167         // Update container status to Running
168         err := arv.Update("containers", uuid,
169                 arvadosclient.Dict{
170                         "container": arvadosclient.Dict{"state": "Running"}},
171                 nil)
172         if err != nil {
173                 log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
174         }
175
176         // A goroutine to terminate the runner if container priority becomes zero
177         priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
178         go func() {
179                 for _ = range priorityTicker.C {
180                         var container Container
181                         err := arv.Get("containers", uuid, nil, &container)
182                         if err != nil {
183                                 log.Printf("Error getting container info for %v: %q", uuid, err)
184                         } else {
185                                 if container.Priority == 0 {
186                                         priorityTicker.Stop()
187                                         cmd.Process.Signal(os.Interrupt)
188                                 }
189                         }
190                 }
191         }()
192
193         // Wait for the crunch job to exit
194         if _, err := cmd.Process.Wait(); err != nil {
195                 log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
196         }
197
198         // Remove the crunch job to runningCmds
199         runningCmdsMutex.Lock()
200         delete(runningCmds, uuid)
201         runningCmdsMutex.Unlock()
202
203         priorityTicker.Stop()
204
205         // The container state should be 'Complete'
206         var container Container
207         err = arv.Get("containers", uuid, nil, &container)
208         if container.State == "Running" {
209                 log.Printf("After crunch-run process termination, the state is still 'Running' for %v. Updating it to 'Complete'", uuid)
210                 err = arv.Update("containers", uuid,
211                         arvadosclient.Dict{
212                                 "container": arvadosclient.Dict{"state": "Complete"}},
213                         nil)
214                 if err != nil {
215                         log.Printf("Error updating container state to Complete for %v: %q", uuid, err)
216                 }
217         }
218
219         log.Printf("Finished container run for %v", uuid)
220 }