8028: After getting list of Queued containers, instead of looking for containers...
[arvados.git] / services / crunch-dispatch-local / crunch-dispatch-local.go
1 package main
2
3 import (
4         "flag"
5         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
6         "log"
7         "os"
8         "os/exec"
9         "time"
10 )
11
12 func main() {
13         err := doMain()
14         if err != nil {
15                 log.Fatalf("%q", err)
16         }
17 }
18
19 var arv arvadosclient.ArvadosClient
20
21 func doMain() error {
22         flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
23
24         pollInterval := flags.Int(
25                 "poll-interval",
26                 10,
27                 "Interval in seconds to poll for queued containers")
28
29         priorityPollInterval := flags.Int(
30                 "container-priority-poll-interval",
31                 60,
32                 "Interval in seconds to check priority of a dispatched container")
33
34         crunchRunCommand := flags.String(
35                 "crunch-run-command",
36                 "/usr/bin/crunch-run",
37                 "Crunch command to run container")
38
39         // Parse args; omit the first arg which is the command name
40         flags.Parse(os.Args[1:])
41
42         var err error
43         arv, err = arvadosclient.MakeArvadosClient()
44         if err != nil {
45                 return err
46         }
47
48         // channel to terminate
49         doneProcessing = make(chan bool)
50
51         // run all queued containers
52         runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
53         return nil
54 }
55
56 var doneProcessing chan bool
57
58 // Poll for queued containers using pollInterval.
59 // Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
60 //
61 // Any errors encountered are logged but the program would continue to run (not exit).
62 // This is because, once one or more child processes are running,
63 // we would need to wait for them complete.
64 func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
65         ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
66
67         for {
68                 select {
69                 case <-ticker.C:
70                         dispatchLocal(priorityPollInterval, crunchRunCommand)
71                 case <-doneProcessing:
72                         ticker.Stop()
73                         return
74                 }
75         }
76 }
77
78 // Container data
79 type Container struct {
80         UUID     string `json:"uuid"`
81         State    string `json:"state"`
82         Priority int    `json:"priority"`
83 }
84
85 // ContainerList is a list of the containers from api
86 type ContainerList struct {
87         Items []Container `json:"items"`
88 }
89
90 // Get the list of queued containers from API server and invoke run for each container.
91 func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
92         params := arvadosclient.Dict{
93                 "filters": [][]string{[]string{"state", "=", "Queued"}},
94         }
95
96         var containers ContainerList
97         err := arv.List("containers", params, &containers)
98         if err != nil {
99                 log.Printf("Error getting list of queued containers: %q", err)
100                 return
101         }
102
103         for i := 0; i < len(containers.Items); i++ {
104                 log.Printf("About to run queued container %v", containers.Items[i].UUID)
105                 go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
106         }
107 }
108
109 // Run queued container:
110 // Set container state to locked (TBD)
111 // Run container using the given crunch-run command
112 // Set the container state to Running
113 // If the container priority becomes zero while crunch job is still running, terminate it.
114 func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
115         cmd := exec.Command(crunchRunCommand, uuid)
116
117         cmd.Stdin = nil
118         cmd.Stderr = os.Stderr
119         cmd.Stdout = os.Stderr
120         if err := cmd.Start(); err != nil {
121                 log.Printf("Error running container for %v: %q", uuid, err)
122                 return
123         }
124
125         log.Printf("Started container run for %v", uuid)
126
127         err := arv.Update("containers", uuid,
128                 arvadosclient.Dict{
129                         "container": arvadosclient.Dict{"state": "Running"}},
130                 nil)
131         if err != nil {
132                 log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
133         }
134
135         // Terminate the runner if container priority becomes zero
136         priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
137         go func() {
138                 for {
139                         select {
140                         case <-priorityTicker.C:
141                                 var container Container
142                                 err := arv.Get("containers", uuid, nil, &container)
143                                 if err != nil {
144                                         log.Printf("Error getting container info for %v: %q", uuid, err)
145                                 } else {
146                                         if container.Priority == 0 {
147                                                 priorityTicker.Stop()
148                                                 cmd.Process.Signal(os.Interrupt)
149                                                 return
150                                         }
151                                 }
152                         }
153                 }
154         }()
155
156         // Wait for the process to exit
157         if _, err := cmd.Process.Wait(); err != nil {
158                 log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
159         }
160
161         priorityTicker.Stop()
162
163         var container Container
164         err = arv.Get("containers", uuid, nil, &container)
165         if container.State == "Running" {
166                 log.Printf("After crunch-run process termination, the state is still 'Running' for %v. Updating it to 'Complete'", uuid)
167                 err = arv.Update("containers", uuid,
168                         arvadosclient.Dict{
169                                 "container": arvadosclient.Dict{"state": "Complete"}},
170                         nil)
171                 if err != nil {
172                         log.Printf("Error updating container state to Complete for %v: %q", uuid, err)
173                 }
174         }
175 }