package main
import (
+ "bufio"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
return nil
}
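+// apiClientAuthorization holds the UUID and secret of an API token;
+// the dispatcher uses its own token's UUID to recognize containers
+// it has locked itself.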
+type apiClientAuthorization struct {
+ UUID string `json:"uuid"`
+ APIToken string `json:"api_token"`
+}
+
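+// apiClientAuthorizationList is the response envelope for an
+// api_client_authorizations list request.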
+type apiClientAuthorizationList struct {
+ Items []apiClientAuthorization `json:"items"`
+}
+
// Poll for queued containers using pollInterval.
// Invoke dispatchSlurm on each ticker cycle, which will run all the
// queued containers.
//
// We keep polling because, once one or more crunch jobs are running,
// we need to wait for them to complete.
func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
- ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
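+ // Look up the UUID of this dispatcher's own API token, so we
+ // can tell whether a Locked container was locked by us.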
+ var authList apiClientAuthorizationList
+ err := arv.List("api_client_authorizations", map[string]interface{}{
+ "filters": [][]interface{}{{"api_token", "=", arv.ApiToken}},
+ }, &authList)
+ if err != nil || len(authList.Items) != 1 {
+ log.Printf("Error getting my token UUID: %v (%d)", err, len(authList.Items))
+ return
+ }
+ auth := authList.Items[0]
+ ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
for {
select {
case <-ticker.C:
- dispatchSlurm(priorityPollInterval, crunchRunCommand, finishCommand)
+ dispatchSlurm(auth, time.Duration(priorityPollInterval)*time.Second, crunchRunCommand, finishCommand)
case <-doneProcessing:
ticker.Stop()
return
State string `json:"state"`
Priority int `json:"priority"`
RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
+ LockedByUUID string `json:"locked_by_uuid"`
}
// ContainerList is a list of containers returned by the API server
Items []Container `json:"items"`
}
-// Get the list of queued containers from API server and invoke run for each container.
-func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand string) {
+// Get the list of queued and locked containers from the API server,
+// and invoke run for each container this dispatcher should handle.
+func dispatchSlurm(auth apiClientAuthorization, pollInterval time.Duration, crunchRunCommand, finishCommand string) {
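+ // Fetch Locked containers as well as Queued ones, so we can
+ // detect and adopt any container that was locked by this token
+ // but left behind by a previous dispatch process.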
params := arvadosclient.Dict{
- "filters": [][]string{[]string{"state", "=", "Queued"}},
+ "filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}}},
}
var containers ContainerList
return
}
- for i := 0; i < len(containers.Items); i++ {
- log.Printf("About to submit queued container %v", containers.Items[i].UUID)
- // Run the container
- go run(containers.Items[i], crunchRunCommand, finishCommand, priorityPollInterval)
+ for _, container := range containers.Items {
+ if container.State == "Locked" {
+ if container.LockedByUUID != auth.UUID {
+ // Locked by a different dispatcher
+ continue
+ } else if checkMine(container.UUID) {
+ // I already have a goroutine running
+ // for this container: it just hasn't
+ // gotten past Locked state yet.
+ continue
+ }
+ log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
+ "Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
+ container.UUID, auth.UUID)
+ setMine(container.UUID, true)
+ // Pass container by value: the loop variable will be
+ // reused on the next iteration.
+ go func(c Container) {
+ waitContainer(c, pollInterval)
+ setMine(c.UUID, false)
+ }(container)
+ // Don't also submit a new crunch-run job for it.
+ continue
+ }
+ go run(container, crunchRunCommand, finishCommand, pollInterval)
}
}
// sbatchCmd
func sbatchFunc(container Container) *exec.Cmd {
- memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"]*1048576)))
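+ // RuntimeConstraints["ram"] is in bytes; sbatch interprets
+ // --mem-per-cpu in megabytes, hence the division by vcpus * 2^20.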
+ memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
return exec.Command("sbatch", "--share", "--parsable",
"--job-name="+container.UUID,
"--mem-per-cpu="+strconv.Itoa(int(memPerCPU)),
func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
submitErr = nil
- // Mark record as complete if anything errors out.
defer func() {
- if submitErr != nil {
- // This really should be an "Error" state, see #8018
- updateErr := arv.Update("containers", container.UUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Complete"}},
- nil)
- if updateErr != nil {
- log.Printf("Error updating container state to 'Complete' for %v: %q", container.UUID, updateErr)
- }
+ // If we didn't get as far as submitting a slurm job,
+ // unlock the container and return it to the queue.
+ if submitErr == nil {
+ // OK, no cleanup needed
+ return
+ }
+ err := arv.Update("containers", container.UUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": "Queued"}},
+ nil)
+ if err != nil {
+ log.Printf("Error unlocking container %s: %v", container.UUID, err)
}
}()
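+ // Drain stdout and stderr in separate goroutines so the child
+ // process can't block on a full pipe buffer before it exits.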
stdoutChan := make(chan []byte)
go func() {
b, _ := ioutil.ReadAll(stdoutReader)
+ stdoutReader.Close()
stdoutChan <- b
close(stdoutChan)
}()
stderrChan := make(chan []byte)
go func() {
b, _ := ioutil.ReadAll(stderrReader)
+ stderrReader.Close()
stderrChan <- b
close(stderrChan)
}()
err := cmd.Run()
if err != nil {
log.Printf("While setting up strigger: %v", err)
+ // BUG: we drop the error here and forget about it. A
+ // human has to notice the container is stuck in
+ // Running state, and fix it manually.
}
}
-// Run a queued container.
-// Set container state to locked (TBD)
-// Submit job to slurm to execute crunch-run command for the container
-// If the container priority becomes zero while crunch job is still running, cancel the job.
-func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) {
+// Run a queued container: [1] Set container state to locked. [2]
+// Execute crunch-run as a slurm batch job. [3] waitContainer().
+func run(container Container, crunchRunCommand, finishCommand string, pollInterval time.Duration) {
+ setMine(container.UUID, true)
+ defer setMine(container.UUID, false)
+
+ // Update container status to Locked. This will fail if
+ // another dispatcher (token) has already locked it. It will
+ // succeed if *this* dispatcher has already locked it.
+ err := arv.Update("containers", container.UUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": "Locked"}},
+ nil)
+ if err != nil {
+ log.Printf("Error updating container state to 'Locked' for %v: %q", container.UUID, err)
+ return
+ }
+
+ log.Printf("About to submit queued container %v", container.UUID)
jobid, err := submit(container, crunchRunCommand)
if err != nil {
- log.Printf("Error queuing container run: %v", err)
+ log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
return
}
}
finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
- // Update container status to Running, this is a temporary workaround
- // to avoid resubmitting queued containers because record locking isn't
- // implemented yet.
+ // Update container status to Running. This will fail if
+ // another dispatcher (token) has already locked it. It will
+ // succeed if *this* dispatcher has already locked it.
err = arv.Update("containers", container.UUID,
arvadosclient.Dict{
"container": arvadosclient.Dict{"state": "Running"}},
if err != nil {
log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
}
+ log.Printf("Submitted container %v to slurm", container.UUID)
+ waitContainer(container, pollInterval)
+}
- log.Printf("Submitted container run for %v", container.UUID)
-
- containerUUID := container.UUID
+// Wait for a container to finish. Cancel the slurm job if the
+// container priority changes to zero before it ends.
+func waitContainer(container Container, pollInterval time.Duration) {
+ log.Printf("Monitoring container %v started", container.UUID)
+ defer log.Printf("Monitoring container %v finished", container.UUID)
+
+ pollTicker := time.NewTicker(pollInterval)
+ defer pollTicker.Stop()
+ for range pollTicker.C {
+ var updated Container
+ err := arv.Get("containers", container.UUID, nil, &updated)
+ if err != nil {
+ log.Printf("Error getting container %s: %q", container.UUID, err)
+ continue
+ }
+ if updated.State == "Complete" || updated.State == "Cancelled" {
+ return
+ }
+ if updated.Priority != 0 {
+ continue
+ }
- // A goroutine to terminate the runner if container priority becomes zero
- priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
- go func() {
- for _ = range priorityTicker.C {
- var container Container
- err := arv.Get("containers", containerUUID, nil, &container)
- if err != nil {
- log.Printf("Error getting container info for %v: %q", container.UUID, err)
- } else {
- if container.Priority == 0 {
- log.Printf("Canceling container %v", container.UUID)
- priorityTicker.Stop()
- cancelcmd := exec.Command("scancel", "--name="+container.UUID)
- cancelcmd.Run()
- }
- if container.State == "Complete" {
- priorityTicker.Stop()
- }
+ // Priority is zero, but state is Running or Locked
+ log.Printf("Canceling container %s", container.UUID)
+
+ err = exec.Command("scancel", "--name="+container.UUID).Run()
+ if err != nil {
+ log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
+ if inQ, err := checkSqueue(container.UUID); err != nil {
+ log.Printf("Error running squeue: %v", err)
+ continue
+ } else if inQ {
+ log.Printf("Container %s is still in squeue; will retry", container.UUID)
+ continue
}
}
- }()
+ err = arv.Update("containers", container.UUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": "Cancelled"}},
+ nil)
+ if err != nil {
+ log.Printf("Error updating state for container %s: %s", container.UUID, err)
+ continue
+ }
+
+ return
+ }
+}
+
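+// checkSqueue reports whether a slurm job named uuid appears in the
+// current squeue listing.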
+func checkSqueue(uuid string) (bool, error) {
+ cmd := exec.Command("squeue", "--format=%j")
+ sq, err := cmd.StdoutPipe()
+ if err != nil {
+ return false, err
+ }
+ if err := cmd.Start(); err != nil {
+ return false, err
+ }
+ defer cmd.Wait()
+ scanner := bufio.NewScanner(sq)
+ found := false
+ for scanner.Scan() {
+ if scanner.Text() == uuid {
+ found = true
+ }
+ }
+ if err := scanner.Err(); err != nil {
+ return false, err
+ }
+ return found, nil
+}
+
+var mineMutex sync.RWMutex
+var mineMap = make(map[string]bool)
+
+// Add uuid to, or remove it from, the set of "my" containers, i.e.,
+// ones for which this process has a goroutine running. Goroutine-safe.
+func setMine(uuid string, t bool) {
+ mineMutex.Lock()
+ if t {
+ mineMap[uuid] = true
+ } else {
+ delete(mineMap, uuid)
+ }
+ mineMutex.Unlock()
+}
+
+// Check whether there is already a goroutine running for this
+// container.
+func checkMine(uuid string) bool {
+ mineMutex.RLock()
+ defer mineMutex.RUnlock()
+ return mineMap[uuid]
}
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "bytes"
"fmt"
- "io/ioutil"
"log"
"math"
"net/http"
if err != nil {
c.Fatalf("Error making arvados client: %s", err)
}
+ os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
}
func (s *TestSuite) TearDownTest(c *C) {
apiHost, apiToken, apiInsecure).Args
go func() {
time.Sleep(5 * time.Second)
- arv.Update("containers", containerUUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Complete"}},
- nil)
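+ // Simulate crunch-run by stepping the container through the
+ // Running and Complete states.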
+ for _, state := range []string{"Running", "Complete"} {
+ arv.Update("containers", containerUUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": state}},
+ nil)
+ }
}()
return exec.Command("echo", "strigger")
}
c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer\n", "--fini",
- "--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h 1 zzzzz-dz642-queuedcontainer"})
+ "--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
// There should be no queued containers now
err = arv.List("containers", params, &containers)
func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
+ apiStubResponses["/arvados/v1/api_client_authorizations"] = arvadostest.StubResponse{200, string(`{"items":[{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}]}`)}
apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
Retries: 0,
}
- tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file")
- c.Check(err, IsNil)
- defer os.Remove(tempfile.Name())
- log.SetOutput(tempfile)
+ buf := bytes.NewBuffer(nil)
+ log.SetOutput(buf)
+ defer log.SetOutput(os.Stderr)
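+ // Wait up to 8 seconds for the expected message to appear in
+ // the captured log, then send SIGTERM so runQueuedContainers
+ // returns.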
go func() {
- time.Sleep(2 * time.Second)
+ for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
+ time.Sleep(100 * time.Millisecond)
+ }
sigChan <- syscall.SIGTERM
}()
runQueuedContainers(2, 1, crunchCmd, crunchCmd)
- buf, _ := ioutil.ReadFile(tempfile.Name())
- c.Check(strings.Contains(string(buf), expected), Equals, true)
+ c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
}