}(sigChan)
// Run all queued containers
- runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+ runQueuedContainers(time.Duration(*pollInterval)*time.Second, time.Duration(*priorityPollInterval)*time.Second, *crunchRunCommand)
// Finished dispatching; interrupt any crunch jobs that are still running
for _, cmd := range runningCmds {
// Any errors encountered are logged but the program would continue to run (not exit).
// This is because, once one or more crunch jobs are running,
// we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
- ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
+func runQueuedContainers(pollInterval, priorityPollInterval time.Duration, crunchRunCommand string) {
+ ticker := time.NewTicker(pollInterval)
for {
select {
// Container data
type Container struct {
- UUID string `json:"uuid"`
- State string `json:"state"`
- Priority int `json:"priority"`
+ // UUID identifies the container; dispatchLocal passes it to run,
+ // which uses it in arv.Get/arv.Update calls.
+ UUID string `json:"uuid"`
+ // State is the container state. This dispatcher writes "Locked",
+ // "Queued", "Running", "Complete" and "Cancelled".
+ State string `json:"state"`
+ // Priority is polled while crunch-run runs; 0 makes run send SIGINT.
+ Priority int `json:"priority"`
+ // NOTE(review): not read anywhere in this patch; presumably the UUID
+ // of whoever holds the container lock -- confirm against the API docs.
+ LockedByUUID string `json:"locked_by_uuid"`
}
// ContainerList is a list of the containers from api
}
// Get the list of queued containers from API server and invoke run for each container.
-func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
+func dispatchLocal(pollInterval time.Duration, crunchRunCommand string) {
params := arvadosclient.Dict{
"filters": [][]string{[]string{"state", "=", "Queued"}},
}
for i := 0; i < len(containers.Items); i++ {
log.Printf("About to run queued container %v", containers.Items[i].UUID)
// Run the container
- go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
+ go run(containers.Items[i].UUID, crunchRunCommand, pollInterval)
}
}
+// updateState asks the API server to set the state of container uuid
+// to newState. A failure is logged and also returned, so callers that
+// only want best-effort behavior can ignore the result.
+func updateState(uuid, newState string) error {
+ attrs := arvadosclient.Dict{"state": newState}
+ err := arv.Update("containers", uuid, arvadosclient.Dict{"container": attrs}, nil)
+ if err == nil {
+ return nil
+ }
+ log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+ return err
+}
+
// Run queued container:
-// Set container state to locked (TBD)
+// Set container state to Locked
// Run container using the given crunch-run command
// Set the container state to Running
// If the container priority becomes zero while crunch job is still running, terminate it.
+// Once crunch-run exits, call setFinalState and remove the job from runningCmds.
-func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
- cmd := exec.Command(crunchRunCommand, uuid)
+func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
+ // Register with waitGroup first and defer Done first, so that
+ // (deferred calls run LIFO) Done runs last: a waitGroup.Wait()
+ // caller must not resume until setFinalState and the runningCmds
+ // cleanup deferred below have finished.
+ // NOTE(review): Add still runs inside this goroutine, so a Wait()
+ // that races ahead of it can return early; the Add belongs in the
+ // caller, before "go run(...)".
+ waitGroup.Add(1)
+ defer waitGroup.Done()
+
+ if err := updateState(uuid, "Locked"); err != nil {
+ return
+ }
+ cmd := exec.Command(crunchRunCommand, uuid)
cmd.Stdin = nil
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stderr
+
+ // Add this crunch job to the list of runningCmds only if we
+ // succeed in starting crunch-run. The lock is held across Start,
+ // so other lock holders never see a started but unregistered job.
+ runningCmdsMutex.Lock()
if err := cmd.Start(); err != nil {
- log.Printf("Error running container for %v: %q", uuid, err)
+ log.Printf("Error starting crunch-run for %v: %q", uuid, err)
+ runningCmdsMutex.Unlock()
+ // crunch-run never ran, so hand the container back.
+ updateState(uuid, "Queued")
return
}
-
- // Add this crunch job to the list of runningCmds
- runningCmdsMutex.Lock()
runningCmds[uuid] = cmd
runningCmdsMutex.Unlock()
- log.Printf("Started container run for %v", uuid)
+ defer func() {
+ setFinalState(uuid)
+
+ // Remove the crunch job from runningCmds
+ runningCmdsMutex.Lock()
+ delete(runningCmds, uuid)
+ runningCmdsMutex.Unlock()
+ }()
+
+ log.Printf("Starting container %v", uuid)
- // Add this crunch job to waitGroup
- waitGroup.Add(1)
- defer waitGroup.Done()
- // Update container status to Running
- err := arv.Update("containers", uuid,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Running"}},
- nil)
- if err != nil {
- log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
- }
+ updateState(uuid, "Running")
+
+ cmdExited := make(chan struct{})
- // A goroutine to terminate the runner if container priority becomes zero
- priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
+ // Kill the child process if container priority changes to zero
go func() {
- for _ = range priorityTicker.C {
+ ticker := time.NewTicker(pollInterval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-cmdExited:
+ return
+ case <-ticker.C:
+ }
var container Container
err := arv.Get("containers", uuid, nil, &container)
if err != nil {
- log.Printf("Error getting container info for %v: %q", uuid, err)
- } else {
- if container.Priority == 0 {
- priorityTicker.Stop()
- cmd.Process.Signal(os.Interrupt)
- }
+ log.Printf("Error getting container %v: %q", uuid, err)
+ continue
+ }
+ // Not one-shot: SIGINT is re-sent on each tick while the
+ // priority stays 0, until cmdExited is closed.
+ if container.Priority == 0 {
+ log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+ cmd.Process.Signal(os.Interrupt)
}
}
}()
- // Wait for the crunch job to exit
+ // Wait for crunch-run to exit
if _, err := cmd.Process.Wait(); err != nil {
log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
}
+ close(cmdExited)
- // Remove the crunch job to runningCmds
- runningCmdsMutex.Lock()
- delete(runningCmds, uuid)
- runningCmdsMutex.Unlock()
-
- priorityTicker.Stop()
+ log.Printf("Finished container run for %v", uuid)
+}
- // The container state should be 'Complete'
+
+// setFinalState is called after crunch-run exits. It fetches the
+// container and moves a 'Running' container to 'Complete' and a
+// 'Locked' one to 'Cancelled'; any other state is left untouched.
+// Errors are logged, not returned.
+func setFinalState(uuid string) {
+ // The container state should now be 'Complete' if everything
+ // went well. If crunch-run exited but left it 'Running', mark it
+ // 'Complete' now. If it is still 'Locked' (it never reached
+ // 'Running'), cancel it as unrunnable. (TODO: Requeue instead,
+ // and fix tests so they can tell something happened even if
+ // the final state is Queued.)
var container Container
- err = arv.Get("containers", uuid, nil, &container)
- if container.State == "Running" {
- log.Printf("After crunch-run process termination, the state is still 'Running' for %v. Updating it to 'Complete'", uuid)
- err = arv.Update("containers", uuid,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Complete"}},
- nil)
- if err != nil {
- log.Printf("Error updating container state to Complete for %v: %q", uuid, err)
- }
+ if err := arv.Get("containers", uuid, nil, &container); err != nil {
+ // Without the current state there is nothing safe to fix up.
+ log.Printf("Error getting final container state for %v: %v", uuid, err)
+ return
+ }
+ fixState := map[string]string{
+ "Running": "Complete",
+ "Locked": "Cancelled",
+ }
+ if newState, ok := fixState[container.State]; ok {
+ log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)
+ updateState(uuid, newState)
}
-
- log.Printf("Finished container run for %v", uuid)
}
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "io/ioutil"
+ "bytes"
"log"
"net/http"
"net/http/httptest"
"os"
- "strings"
"syscall"
"testing"
"time"
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
arvadostest.StubResponse{500, string(`{}`)}
- testWithServerStub(c, apiStubResponses, "echo", "Error updating container state")
+ testWithServerStub(c, apiStubResponses, "echo", "Error updating container zzzzz-dz642-xxxxxxxxxxxxxx1 to 'Locked' state")
}
+// The stubbed API reports this container as 'Running' with priority 1,
+// but the crunch-run command is "nosuchcommand", which is expected to
+// fail to start; the dispatcher log must then contain a start-failure
+// message for this container.
func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3"] =
arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Running", "priority":1}`)}
- testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error running container for zzzzz-dz642-xxxxxxxxxxxxxx3")
+ testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error starting crunch-run for zzzzz-dz642-xxxxxxxxxxxxxx3")
}
func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
Retries: 0,
}
- tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file")
- c.Check(err, IsNil)
- defer os.Remove(tempfile.Name())
- log.SetOutput(tempfile)
+ buf := bytes.NewBuffer(nil)
+ log.SetOutput(buf)
+ defer log.SetOutput(os.Stderr)
go func() {
time.Sleep(2 * time.Second)
sigChan <- syscall.SIGTERM
}()
- runQueuedContainers(1, 1, crunchCmd)
+ runQueuedContainers(time.Second, time.Second, crunchCmd)
// Wait for all running crunch jobs to complete / terminate
waitGroup.Wait()
- buf, _ := ioutil.ReadFile(tempfile.Name())
- c.Check(strings.Contains(string(buf), expected), Equals, true)
+ c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
}