6518: Dispatch to slurm using sbatch
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
1 package main
2
3 import (
4         "flag"
5         "fmt"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "io"
8         "log"
9         "os"
10         "os/exec"
11         "os/signal"
12         "sync"
13         "syscall"
14         "time"
15 )
16
17 func main() {
18         err := doMain()
19         if err != nil {
20                 log.Fatalf("%q", err)
21         }
22 }
23
24 var (
25         arv              arvadosclient.ArvadosClient
26         runningCmds      map[string]*exec.Cmd
27         runningCmdsMutex sync.Mutex
28         waitGroup        sync.WaitGroup
29         doneProcessing   chan bool
30         sigChan          chan os.Signal
31 )
32
33 func doMain() error {
34         flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
35
36         pollInterval := flags.Int(
37                 "poll-interval",
38                 10,
39                 "Interval in seconds to poll for queued containers")
40
41         priorityPollInterval := flags.Int(
42                 "container-priority-poll-interval",
43                 60,
44                 "Interval in seconds to check priority of a dispatched container")
45
46         crunchRunCommand := flags.String(
47                 "crunch-run-command",
48                 "/usr/bin/crunch-run",
49                 "Crunch command to run container")
50
51         // Parse args; omit the first arg which is the command name
52         flags.Parse(os.Args[1:])
53
54         var err error
55         arv, err = arvadosclient.MakeArvadosClient()
56         if err != nil {
57                 return err
58         }
59
60         // Channel to terminate
61         doneProcessing = make(chan bool)
62
63         // Graceful shutdown
64         sigChan = make(chan os.Signal, 1)
65         signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
66         go func(sig <-chan os.Signal) {
67                 for sig := range sig {
68                         log.Printf("Caught signal: %v", sig)
69                         doneProcessing <- true
70                 }
71         }(sigChan)
72
73         // Run all queued containers
74         runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
75
76         // Wait for all running crunch jobs to complete / terminate
77         waitGroup.Wait()
78
79         return nil
80 }
81
82 // Poll for queued containers using pollInterval.
83 // Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
84 //
85 // Any errors encountered are logged but the program would continue to run (not exit).
86 // This is because, once one or more crunch jobs are running,
87 // we would need to wait for them complete.
88 func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
89         ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
90
91         for {
92                 select {
93                 case <-ticker.C:
94                         dispatchSlurm(priorityPollInterval, crunchRunCommand)
95                 case <-doneProcessing:
96                         ticker.Stop()
97                         return
98                 }
99         }
100 }
101
102 // Container data
103 type Container struct {
104         UUID     string `json:"uuid"`
105         State    string `json:"state"`
106         Priority int    `json:"priority"`
107 }
108
109 // ContainerList is a list of the containers from api
110 type ContainerList struct {
111         Items []Container `json:"items"`
112 }
113
114 // Get the list of queued containers from API server and invoke run for each container.
115 func dispatchSlurm(priorityPollInterval int, crunchRunCommand string) {
116         params := arvadosclient.Dict{
117                 "filters": [][]string{[]string{"state", "=", "Queued"}},
118         }
119
120         var containers ContainerList
121         err := arv.List("containers", params, &containers)
122         if err != nil {
123                 log.Printf("Error getting list of queued containers: %q", err)
124                 return
125         }
126
127         for i := 0; i < len(containers.Items); i++ {
128                 log.Printf("About to submit queued container %v", containers.Items[i].UUID)
129                 // Run the container
130                 go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
131         }
132 }
133
134 // Run queued container:
135 // Set container state to locked (TBD)
136 // Run container using the given crunch-run command
137 // Set the container state to Running
138 // If the container priority becomes zero while crunch job is still running, terminate it.
139 func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
140         stdinReader, stdinWriter := io.Pipe()
141
142         cmd := exec.Command("sbatch", "--job-name="+uuid)
143         cmd.Stdin = stdinReader
144         cmd.Stderr = os.Stderr
145         cmd.Stdout = os.Stderr
146         if err := cmd.Start(); err != nil {
147                 log.Printf("Error running container for %v: %q", uuid, err)
148                 return
149         }
150
151         fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec %s %s\n", crunchRunCommand, uuid)
152
153         stdinWriter.Close()
154         cmd.Wait()
155
156         // Update container status to Running
157         err := arv.Update("containers", uuid,
158                 arvadosclient.Dict{
159                         "container": arvadosclient.Dict{"state": "Running"}},
160                 nil)
161         if err != nil {
162                 log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
163         }
164
165         log.Printf("Submitted container run for %v", uuid)
166
167         // A goroutine to terminate the runner if container priority becomes zero
168         priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
169         go func() {
170                 for _ = range priorityTicker.C {
171                         var container Container
172                         err := arv.Get("containers", uuid, nil, &container)
173                         if err != nil {
174                                 log.Printf("Error getting container info for %v: %q", uuid, err)
175                         } else {
176                                 if container.Priority == 0 {
177                                         priorityTicker.Stop()
178                                         cancelcmd := exec.Command("scancel", "--name="+uuid)
179                                         cancelcmd.Run()
180                                 }
181                         }
182                 }
183         }()
184
185 }