Merge branch '8018-container-retry' closes #8018
[arvados.git] / services / crunch-dispatch-local / crunch-dispatch-local.go
1 package main
2
3 // Dispatcher service for Crunch that runs containers locally.
4
5 import (
6         "flag"
7         "git.curoverse.com/arvados.git/sdk/go/arvados"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/dispatch"
10         "log"
11         "os"
12         "os/exec"
13         "sync"
14         "time"
15 )
16
17 func main() {
18         err := doMain()
19         if err != nil {
20                 log.Fatalf("%q", err)
21         }
22 }
23
24 var (
25         runningCmds      map[string]*exec.Cmd
26         runningCmdsMutex sync.Mutex
27         waitGroup        sync.WaitGroup
28         crunchRunCommand *string
29 )
30
31 func doMain() error {
32         flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
33
34         pollInterval := flags.Int(
35                 "poll-interval",
36                 10,
37                 "Interval in seconds to poll for queued containers")
38
39         crunchRunCommand = flags.String(
40                 "crunch-run-command",
41                 "/usr/bin/crunch-run",
42                 "Crunch command to run container")
43
44         // Parse args; omit the first arg which is the command name
45         flags.Parse(os.Args[1:])
46
47         runningCmds = make(map[string]*exec.Cmd)
48
49         arv, err := arvadosclient.MakeArvadosClient()
50         if err != nil {
51                 log.Printf("Error making Arvados client: %v", err)
52                 return err
53         }
54         arv.Retries = 25
55
56         dispatcher := dispatch.Dispatcher{
57                 Arv:            arv,
58                 RunContainer:   run,
59                 PollInterval:   time.Duration(*pollInterval) * time.Second,
60                 DoneProcessing: make(chan struct{})}
61
62         err = dispatcher.RunDispatcher()
63         if err != nil {
64                 return err
65         }
66
67         runningCmdsMutex.Lock()
68         // Finished dispatching; interrupt any crunch jobs that are still running
69         for _, cmd := range runningCmds {
70                 cmd.Process.Signal(os.Interrupt)
71         }
72         runningCmdsMutex.Unlock()
73
74         // Wait for all running crunch jobs to complete / terminate
75         waitGroup.Wait()
76
77         return nil
78 }
79
80 func startFunc(container arvados.Container, cmd *exec.Cmd) error {
81         return cmd.Start()
82 }
83
84 var startCmd = startFunc
85
86 // Run a container.
87 //
88 // If the container is Locked, start a new crunch-run process and wait until
89 // crunch-run completes.  If the priority is set to zero, set an interrupt
90 // signal to the crunch-run process.
91 //
92 // If the container is in any other state, or is not Complete/Cancelled after
93 // crunch-run terminates, mark the container as Cancelled.
94 func run(dispatcher *dispatch.Dispatcher,
95         container arvados.Container,
96         status chan arvados.Container) {
97
98         uuid := container.UUID
99
100         if container.State == dispatch.Locked {
101                 waitGroup.Add(1)
102
103                 cmd := exec.Command(*crunchRunCommand, uuid)
104                 cmd.Stdin = nil
105                 cmd.Stderr = os.Stderr
106                 cmd.Stdout = os.Stderr
107
108                 log.Printf("Starting container %v", uuid)
109
110                 // Add this crunch job to the list of runningCmds only if we
111                 // succeed in starting crunch-run.
112
113                 runningCmdsMutex.Lock()
114                 if err := startCmd(container, cmd); err != nil {
115                         runningCmdsMutex.Unlock()
116                         log.Printf("Error starting %v for %v: %q", *crunchRunCommand, uuid, err)
117                         dispatcher.UpdateState(uuid, dispatch.Cancelled)
118                 } else {
119                         runningCmds[uuid] = cmd
120                         runningCmdsMutex.Unlock()
121
122                         // Need to wait for crunch-run to exit
123                         done := make(chan struct{})
124
125                         go func() {
126                                 if _, err := cmd.Process.Wait(); err != nil {
127                                         log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
128                                 }
129                                 log.Printf("sending done")
130                                 done <- struct{}{}
131                         }()
132
133                 Loop:
134                         for {
135                                 select {
136                                 case <-done:
137                                         break Loop
138                                 case c := <-status:
139                                         // Interrupt the child process if priority changes to 0
140                                         if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
141                                                 log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
142                                                 cmd.Process.Signal(os.Interrupt)
143                                         }
144                                 }
145                         }
146                         close(done)
147
148                         log.Printf("Finished container run for %v", uuid)
149
150                         // Remove the crunch job from runningCmds
151                         runningCmdsMutex.Lock()
152                         delete(runningCmds, uuid)
153                         runningCmdsMutex.Unlock()
154                 }
155                 waitGroup.Done()
156         }
157
158         // If the container is not finalized, then change it to "Cancelled".
159         err := dispatcher.Arv.Get("containers", uuid, nil, &container)
160         if err != nil {
161                 log.Printf("Error getting final container state: %v", err)
162         }
163         if container.LockedByUUID == dispatcher.Auth.UUID &&
164                 (container.State == dispatch.Locked || container.State == dispatch.Running) {
165                 log.Printf("After %s process termination, container state for %v is %q.  Updating it to %q",
166                         *crunchRunCommand, container.State, uuid, dispatch.Cancelled)
167                 dispatcher.UpdateState(uuid, dispatch.Cancelled)
168         }
169
170         // drain any subsequent status changes
171         for range status {
172         }
173
174         log.Printf("Finalized container %v", uuid)
175 }