5 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
22 var arv arvadosclient.ArvadosClient
23 var runningCmds map[string]*exec.Cmd
// Entry-point body (its func header is on a line not visible in this view):
// parse flags, build the API client, install a signal handler that stops
// polling and forwards the signal to every running crunch-run child, then
// poll for Queued containers via runQueuedContainers.
//
// flag.ExitOnError makes flags.Parse exit the process on a malformed flag.
26 flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
// Seconds between polls for Queued containers (used by runQueuedContainers).
28 pollInterval := flags.Int(
31 "Interval in seconds to poll for queued containers")
// Seconds between priority checks of each dispatched container (used by run).
33 priorityPollInterval := flags.Int(
34 "container-priority-poll-interval",
36 "Interval in seconds to check priority of a dispatched container")
// Executable that run starts once per container, with the container UUID
// as its only argument.
38 crunchRunCommand := flags.String(
40 "/usr/bin/crunch-run",
41 "Crunch command to run container")
43 // Parse args; omit the first arg which is the command name
44 flags.Parse(os.Args[1:])
// NOTE(review): err is declared and checked on lines not visible here —
// confirm that a failure to build the client aborts startup.
47 arv, err = arvadosclient.MakeArvadosClient()
// Created before any run goroutine is started, since those write into it.
52 runningCmds = make(map[string]*exec.Cmd)
// signal.Notify does not block when delivering, so the channel gets a buffer.
53 sigChan = make(chan os.Signal, 1)
54 signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
// Signal handler goroutine.
// NOTE(review): it is started here, but doneProcessing is only created
// further below; it also reads that package variable without
// synchronization. A signal arriving before the make() would send on a nil
// channel and block forever — create doneProcessing before this goroutine.
55 go func(sig <-chan os.Signal) {
// The loop variable shadows the channel parameter of the same name.
57 for sig := range sig {
// doneProcessing is unbuffered, so this send blocks until
// runQueuedContainers receives it in its select.
58 doneProcessing <- true
// NOTE(review): run goroutines add/delete entries concurrently with this
// iteration and no lock is visible — data race on runningCmds.
60 for uuid, cmd := range runningCmds {
// NOTE(review): uuid is passed in as an argument, but cmd appears to be
// captured from the range variable. Before Go 1.22 all goroutines share
// one cmd variable and may all signal the last process — pass cmd as a
// parameter too, unless a hidden line rebinds it.
61 go func(uuid string) {
// caught is defined on a line not visible here — presumably the signal
// received above; confirm.
64 cmd.Process.Signal(caught)
// NOTE(review): run also calls cmd.Process.Wait on this same process; only
// one Wait can reap it, so the other will likely return an error.
65 if _, err := cmd.Process.Wait(); err != nil {
66 log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
74 // channel to terminate
75 doneProcessing = make(chan bool)
77 // run all queued containers
78 runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
var (
	// doneProcessing is sent to by the signal handler and received in
	// runQueuedContainers' select to end polling. main creates it
	// unbuffered, so each send waits for that receive.
	doneProcessing chan bool

	// sigChan delivers SIGINT, SIGTERM and SIGQUIT to the signal handler;
	// main creates it and registers it with signal.Notify.
	sigChan chan os.Signal
)
85 // Poll for queued containers using pollInterval.
86 // Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
88 // Any errors encountered are logged but the program would continue to run (not exit).
89 // This is because, once one or more child processes are running,
90 // we would need to wait for them complete.
91 func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
92 ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
97 dispatchLocal(priorityPollInterval, crunchRunCommand)
98 case <-doneProcessing:
// Container is the subset of an API-server container record this dispatcher
// reads. It is decoded directly by arv.Get and, through ContainerList, by
// arv.List.
type Container struct {
	// UUID identifies the container. It is passed to crunch-run as its
	// argument and keys the runningCmds map.
	UUID string `json:"uuid"`
	// State is the lifecycle state; this file filters on "Queued" and
	// writes "Running" / "Complete".
	State string `json:"state"`
	// Priority is polled by run; a value of 0 causes the child process to be
	// interrupted.
	Priority int `json:"priority"`
}
112 // ContainerList is a list of the containers from api
113 type ContainerList struct {
114 Items []Container `json:"items"`
117 // Get the list of queued containers from API server and invoke run for each container.
118 func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
119 params := arvadosclient.Dict{
120 "filters": [][]string{[]string{"state", "=", "Queued"}},
123 var containers ContainerList
124 err := arv.List("containers", params, &containers)
126 log.Printf("Error getting list of queued containers: %q", err)
130 for i := 0; i < len(containers.Items); i++ {
131 log.Printf("About to run queued container %v", containers.Items[i].UUID)
132 go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
136 // Run queued container:
137 // Set container state to locked (TBD)
138 // Run container using the given crunch-run command
139 // Set the container state to Running
140 // If the container priority becomes zero while crunch job is still running, terminate it.
141 func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
142 cmd := exec.Command(crunchRunCommand, uuid)
145 cmd.Stderr = os.Stderr
146 cmd.Stdout = os.Stderr
147 if err := cmd.Start(); err != nil {
148 log.Printf("Error running container for %v: %q", uuid, err)
152 runningCmds[uuid] = cmd
154 log.Printf("Started container run for %v", uuid)
156 err := arv.Update("containers", uuid,
158 "container": arvadosclient.Dict{"state": "Running"}},
161 log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
164 // Terminate the runner if container priority becomes zero
165 priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
169 case <-priorityTicker.C:
170 var container Container
171 err := arv.Get("containers", uuid, nil, &container)
173 log.Printf("Error getting container info for %v: %q", uuid, err)
175 if container.Priority == 0 {
176 priorityTicker.Stop()
177 cmd.Process.Signal(os.Interrupt)
178 delete(runningCmds, uuid)
186 // Wait for the process to exit
187 if _, err := cmd.Process.Wait(); err != nil {
188 log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
190 delete(runningCmds, uuid)
192 priorityTicker.Stop()
194 var container Container
195 err = arv.Get("containers", uuid, nil, &container)
196 if container.State == "Running" {
197 log.Printf("After crunch-run process termination, the state is still 'Running' for %v. Updating it to 'Complete'", uuid)
198 err = arv.Update("containers", uuid,
200 "container": arvadosclient.Dict{"state": "Complete"}},
203 log.Printf("Error updating container state to Complete for %v: %q", uuid, err)