3 // Dispatcher service for Crunch that runs containers locally.
7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/dispatch"
// runningCmds tracks crunch-run processes that were started successfully,
// keyed by container UUID. Every access is made while holding runningCmdsMutex.
24 runningCmds map[string]*exec.Cmd
// runningCmdsMutex guards all reads and writes of runningCmds, which happen
// both from run() and from main's shutdown loop.
25 runningCmdsMutex sync.Mutex
// NOTE(review): the Add/Done/Wait calls for waitGroup are not visible in this
// excerpt; presumably it counts in-flight run() goroutines so main can wait
// for them before exiting — TODO confirm.
26 waitGroup sync.WaitGroup
// crunchRunCommand is the path of the crunch-run executable. main sets it
// from a command-line flag, and run() uses it to build each exec.Cmd.
27 crunchRunCommand *string
// Excerpt of main (the func header and several lines are not shown).
// main parses command-line flags, builds an Arvados client and a
// dispatch.Dispatcher, runs the dispatcher, and on shutdown interrupts any
// crunch-run processes that are still running.
31 flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
// Poll interval, in seconds; converted to a time.Duration when the
// Dispatcher is built below.
33 pollInterval := flags.Int(
36 "Interval in seconds to poll for queued containers")
// Assigned to the package-level pointer so run() can read the configured
// crunch-run path.
38 crunchRunCommand = flags.String(
40 "/usr/bin/crunch-run",
41 "Crunch command to run container")
43 // Parse args; omit the first arg which is the command name
44 flags.Parse(os.Args[1:])
// Initialize before the dispatcher starts, so run() never inserts into a
// nil map (which would panic).
46 runningCmds = make(map[string]*exec.Cmd)
48 arv, err := arvadosclient.MakeArvadosClient()
50 log.Printf("Error making Arvados client: %v", err)
// NOTE(review): the remaining Dispatcher fields are on lines not shown here;
// presumably they include the Arvados client and run as the per-container
// callback — TODO confirm.
55 dispatcher := dispatch.Dispatcher{
58 PollInterval: time.Duration(*pollInterval) * time.Second,
59 DoneProcessing: make(chan struct{})}
61 err = dispatcher.RunDispatcher()
// Hold the mutex while iterating: run() adds and deletes entries concurrently.
66 runningCmdsMutex.Lock()
67 // Finished dispatching; interrupt any crunch jobs that are still running
68 for _, cmd := range runningCmds {
// NOTE(review): the error from Signal is ignored (e.g. if the process has
// already exited); consider logging it.
69 cmd.Process.Signal(os.Interrupt)
71 runningCmdsMutex.Unlock()
73 // Wait for all running crunch jobs to complete / terminate
// startFunc launches cmd for the given container and returns any start error.
// Its body is not shown in this excerpt. NOTE(review): presumably it wraps
// cmd.Start() — confirm.
79 func startFunc(container dispatch.Container, cmd *exec.Cmd) error {
// startCmd is the hook run() calls to launch crunch-run. It is a variable rather
// than a direct call to startFunc, presumably so tests can substitute a fake —
// confirm.
83 var startCmd = startFunc
87 // If the container is Locked, start a new crunch-run process and wait until
88 // crunch-run completes. If the container's priority drops to zero while it is
89 // Locked or Running, send an interrupt signal to the crunch-run process.
// If crunch-run cannot be started at all, the container is marked Cancelled
// immediately.
91 // Afterwards (or right away, if the container was not Locked), re-fetch the
92 // container; if it is still Locked or Running and locked by this dispatcher,
// mark it Cancelled. Finally, drain status until that channel is closed.
//
// status delivers updated records for this container; presumably it is fed
// and closed by the dispatcher's poll loop — TODO confirm.
93 func run(dispatcher *dispatch.Dispatcher,
94 container dispatch.Container,
95 status chan dispatch.Container) {
97 uuid := container.UUID
99 if container.State == dispatch.Locked {
102 cmd := exec.Command(*crunchRunCommand, uuid)
// Both of crunch-run's output streams go to this process's stderr.
104 cmd.Stderr = os.Stderr
105 cmd.Stdout = os.Stderr
107 log.Printf("Starting container %v", uuid)
109 // Add this crunch job to the list of runningCmds only if we
110 // succeed in starting crunch-run.
// The lock is held across startCmd and the map insert so the shutdown loop in
// main never observes a started process that is not yet registered.
112 runningCmdsMutex.Lock()
113 if err := startCmd(container, cmd); err != nil {
114 runningCmdsMutex.Unlock()
115 log.Printf("Error starting %v for %v: %q", *crunchRunCommand, uuid, err)
116 dispatcher.UpdateState(uuid, dispatch.Cancelled)
118 runningCmds[uuid] = cmd
119 runningCmdsMutex.Unlock()
121 // Need to wait for crunch-run to exit
122 done := make(chan struct{})
125 if _, err := cmd.Process.Wait(); err != nil {
126 log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
128 log.Printf("sending done")
138 // Interrupt the child process if priority changes to 0
139 if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
140 log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
141 cmd.Process.Signal(os.Interrupt)
147 log.Printf("Finished container run for %v", uuid)
149 // Remove the crunch job from runningCmds
150 runningCmdsMutex.Lock()
151 delete(runningCmds, uuid)
152 runningCmdsMutex.Unlock()
157 // If the container is not finalized, then change it to "Cancelled".
// The refreshed record is decoded into the container parameter itself.
158 err := dispatcher.Arv.Get("containers", uuid, nil, &container)
160 log.Printf("Error getting final container state: %v", err)
162 if container.LockedByUUID == dispatcher.Auth.UUID &&
163 (container.State == dispatch.Locked || container.State == dispatch.Running) {
// Argument order must match the verbs: command, uuid (%v), state (%q), new state (%q).
164 log.Printf("After %s process termination, container state for %v is %q. Updating it to %q",
165 *crunchRunCommand, uuid, container.State, dispatch.Cancelled)
166 dispatcher.UpdateState(uuid, dispatch.Cancelled)
169 // drain any subsequent status changes
// Ranging over a channel only ends once the channel is closed.
170 for range status {
173 log.Printf("Finalized container %v", uuid)