5 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
// arv is the package-level Arvados API client. It is assigned below from
// arvadosclient.MakeArvadosClient and is read by the functions that list,
// get and update containers.
19 var arv arvadosclient.ArvadosClient
// Command-line flags. flag.ExitOnError makes a parse failure exit the
// process instead of returning an error to the caller.
22 flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
// Seconds between polls of the API server for queued containers.
24 pollInterval := flags.Int(
27 "Interval in seconds to poll for queued containers")
// Seconds between priority checks of each dispatched container; a container
// whose priority drops to zero has its crunch-run process interrupted.
29 priorityPollInterval := flags.Int(
30 "container-priority-poll-interval",
32 "Interval in seconds to check priority of a dispatched container")
// Program started once per queued container, with the container UUID as its
// only argument; defaults to /usr/bin/crunch-run.
34 crunchRunCommand := flags.String(
36 "/usr/bin/crunch-run",
37 "Crunch command to run container")
39 // Parse the arguments, skipping os.Args[0] (the program name).
40 flags.Parse(os.Args[1:])
// Create the shared API client.
// NOTE(review): the handling of err is not visible in this excerpt —
// confirm that a failure here aborts startup.
43 arv, err = arvadosclient.MakeArvadosClient()
48 // doneProcessing is the channel runQueuedContainers selects on to stop polling.
49 doneProcessing = make(chan bool)
51 // Poll for queued containers and run each one.
// NOTE(review): nothing visible here sends on or closes doneProcessing, so
// this call presumably never returns.
52 runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
56 var doneProcessing chan bool
58 // Poll for queued containers using pollInterval.
59 // Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
61 // Any errors encountered are logged but the program would continue to run (not exit).
62 // This is because, once one or more child processes are running,
63 // we would need to wait for them complete.
64 func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
65 ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
70 dispatchLocal(priorityPollInterval, crunchRunCommand)
71 case <-doneProcessing:
// Container and ContainerList mirror the parts of the API server's
// container records that this dispatcher reads.
type (
	// Container holds the fields of one container record used here: its
	// UUID, its state (e.g. "Queued", "Running", "Complete"), and its
	// priority (a priority of zero means the run should be stopped).
	Container struct {
		UUID     string `json:"uuid"`
		State    string `json:"state"`
		Priority int    `json:"priority"`
	}

	// ContainerList is a list of the containers from the API: the "items"
	// array of a container list response.
	ContainerList struct {
		Items []Container `json:"items"`
	}
)
90 // Get the list of queued containers from API server and invoke run for each container.
91 func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
92 params := arvadosclient.Dict{
93 "filters": [][]string{[]string{"state", "=", "Queued"}},
96 var containers ContainerList
97 err := arv.List("containers", params, &containers)
99 log.Printf("Error getting list of queued containers: %q", err)
103 for i := 0; i < len(containers.Items); i++ {
104 log.Printf("About to run queued container %v", containers.Items[i].UUID)
105 go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
109 // Run queued container:
110 // Set container state to locked (TBD)
111 // Run container using the given crunch-run command
112 // Set the container state to Running
113 // If the container priority becomes zero while crunch job is still running, terminate it.
114 func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
115 cmd := exec.Command(crunchRunCommand, uuid)
118 cmd.Stderr = os.Stderr
119 cmd.Stdout = os.Stderr
120 if err := cmd.Start(); err != nil {
121 log.Printf("Error running container for %v: %q", uuid, err)
125 log.Printf("Started container run for %v", uuid)
127 err := arv.Update("containers", uuid,
129 "container": arvadosclient.Dict{"state": "Running"}},
132 log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
135 // Terminate the runner if container priority becomes zero
136 priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
140 case <-priorityTicker.C:
141 var container Container
142 err := arv.Get("containers", uuid, nil, &container)
144 log.Printf("Error getting container info for %v: %q", uuid, err)
146 if container.Priority == 0 {
147 priorityTicker.Stop()
148 cmd.Process.Signal(os.Interrupt)
156 // Wait for the process to exit
157 if _, err := cmd.Process.Wait(); err != nil {
158 log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
161 priorityTicker.Stop()
163 var container Container
164 err = arv.Get("containers", uuid, nil, &container)
165 if container.State == "Running" {
166 log.Printf("After crunch-run process termination, the state is still 'Running' for %v. Updating it to 'Complete'", uuid)
167 err = arv.Update("containers", uuid,
169 "container": arvadosclient.Dict{"state": "Complete"}},
172 log.Printf("Error updating container state to Complete for %v: %q", uuid, err)