6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
25 arv arvadosclient.ArvadosClient
26 runningCmds map[string]*exec.Cmd
27 runningCmdsMutex sync.Mutex
28 waitGroup sync.WaitGroup
29 doneProcessing chan bool
30 sigChan chan os.Signal
34 flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
36 pollInterval := flags.Int(
39 "Interval in seconds to poll for queued containers")
41 priorityPollInterval := flags.Int(
42 "container-priority-poll-interval",
44 "Interval in seconds to check priority of a dispatched container")
46 crunchRunCommand := flags.String(
48 "/usr/bin/crunch-run",
49 "Crunch command to run container")
51 // Parse args; omit the first arg which is the command name
52 flags.Parse(os.Args[1:])
55 arv, err = arvadosclient.MakeArvadosClient()
60 // Channel used to tell runQueuedContainers to stop polling (sent to when SIGINT/SIGTERM/SIGQUIT is caught)
61 doneProcessing = make(chan bool)
64 sigChan = make(chan os.Signal, 1)
65 signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
66 go func(sig <-chan os.Signal) {
67 for sig := range sig {
68 log.Printf("Caught signal: %v", sig)
69 doneProcessing <- true
73 // Run all queued containers
74 runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
76 // Wait for all running crunch jobs to complete / terminate
82 // Poll for queued containers using pollInterval.
83 // Invoke dispatchSlurm on each ticker cycle, which submits all the queued containers to slurm.
85 // Any errors encountered are logged, but the program continues to run (it does not exit).
86 // This is because, once one or more crunch jobs are running,
87 // we need to wait for them to complete.
88 func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
89 ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
94 dispatchSlurm(priorityPollInterval, crunchRunCommand)
95 case <-doneProcessing:
103 type Container struct {
104 UUID string `json:"uuid"`
105 State string `json:"state"`
106 Priority int `json:"priority"`
109 // ContainerList is a list of containers as returned by the API server.
110 type ContainerList struct {
111 Items []Container `json:"items"`
114 // Get the list of queued containers from API server and invoke run for each container.
115 func dispatchSlurm(priorityPollInterval int, crunchRunCommand string) {
116 params := arvadosclient.Dict{
117 "filters": [][]string{[]string{"state", "=", "Queued"}},
120 var containers ContainerList
121 err := arv.List("containers", params, &containers)
123 log.Printf("Error getting list of queued containers: %q", err)
127 for i := 0; i < len(containers.Items); i++ {
128 log.Printf("About to submit queued container %v", containers.Items[i].UUID)
130 go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
134 // Run a queued container:
135 // Set the container state to Locked (TBD)
136 // Submit a slurm batch job (sbatch) whose script execs the given crunch-run command
137 // Set the container state to Running
138 // If the container priority becomes zero while the crunch job is still running, cancel it (scancel).
139 func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
140 stdinReader, stdinWriter := io.Pipe()
142 cmd := exec.Command("sbatch", "--job-name="+uuid)
143 cmd.Stdin = stdinReader
144 cmd.Stderr = os.Stderr
145 cmd.Stdout = os.Stderr
146 if err := cmd.Start(); err != nil {
147 log.Printf("Error running container for %v: %q", uuid, err)
151 fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec %s %s\n", crunchRunCommand, uuid)
156 // Update container status to Running
157 err := arv.Update("containers", uuid,
159 "container": arvadosclient.Dict{"state": "Running"}},
162 log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
165 log.Printf("Submitted container run for %v", uuid)
167 // A goroutine to terminate the runner if container priority becomes zero
168 priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
170 for _ = range priorityTicker.C {
171 var container Container
172 err := arv.Get("containers", uuid, nil, &container)
174 log.Printf("Error getting container info for %v: %q", uuid, err)
176 if container.Priority == 0 {
177 priorityTicker.Stop()
178 cancelcmd := exec.Command("scancel", "--name="+uuid)