Merge branch '9275-cwl-runner-creates-jobs' closes #9275
[arvados.git] / services / crunch-dispatch-local / crunch-dispatch-local.go
1 package main
2
3 // Dispatcher service for Crunch that runs containers locally.
4
5 import (
6         "flag"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/dispatch"
9         "log"
10         "os"
11         "os/exec"
12         "sync"
13         "time"
14 )
15
16 func main() {
17         err := doMain()
18         if err != nil {
19                 log.Fatalf("%q", err)
20         }
21 }
22
23 var (
24         runningCmds      map[string]*exec.Cmd
25         runningCmdsMutex sync.Mutex
26         waitGroup        sync.WaitGroup
27         crunchRunCommand *string
28 )
29
30 func doMain() error {
31         flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
32
33         pollInterval := flags.Int(
34                 "poll-interval",
35                 10,
36                 "Interval in seconds to poll for queued containers")
37
38         crunchRunCommand = flags.String(
39                 "crunch-run-command",
40                 "/usr/bin/crunch-run",
41                 "Crunch command to run container")
42
43         // Parse args; omit the first arg which is the command name
44         flags.Parse(os.Args[1:])
45
46         runningCmds = make(map[string]*exec.Cmd)
47
48         arv, err := arvadosclient.MakeArvadosClient()
49         if err != nil {
50                 log.Printf("Error making Arvados client: %v", err)
51                 return err
52         }
53         arv.Retries = 25
54
55         dispatcher := dispatch.Dispatcher{
56                 Arv:            arv,
57                 RunContainer:   run,
58                 PollInterval:   time.Duration(*pollInterval) * time.Second,
59                 DoneProcessing: make(chan struct{})}
60
61         err = dispatcher.RunDispatcher()
62         if err != nil {
63                 return err
64         }
65
66         runningCmdsMutex.Lock()
67         // Finished dispatching; interrupt any crunch jobs that are still running
68         for _, cmd := range runningCmds {
69                 cmd.Process.Signal(os.Interrupt)
70         }
71         runningCmdsMutex.Unlock()
72
73         // Wait for all running crunch jobs to complete / terminate
74         waitGroup.Wait()
75
76         return nil
77 }
78
79 func startFunc(container dispatch.Container, cmd *exec.Cmd) error {
80         return cmd.Start()
81 }
82
83 var startCmd = startFunc
84
85 // Run a container.
86 //
87 // If the container is Locked, start a new crunch-run process and wait until
88 // crunch-run completes.  If the priority is set to zero, set an interrupt
89 // signal to the crunch-run process.
90 //
91 // If the container is in any other state, or is not Complete/Cancelled after
92 // crunch-run terminates, mark the container as Cancelled.
93 func run(dispatcher *dispatch.Dispatcher,
94         container dispatch.Container,
95         status chan dispatch.Container) {
96
97         uuid := container.UUID
98
99         if container.State == dispatch.Locked {
100                 waitGroup.Add(1)
101
102                 cmd := exec.Command(*crunchRunCommand, uuid)
103                 cmd.Stdin = nil
104                 cmd.Stderr = os.Stderr
105                 cmd.Stdout = os.Stderr
106
107                 log.Printf("Starting container %v", uuid)
108
109                 // Add this crunch job to the list of runningCmds only if we
110                 // succeed in starting crunch-run.
111
112                 runningCmdsMutex.Lock()
113                 if err := startCmd(container, cmd); err != nil {
114                         runningCmdsMutex.Unlock()
115                         log.Printf("Error starting %v for %v: %q", *crunchRunCommand, uuid, err)
116                         dispatcher.UpdateState(uuid, dispatch.Cancelled)
117                 } else {
118                         runningCmds[uuid] = cmd
119                         runningCmdsMutex.Unlock()
120
121                         // Need to wait for crunch-run to exit
122                         done := make(chan struct{})
123
124                         go func() {
125                                 if _, err := cmd.Process.Wait(); err != nil {
126                                         log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
127                                 }
128                                 log.Printf("sending done")
129                                 done <- struct{}{}
130                         }()
131
132                 Loop:
133                         for {
134                                 select {
135                                 case <-done:
136                                         break Loop
137                                 case c := <-status:
138                                         // Interrupt the child process if priority changes to 0
139                                         if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
140                                                 log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
141                                                 cmd.Process.Signal(os.Interrupt)
142                                         }
143                                 }
144                         }
145                         close(done)
146
147                         log.Printf("Finished container run for %v", uuid)
148
149                         // Remove the crunch job from runningCmds
150                         runningCmdsMutex.Lock()
151                         delete(runningCmds, uuid)
152                         runningCmdsMutex.Unlock()
153                 }
154                 waitGroup.Done()
155         }
156
157         // If the container is not finalized, then change it to "Cancelled".
158         err := dispatcher.Arv.Get("containers", uuid, nil, &container)
159         if err != nil {
160                 log.Printf("Error getting final container state: %v", err)
161         }
162         if container.LockedByUUID == dispatcher.Auth.UUID &&
163                 (container.State == dispatch.Locked || container.State == dispatch.Running) {
164                 log.Printf("After %s process termination, container state for %v is %q.  Updating it to %q",
165                         *crunchRunCommand, container.State, uuid, dispatch.Cancelled)
166                 dispatcher.UpdateState(uuid, dispatch.Cancelled)
167         }
168
169         // drain any subsequent status changes
170         for _ = range status {
171         }
172
173         log.Printf("Finalized container %v", uuid)
174 }