5 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
// Package-level state shared by the dispatcher. The enclosing declaration
// line is not visible in this excerpt; each comment below describes only how
// the name is used in the visible code.
//
// arv is assigned from arvadosclient.MakeArvadosClient() and is the receiver
// of the List, Get and Update calls on "containers".
23 arv arvadosclient.ArvadosClient
// runningCmds maps a container UUID to the exec.Cmd that was started for it.
24 runningCmds map[string]*exec.Cmd
// runningCmdsMutex is held around the insert into and the delete from
// runningCmds in run.
25 runningCmdsMutex sync.Mutex
// waitGroup: run defers waitGroup.Done(); the matching Add and Wait calls are
// on lines not shown in this excerpt.
26 waitGroup sync.WaitGroup
// doneProcessing receives true from the signal-handling goroutine and is read
// in runQueuedContainers.
27 doneProcessing chan bool
// sigChan is registered with signal.Notify for SIGINT, SIGTERM and SIGQUIT.
28 sigChan chan os.Signal
// Startup sequence: define and parse flags, create the API client, install
// signal handling, dispatch queued containers until told to stop, then
// interrupt whatever is still running. The enclosing function header and
// several interior lines are not visible in this excerpt.
//
// The flags live on a dedicated flag set created with flag.ExitOnError.
32 flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
// Interval, in seconds, between polls for queued containers (flag name and
// default are on lines not shown here).
34 pollInterval := flags.Int(
37 "Interval in seconds to poll for queued containers")
// Interval, in seconds, between priority checks of a dispatched container
// (default is on a line not shown here).
39 priorityPollInterval := flags.Int(
40 "container-priority-poll-interval",
42 "Interval in seconds to check priority of a dispatched container")
// Command used to run a container; run invokes it with the container UUID as
// its only argument. "/usr/bin/crunch-run" appears to be the default value —
// the flag name is on a line not shown here.
44 crunchRunCommand := flags.String(
46 "/usr/bin/crunch-run",
47 "Crunch command to run container")
49 // Parse args; omit the first arg which is the command name
50 flags.Parse(os.Args[1:])
// Create the API client used by the List/Get/Update calls in the visible
// code. The handling of err is on lines not shown in this excerpt.
53 arv, err = arvadosclient.MakeArvadosClient()
58 // Channel to terminate
59 doneProcessing = make(chan bool)
61 // Map of running crunch jobs
62 runningCmds = make(map[string]*exec.Cmd)
// Forward SIGINT/SIGTERM/SIGQUIT to doneProcessing. sigChan has a buffer of
// one; doneProcessing is unbuffered, so each send below blocks until a
// receiver is ready.
// NOTE(review): once runQueuedContainers has stopped receiving, a further
// signal would presumably leave this goroutine blocked on the send — confirm.
65 sigChan = make(chan os.Signal, 1)
66 signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
67 go func(sig <-chan os.Signal) {
68 for sig := range sig {
69 log.Printf("Caught signal: %v", sig)
70 doneProcessing <- true
74 // Run all queued containers
75 runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
77 // Finished dispatching; interrupt any crunch jobs that are still running
// NOTE(review): runningCmds is ranged here with no visible use of
// runningCmdsMutex, while run goroutines insert into and delete from the same
// map under that mutex — looks like a data race; confirm against the elided
// lines. The error returned by Signal is discarded.
78 for _, cmd := range runningCmds {
79 cmd.Process.Signal(os.Interrupt)
// The wait call itself is on a line not shown in this excerpt.
82 // Wait for all running crunch jobs to complete / terminate
88 // Poll for queued containers using pollInterval.
89 // Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
91 // Any errors encountered are logged but the program would continue to run (not exit).
92 // This is because, once one or more crunch jobs are running,
93 // we would need to wait for them to complete.
//
// pollInterval is in seconds. priorityPollInterval and crunchRunCommand are
// passed through unchanged to dispatchLocal.
// NOTE(review): time.NewTicker panics when given a non-positive duration, so
// pollInterval <= 0 would panic here — no validation is visible in this
// excerpt; confirm.
94 func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
95 ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
// On each poll, start whatever is currently queued.
100 dispatchLocal(priorityPollInterval, crunchRunCommand)
// A value on doneProcessing is sent by the signal-handling goroutine; what
// this case does is on lines not shown in this excerpt.
101 case <-doneProcessing:
// Container holds the container fields this program decodes from the API's
// JSON: "uuid", "state" and "priority".
109 type Container struct {
// UUID identifies the container; it is passed to the crunch-run command and
// used as the key in API Get/Update calls.
110 UUID string `json:"uuid"`
// State is compared against "Running" after the command exits.
111 State string `json:"state"`
// Priority is polled while the command runs; zero triggers an interrupt.
112 Priority int `json:"priority"`
115 // ContainerList is a list of the containers from api
116 type ContainerList struct {
// Items is decoded from the "items" key of the list response.
117 Items []Container `json:"items"`
120 // Get the list of queued containers from API server and invoke run for each container.
//
// priorityPollInterval and crunchRunCommand are forwarded to run unchanged.
// NOTE(review): nothing visible here marks a container as claimed before it
// is started. run moves it out of "Queued" only after the command has
// started, so a container still "Queued" at the next poll may be started a
// second time — confirm against the elided lines and run's "(TBD)" lock step.
121 func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
// Ask the API for containers whose state is exactly "Queued".
122 params := arvadosclient.Dict{
123 "filters": [][]string{[]string{"state", "=", "Queued"}},
126 var containers ContainerList
127 err := arv.List("containers", params, &containers)
// The list error is logged; the surrounding condition and any return are on
// lines not shown in this excerpt.
129 log.Printf("Error getting list of queued containers: %q", err)
// Start each listed container in its own goroutine.
133 for i := 0; i < len(containers.Items); i++ {
134 log.Printf("About to run queued container %v", containers.Items[i].UUID)
136 go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
140 // Run queued container:
141 // Set container state to locked (TBD)
142 // Run container using the given crunch-run command
143 // Set the container state to Running
144 // If the container priority becomes zero while crunch job is still running, terminate it.
//
// After the command exits, the container is updated to "Complete" if the API
// still reports its state as "Running".
//
// uuid is the container UUID, crunchRunCommand is the executable to start,
// and priorityPollInterval is the priority polling period in seconds.
// NOTE(review): time.NewTicker panics when given a non-positive duration, so
// priorityPollInterval <= 0 would panic below — no validation is visible in
// this excerpt; confirm.
145 func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
// The command line is crunchRunCommand with the UUID as its single argument.
146 cmd := exec.Command(crunchRunCommand, uuid)
// Both output streams of the child are sent to this process's stderr.
149 cmd.Stderr = os.Stderr
150 cmd.Stdout = os.Stderr
// A start failure is logged; what follows the log call is on lines not shown
// in this excerpt.
151 if err := cmd.Start(); err != nil {
152 log.Printf("Error running container for %v: %q", uuid, err)
156 // Add this crunch job to the list of runningCmds
157 runningCmdsMutex.Lock()
158 runningCmds[uuid] = cmd
159 runningCmdsMutex.Unlock()
161 log.Printf("Started container run for %v", uuid)
163 // Add this crunch job to waitGroup
// NOTE(review): the Add call itself is on a line not shown in this excerpt.
// If it runs here, inside the goroutine started by dispatchLocal, a
// concurrent waitGroup.Wait() could return before this job is counted —
// confirm.
165 defer waitGroup.Done()
167 // Update container status to Running
168 err := arv.Update("containers", uuid,
170 "container": arvadosclient.Dict{"state": "Running"}},
173 log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
176 // A goroutine to terminate the runner if container priority becomes zero
177 priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
// NOTE(review): Ticker.Stop does not close priorityTicker.C, so this range
// loop never ends on its own. After either Stop call below, the loop remains
// blocked on the channel — confirm whether that leak is acceptable.
179 for _ = range priorityTicker.C {
180 var container Container
181 err := arv.Get("containers", uuid, nil, &container)
183 log.Printf("Error getting container info for %v: %q", uuid, err)
// Priority zero: stop polling and send an interrupt to the child process.
// The error returned by Signal is discarded.
185 if container.Priority == 0 {
186 priorityTicker.Stop()
187 cmd.Process.Signal(os.Interrupt)
193 // Wait for the crunch job to exit
194 if _, err := cmd.Process.Wait(); err != nil {
195 log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
198 // Remove the crunch job from runningCmds
199 runningCmdsMutex.Lock()
200 delete(runningCmds, uuid)
201 runningCmdsMutex.Unlock()
// The child has exited, so priority polling is stopped.
203 priorityTicker.Stop()
205 // The container state should be 'Complete'
206 var container Container
// NOTE(review): the error stored here is not examined before container.State
// is read on the next line. If Get fails, container presumably keeps its zero
// value and the update below is skipped without any log message — confirm.
207 err = arv.Get("containers", uuid, nil, &container)
208 if container.State == "Running" {
209 log.Printf("After crunch-run process termination, the state is still 'Running' for %v. Updating it to 'Complete'", uuid)
210 err = arv.Update("containers", uuid,
212 "container": arvadosclient.Dict{"state": "Complete"}},
215 log.Printf("Error updating container state to Complete for %v: %q", uuid, err)