5 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
// arv is the Arvados API client shared by every containers list/get/update call.
23 arv arvadosclient.ArvadosClient
// runningCmds maps a container UUID to its started crunch-run process.
// run() holds runningCmdsMutex when it adds and removes entries.
24 runningCmds map[string]*exec.Cmd
// runningCmdsMutex guards runningCmds.
25 runningCmdsMutex sync.Mutex
// waitGroup tracks crunch-run jobs so shutdown can wait for them; run() calls Done when it returns.
26 waitGroup sync.WaitGroup
// doneProcessing is sent on by the signal handler to stop the dispatch loop in runQueuedContainers.
27 doneProcessing chan bool
// sigChan receives SIGINT, SIGTERM and SIGQUIT so the dispatcher can shut down gracefully.
28 sigChan chan os.Signal
// Dispatcher entry-point setup (the enclosing function header is not visible
// in this view). It parses flags, creates the API client and installs a
// signal handler. It then dispatches queued containers until a signal
// arrives, and finally interrupts any crunch-run jobs still running.
// flag.ExitOnError: a bad flag makes Parse exit the process.
32 flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
// Interval flags are in seconds; they are converted to time.Duration where
// runQueuedContainers is called below.
34 pollInterval := flags.Int(
37 "Interval in seconds to poll for queued containers")
39 priorityPollInterval := flags.Int(
40 "container-priority-poll-interval",
42 "Interval in seconds to check priority of a dispatched container")
44 crunchRunCommand := flags.String(
46 "/usr/bin/crunch-run",
47 "Crunch command to run container")
49 // Parse args; omit the first arg which is the command name
50 flags.Parse(os.Args[1:])
// Initialize the package-level client used by every API call in this file.
53 arv, err = arvadosclient.MakeArvadosClient()
58 // Channel to terminate
59 doneProcessing = make(chan bool)
61 // Map of running crunch jobs
62 runningCmds = make(map[string]*exec.Cmd)
// os/signal does not block when delivering, so the channel needs buffer
// space; a capacity of 1 keeps a signal that arrives before it is read.
65 sigChan = make(chan os.Signal, 1)
66 signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
// NOTE(review): the loop variable `sig` shadows the channel parameter of the
// same name. It works, but renaming one of them would read more clearly.
67 go func(sig <-chan os.Signal) {
68 for sig := range sig {
69 log.Printf("Caught signal: %v", sig)
// NOTE(review): doneProcessing is unbuffered, so this send blocks until
// runQueuedContainers receives it. A second signal arriving after that loop
// stops receiving would leave this goroutine blocked. Confirm that is acceptable.
70 doneProcessing <- true
74 // Run all queued containers
75 runQueuedContainers(time.Duration(*pollInterval)*time.Second, time.Duration(*priorityPollInterval)*time.Second, *crunchRunCommand)
77 // Finished dispatching; interrupt any crunch jobs that are still running
// NOTE(review): runningCmds is ranged here without runningCmdsMutex, while
// run() goroutines delete entries under that lock. This looks like a data
// race; consider holding runningCmdsMutex around this loop.
78 for _, cmd := range runningCmds {
// The error from Signal (e.g. the process already exited) is ignored.
79 cmd.Process.Signal(os.Interrupt)
82 // Wait for all running crunch jobs to complete / terminate
88 // runQueuedContainers polls for queued containers every pollInterval.
89 // Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
91 // Any errors encountered are logged, but the program continues to run (does not exit).
92 // This is because, once one or more crunch jobs are running,
93 // we need to wait for them to complete.
94 func runQueuedContainers(pollInterval, priorityPollInterval time.Duration, crunchRunCommand string) {
// One dispatch pass per tick.
95 ticker := time.NewTicker(pollInterval)
// priorityPollInterval is forwarded through dispatchLocal to run(), where it
// sets how often a running container's priority is re-checked.
100 dispatchLocal(priorityPollInterval, crunchRunCommand)
// A value sent by the signal handler ends dispatching.
101 case <-doneProcessing:
// Container holds the fields of an Arvados container record that this
// dispatcher reads from the API server. The struct tags map each field to
// the record's JSON key.
type Container struct {
	UUID         string `json:"uuid"`
	State        string `json:"state"`
	Priority     int    `json:"priority"`
	LockedByUUID string `json:"locked_by_uuid"`
}
116 // ContainerList is a list of the containers from api
117 type ContainerList struct {
118 Items []Container `json:"items"`
121 // Get the list of queued containers from API server and invoke run for each container.
122 func dispatchLocal(pollInterval time.Duration, crunchRunCommand string) {
123 params := arvadosclient.Dict{
124 "filters": [][]string{[]string{"state", "=", "Queued"}},
127 var containers ContainerList
128 err := arv.List("containers", params, &containers)
130 log.Printf("Error getting list of queued containers: %q", err)
134 for i := 0; i < len(containers.Items); i++ {
135 log.Printf("About to run queued container %v", containers.Items[i].UUID)
137 go run(containers.Items[i].UUID, crunchRunCommand, pollInterval)
141 func updateState(uuid, newState string) error {
142 err := arv.Update("containers", uuid,
144 "container": arvadosclient.Dict{"state": newState}},
147 log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
152 // Run queued container:
153 // Set container state to Locked
154 // Run container using the given crunch-run command
155 // Set the container state to Running
156 // If the container priority becomes zero while crunch job is still running, terminate it.
// pollInterval is how often the container's priority is re-checked while crunch-run runs.
// NOTE(review): several structural lines of this function are not visible in
// this view (branch bodies, the cleanup construct, the priority-poll loop);
// the comments below describe only the visible statements.
157 func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
// If the container cannot be moved to Locked, this presumably returns without
// starting crunch-run (the branch body is not visible here).
158 if err := updateState(uuid, "Locked"); err != nil {
162 cmd := exec.Command(crunchRunCommand, uuid)
// Both crunch-run's stdout and its stderr go to this process's stderr.
164 cmd.Stderr = os.Stderr
165 cmd.Stdout = os.Stderr
167 // Add this crunch job to the list of runningCmds only if we
168 // succeed in starting crunch-run.
169 runningCmdsMutex.Lock()
170 if err := cmd.Start(); err != nil {
171 log.Printf("Error starting crunch-run for %v: %q", uuid, err)
172 runningCmdsMutex.Unlock()
// Start failed after the container was locked: hand it back to the queue.
173 updateState(uuid, "Queued")
176 runningCmds[uuid] = cmd
177 runningCmdsMutex.Unlock()
// Cleanup once the job is over (the enclosing construct is not visible here;
// presumably a deferred function).
182 // Remove the crunch job from runningCmds
183 runningCmdsMutex.Lock()
184 delete(runningCmds, uuid)
185 runningCmdsMutex.Unlock()
188 log.Printf("Starting container %v", uuid)
190 // Add this crunch job to waitGroup
// NOTE(review): registration with waitGroup happens inside the goroutine that
// dispatchLocal launches, after crunch-run has started. If shutdown reaches
// waitGroup.Wait before this point, this job is not waited for. Calling
// waitGroup.Add before launching the goroutine would avoid that. The Add call
// itself is not visible here; confirm.
192 defer waitGroup.Done()
194 updateState(uuid, "Running")
// Presumably closed once crunch-run exits, to stop the priority poller (not visible here).
196 cmdExited := make(chan struct{})
198 // Interrupt (SIGINT) the child process if container priority changes to zero
200 ticker := time.NewTicker(pollInterval)
208 var container Container
209 err := arv.Get("containers", uuid, nil, &container)
211 log.Printf("Error getting container %v: %q", uuid, err)
214 if container.Priority == 0 {
215 log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
// The error from Signal is ignored.
216 cmd.Process.Signal(os.Interrupt)
221 // Wait for crunch-run to exit
222 if _, err := cmd.Process.Wait(); err != nil {
223 log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
227 log.Printf("Finished container run for %v", uuid)
// setFinalState re-reads container uuid after its crunch-run process has
// terminated. It forces any non-final state it finds per fixState:
// 'Running' becomes 'Complete' and 'Locked' becomes 'Cancelled'.
230 func setFinalState(uuid string) {
231 // The container state should now be 'Complete' if everything
232 // went well. If it started but crunch-run didn't change its
233 // state from 'Running' to a final state, fix that now. If it never even
234 // started, cancel it as unrunnable. (TODO: Requeue instead,
235 // and fix tests so they can tell something happened even if
236 // the final state is Queued.)
237 var container Container
238 err := arv.Get("containers", uuid, nil, &container)
// NOTE(review): no early return is visible after this log. If nothing was
// decoded, container.State is "", which matches no fixState key, so no update
// is made. Confirm the lines not visible here keep that behavior.
240 log.Printf("Error getting final container state: %v", err)
// Non-final state left behind by crunch-run -> state the dispatcher forces.
242 fixState := map[string]string{
243 "Running": "Complete",
244 "Locked": "Cancelled",
246 if newState, ok := fixState[container.State]; ok {
247 log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)
// Best effort: updateState logs its own failure, and the result is not checked.
248 updateState(uuid, newState)