9187: Slurm dispatcher improvements around squeue
authorPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 1 Jun 2016 20:06:26 +0000 (16:06 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 1 Jun 2016 20:06:26 +0000 (16:06 -0400)
* Clarify that status updates are not guaranteed to be delivered on a
heartbeat.
* Refactor slurm dispatcher to monitor the container in squeue in a separate
goroutine.
* Refactor polling squeue to a single goroutine and cache the results so that
monitoring 100 containers doesn't result in 100 calls to squeue.
* No longer set up strigger to cancel job on finish, instead cancel running
jobs not in squeue.
* Test both cases where a job is/is not in squeue.

sdk/go/dispatch/dispatch.go
services/crunch-dispatch-local/crunch-dispatch-local_test.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
services/crunch-dispatch-slurm/crunch-finish-slurm.sh [deleted file]

index a27971f90655ca8e49046688119092c837b77938..785b6ec6650c22825f09d5085cfe77497d73afc3 100644 (file)
@@ -1,3 +1,6 @@
+// Framework for monitoring the Arvados container Queue, Locks container
+// records, and runs goroutine callbacks which implement execution and
+// monitoring of the containers.
 package dispatch
 
 import (
@@ -28,7 +31,7 @@ type apiClientAuthorizationList struct {
        Items []apiClientAuthorization `json:"items"`
 }
 
-// Container data
+// Represents an Arvados container record
 type Container struct {
        UUID               string           `json:"uuid"`
        State              string           `json:"state"`
@@ -45,9 +48,27 @@ type ContainerList struct {
 
 // Dispatcher holds the state of the dispatcher
 type Dispatcher struct {
-       Arv            arvadosclient.ArvadosClient
-       RunContainer   func(*Dispatcher, Container, chan Container)
-       PollInterval   time.Duration
+       // The Arvados client
+       Arv arvadosclient.ArvadosClient
+
+       // When a new queued container appears and is either already owned by
+       // this dispatcher or is successfully locked, the dispatcher will call
+       // go RunContainer().  The RunContainer() goroutine gets a channel over
+       // which it will receive updates to the container state.  The
+       // RunContainer() goroutine should only assume status updates come when
+       // the container record changes on the API server; if it needs to
+       // monitor the job submission to the underlying slurm/grid engine/etc
+       // queue it should spin up its own polling goroutines.  When the
+       // channel is closed, that means the container is no longer being
+       // handled by this dispatcher and the goroutine should terminate.  The
+       // goroutine is responsible for draining the 'status' channel, failure
+       // to do so may deadlock the dispatcher.
+       RunContainer func(*Dispatcher, Container, chan Container)
+
+       // Amount of time to wait between polling for updates.
+       PollInterval time.Duration
+
+       // Channel used to signal that RunDispatcher loop should exit.
        DoneProcessing chan struct{}
 
        mineMutex  sync.Mutex
@@ -159,7 +180,7 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) {
                // back to Queued and then locked by another dispatcher,
                // LockedByUUID will be different.  In either case, we want
                // to stop monitoring it.
-               log.Printf("Container %v now in state %v with locked_by_uuid %v", container.UUID, container.State, container.LockedByUUID)
+               log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
                dispatcher.notMine(container.UUID)
                return
        }
@@ -191,7 +212,7 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
                        "container": arvadosclient.Dict{"state": newState}},
                nil)
        if err != nil {
-               log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+               log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
        }
        return err
 }
@@ -199,14 +220,6 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
 // RunDispatcher runs the main loop of the dispatcher until receiving a message
 // on the dispatcher.DoneProcessing channel.  It also installs a signal handler
 // to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-//
-// When a new queued container appears and is successfully locked, the
-// dispatcher will call RunContainer() followed by MonitorContainer().  If a
-// container appears that is Locked or Running but not known to the dispatcher,
-// it will only call monitorContainer().  The monitorContainer() callback is
-// passed a channel over which it will receive updates to the container state.
-// The callback is responsible for draining the channel, if it fails to do so
-// it will deadlock the dispatcher.
 func (dispatcher *Dispatcher) RunDispatcher() (err error) {
        err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
        if err != nil {
index aca60e9b7b5202702721ef1d172522cc67cec16d..0248f18433df7aa3e591ebb78e538b28931d622b 100644 (file)
@@ -115,7 +115,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
        apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
                arvadostest.StubResponse{500, string(`{}`)}
 
-       testWithServerStub(c, apiStubResponses, "echo", "Error updating container zzzzz-dz642-xxxxxxxxxxxxxx1 to 'Locked' state")
+       testWithServerStub(c, apiStubResponses, "echo", "Error updating container zzzzz-dz642-xxxxxxxxxxxxxx1 to state \"Locked\"")
 }
 
 func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
@@ -123,7 +123,7 @@ func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
        apiStubResponses["/arvados/v1/containers"] =
                arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2","State":"Queued"}]}`)}
        apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2"] =
-               arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1}`)}
+               arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1, "locked_by_uuid": "` + arvadostest.Dispatch1AuthUUID + `"}`)}
 
        testWithServerStub(c, apiStubResponses, "echo",
                `After echo process termination, container state for Running is "zzzzz-dz642-xxxxxxxxxxxxxx2".  Updating it to "Cancelled"`)
@@ -142,7 +142,7 @@ func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
 
 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
        apiStubResponses["/arvados/v1/api_client_authorizations/current"] =
-               arvadostest.StubResponse{200, string(`{"uuid": "abc", "api_token": "xyz"}`)}
+               arvadostest.StubResponse{200, string(`{"uuid": "` + arvadostest.Dispatch1AuthUUID + `", "api_token": "xyz"}`)}
 
        apiStub := arvadostest.ServerStub{apiStubResponses}
 
index 641b4bcd775a3b58573ae529402b7c837d2a2807..3e148203f134bb0155c1a56d90bb547cf2f35634 100644 (file)
@@ -14,9 +14,16 @@ import (
        "os"
        "os/exec"
        "strings"
+       "sync"
        "time"
 )
 
+type Squeue struct {
+       sync.Mutex
+       squeueContents []string
+       SqueueDone     chan struct{}
+}
+
 func main() {
        err := doMain()
        if err != nil {
@@ -26,7 +33,7 @@ func main() {
 
 var (
        crunchRunCommand *string
-       finishCommand    *string
+       squeueUpdater    Squeue
 )
 
 func doMain() error {
@@ -42,11 +49,6 @@ func doMain() error {
                "/usr/bin/crunch-run",
                "Crunch command to run container")
 
-       finishCommand = flags.String(
-               "finish-command",
-               "/usr/bin/crunch-finish-slurm.sh",
-               "Command to run from strigger when job is finished")
-
        // Parse args; omit the first arg which is the command name
        flags.Parse(os.Args[1:])
 
@@ -63,11 +65,17 @@ func doMain() error {
                PollInterval:   time.Duration(*pollInterval) * time.Second,
                DoneProcessing: make(chan struct{})}
 
+       squeueUpdater.SqueueDone = make(chan struct{})
+       go squeueUpdater.SyncSqueue(time.Duration(*pollInterval) * time.Second)
+
        err = dispatcher.RunDispatcher()
        if err != nil {
                return err
        }
 
+       squeueUpdater.SqueueDone <- struct{}{}
+       close(squeueUpdater.SqueueDone)
+
        return nil
 }
 
@@ -81,19 +89,12 @@ func sbatchFunc(container dispatch.Container) *exec.Cmd {
                fmt.Sprintf("--priority=%d", container.Priority))
 }
 
-// striggerCmd
-func striggerFunc(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
-       return exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
-               fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
-}
-
 // squeueFunc
 func squeueFunc() *exec.Cmd {
        return exec.Command("squeue", "--format=%j")
 }
 
 // Wrap these so that they can be overridden by tests
-var striggerCmd = striggerFunc
 var sbatchCmd = sbatchFunc
 var squeueCmd = squeueFunc
 
@@ -182,44 +183,66 @@ func submit(dispatcher *dispatch.Dispatcher,
        return
 }
 
-// finalizeRecordOnFinish uses 'strigger' command to register a script that will run on
-// the slurm controller when the job finishes.
-func finalizeRecordOnFinish(jobid, containerUUID, finishCommand string, arv arvadosclient.ArvadosClient) {
-       insecure := "0"
-       if arv.ApiInsecure {
-               insecure = "1"
-       }
-       cmd := striggerCmd(jobid, containerUUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
-       cmd.Stdout = os.Stdout
-       cmd.Stderr = os.Stderr
-       err := cmd.Run()
-       if err != nil {
-               log.Printf("While setting up strigger: %v", err)
-               // BUG: we drop the error here and forget about it. A
-               // human has to notice the container is stuck in
-               // Running state, and fix it manually.
-       }
-}
+func (squeue *Squeue) runSqueue() ([]string, error) {
+       var newSqueueContents []string
 
-func checkSqueue(uuid string) (bool, error) {
        cmd := squeueCmd()
        sq, err := cmd.StdoutPipe()
        if err != nil {
-               return false, err
+               return nil, err
        }
        cmd.Start()
-       defer cmd.Wait()
        scanner := bufio.NewScanner(sq)
-       found := false
        for scanner.Scan() {
-               if scanner.Text() == uuid {
-                       found = true
-               }
+               newSqueueContents = append(newSqueueContents, scanner.Text())
        }
        if err := scanner.Err(); err != nil {
-               return false, err
+               cmd.Wait()
+               return nil, err
+       }
+
+       err = cmd.Wait()
+       if err != nil {
+               return nil, err
+       }
+
+       return newSqueueContents, nil
+}
+
+func (squeue *Squeue) CheckSqueue(uuid string, check bool) (bool, error) {
+       if check {
+               n, err := squeue.runSqueue()
+               if err != nil {
+                       return false, err
+               }
+               squeue.Lock()
+               squeue.squeueContents = n
+               squeue.Unlock()
+       }
+
+       if uuid != "" {
+               squeue.Lock()
+               defer squeue.Unlock()
+               for _, k := range squeue.squeueContents {
+                       if k == uuid {
+                               return true, nil
+                       }
+               }
+       }
+       return false, nil
+}
+
+func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
+       // TODO: considering using "squeue -i" instead of polling squeue.
+       ticker := time.NewTicker(pollInterval)
+       for {
+               select {
+               case <-squeueUpdater.SqueueDone:
+                       return
+               case <-ticker.C:
+                       squeue.CheckSqueue("", true)
+               }
        }
-       return found, nil
 }
 
 // Run or monitor a container.
@@ -239,50 +262,91 @@ func run(dispatcher *dispatch.Dispatcher,
        uuid := container.UUID
 
        if container.State == dispatch.Locked {
-               if inQ, err := checkSqueue(container.UUID); err != nil {
+               if inQ, err := squeueUpdater.CheckSqueue(container.UUID, true); err != nil {
+                       // maybe squeue is broken, put it back in the queue
                        log.Printf("Error running squeue: %v", err)
-                       dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+                       dispatcher.UpdateState(container.UUID, dispatch.Queued)
                } else if !inQ {
                        log.Printf("About to submit queued container %v", container.UUID)
 
-                       jobid, err := submit(dispatcher, container, *crunchRunCommand)
-                       if err != nil {
-                               log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
-                       } else {
-                               finalizeRecordOnFinish(jobid, container.UUID, *finishCommand, dispatcher.Arv)
+                       if _, err := submit(dispatcher, container, *crunchRunCommand); err != nil {
+                               log.Printf("Error submitting container %s to slurm: %v",
+                                       container.UUID, err)
+                               // maybe sbatch is broken, put it back to queued
+                               dispatcher.UpdateState(container.UUID, dispatch.Queued)
                        }
                }
-       } else if container.State == dispatch.Running {
-               if inQ, err := checkSqueue(container.UUID); err != nil {
-                       log.Printf("Error running squeue: %v", err)
-                       dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
-               } else if !inQ {
-                       log.Printf("Container %s in Running state but not in slurm queue, marking Cancelled.", container.UUID)
-                       dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
-               }
        }
 
        log.Printf("Monitoring container %v started", uuid)
 
-       for container = range status {
-               if (container.State == dispatch.Locked || container.State == dispatch.Running) && container.Priority == 0 {
-                       log.Printf("Canceling container %s", container.UUID)
-
-                       err := exec.Command("scancel", "--name="+container.UUID).Run()
-                       if err != nil {
-                               log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
-                               if inQ, err := checkSqueue(container.UUID); err != nil {
+       // periodically check squeue
+       doneSqueue := make(chan struct{})
+       go func() {
+               squeueUpdater.CheckSqueue(container.UUID, true)
+               ticker := time.NewTicker(dispatcher.PollInterval)
+               for {
+                       select {
+                       case <-ticker.C:
+                               if inQ, err := squeueUpdater.CheckSqueue(container.UUID, false); err != nil {
                                        log.Printf("Error running squeue: %v", err)
-                                       continue
-                               } else if inQ {
-                                       log.Printf("Container %s is still in squeue after scancel.", container.UUID)
-                                       continue
+                                       // don't cancel, just leave it the way it is
+                               } else if !inQ {
+                                       var con dispatch.Container
+                                       err := dispatcher.Arv.Get("containers", uuid, nil, &con)
+                                       if err != nil {
+                                               log.Printf("Error getting final container state: %v", err)
+                                       }
+
+                                       var st string
+                                       switch con.State {
+                                       case dispatch.Locked:
+                                               st = dispatch.Queued
+                                       case dispatch.Running:
+                                               st = dispatch.Cancelled
+                                       default:
+                                               st = ""
+                                       }
+
+                                       if st != "" {
+                                               log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+                                                       uuid, con.State, st)
+                                               dispatcher.UpdateState(uuid, st)
+                                       }
                                }
+                       case <-doneSqueue:
+                               close(doneSqueue)
+                               ticker.Stop()
+                               return
                        }
+               }
+       }()
 
-                       err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+       for container = range status {
+               if container.State == dispatch.Locked || container.State == dispatch.Running {
+                       if container.Priority == 0 {
+                               log.Printf("Canceling container %s", container.UUID)
+
+                               err := exec.Command("scancel", "--name="+container.UUID).Run()
+                               if err != nil {
+                                       log.Printf("Error stopping container %s with scancel: %v",
+                                               container.UUID, err)
+                                       if inQ, err := squeueUpdater.CheckSqueue(container.UUID, true); err != nil {
+                                               log.Printf("Error running squeue: %v", err)
+                                               continue
+                                       } else if inQ {
+                                               log.Printf("Container %s is still in squeue after scancel.",
+                                                       container.UUID)
+                                               continue
+                                       }
+                               }
+
+                               err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+                       }
                }
        }
 
+       doneSqueue <- struct{}{}
+
        log.Printf("Monitoring container %v finished", uuid)
 }
index 348d5e48b87fdf19d26c75423f7f07656f42b1bd..d30c5dfceaca71c2acc8de3467e8d2325b1ea2fa 100644 (file)
@@ -1,12 +1,12 @@
 package main
 
 import (
+       "bytes"
+       "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        "git.curoverse.com/arvados.git/sdk/go/dispatch"
-
-       "bytes"
-       "fmt"
+       "io"
        "log"
        "math"
        "net/http"
@@ -35,35 +35,43 @@ var initialArgs []string
 
 func (s *TestSuite) SetUpSuite(c *C) {
        initialArgs = os.Args
-       arvadostest.StartAPI()
 }
 
 func (s *TestSuite) TearDownSuite(c *C) {
-       arvadostest.StopAPI()
 }
 
 func (s *TestSuite) SetUpTest(c *C) {
        args := []string{"crunch-dispatch-slurm"}
        os.Args = args
 
+       arvadostest.StartAPI()
        os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
 }
 
 func (s *TestSuite) TearDownTest(c *C) {
-       arvadostest.ResetEnv()
        os.Args = initialArgs
+       arvadostest.StopAPI()
 }
 
 func (s *MockArvadosServerSuite) TearDownTest(c *C) {
        arvadostest.ResetEnv()
 }
 
-func (s *TestSuite) TestIntegration(c *C) {
+func (s *TestSuite) TestIntegrationNormal(c *C) {
+       s.integrationTest(c, false)
+}
+
+func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
+       s.integrationTest(c, true)
+}
+
+func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
+       arvadostest.ResetEnv()
+
        arv, err := arvadosclient.MakeArvadosClient()
        c.Assert(err, IsNil)
 
        var sbatchCmdLine []string
-       var striggerCmdLine []string
 
        // Override sbatchCmd
        defer func(orig func(dispatch.Container) *exec.Cmd) {
@@ -74,30 +82,16 @@ func (s *TestSuite) TestIntegration(c *C) {
                return exec.Command("sh")
        }
 
-       // Override striggerCmd
-       defer func(orig func(jobid, containerUUID, finishCommand,
-               apiHost, apiToken, apiInsecure string) *exec.Cmd) {
-               striggerCmd = orig
-       }(striggerCmd)
-       striggerCmd = func(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
-               striggerCmdLine = striggerFunc(jobid, containerUUID, finishCommand,
-                       apiHost, apiToken, apiInsecure).Args
-               go func() {
-                       time.Sleep(5 * time.Second)
-                       arv.Update("containers", containerUUID,
-                               arvadosclient.Dict{
-                                       "container": arvadosclient.Dict{"state": dispatch.Complete}},
-                               nil)
-               }()
-               return exec.Command("echo", striggerCmdLine...)
-       }
-
        // Override squeueCmd
        defer func(orig func() *exec.Cmd) {
                squeueCmd = orig
        }(squeueCmd)
        squeueCmd = func() *exec.Cmd {
-               return exec.Command("echo")
+               if missingFromSqueue {
+                       return exec.Command("echo")
+               } else {
+                       return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
+               }
        }
 
        // There should be no queued containers now
@@ -111,8 +105,6 @@ func (s *TestSuite) TestIntegration(c *C) {
 
        echo := "echo"
        crunchRunCommand = &echo
-       finishCmd := "/usr/bin/crunch-finish-slurm.sh"
-       finishCommand = &finishCmd
 
        doneProcessing := make(chan struct{})
        dispatcher := dispatch.Dispatcher{
@@ -122,8 +114,8 @@ func (s *TestSuite) TestIntegration(c *C) {
                        container dispatch.Container,
                        status chan dispatch.Container) {
                        go func() {
-                               time.Sleep(1)
                                dispatcher.UpdateState(container.UUID, dispatch.Running)
+                               time.Sleep(3 * time.Second)
                                dispatcher.UpdateState(container.UUID, dispatch.Complete)
                        }()
                        run(dispatcher, container, status)
@@ -131,19 +123,29 @@ func (s *TestSuite) TestIntegration(c *C) {
                },
                DoneProcessing: doneProcessing}
 
+       squeueUpdater.SqueueDone = make(chan struct{})
+       go squeueUpdater.SyncSqueue(time.Duration(500) * time.Millisecond)
+
        err = dispatcher.RunDispatcher()
        c.Assert(err, IsNil)
 
+       squeueUpdater.SqueueDone <- struct{}{}
+       close(squeueUpdater.SqueueDone)
+
        item := containers.Items[0]
        sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
                fmt.Sprintf("--job-name=%s", item.UUID),
                fmt.Sprintf("--mem-per-cpu=%d", int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576)))),
                fmt.Sprintf("--cpus-per-task=%d", int(item.RuntimeConstraints["vcpus"])),
                fmt.Sprintf("--priority=%d", item.Priority)}
-       c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
 
-       c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer", "--fini",
-               "--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
+       if missingFromSqueue {
+               // not in squeue when run() started, so it will have called sbatch
+               c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
+       } else {
+               // already in squeue when run() started, will have just monitored it instead
+               c.Check(sbatchCmdLine, DeepEquals, []string(nil))
+       }
 
        // There should be no queued containers now
        err = arv.List("containers", params, &containers)
@@ -154,7 +156,11 @@ func (s *TestSuite) TestIntegration(c *C) {
        var container dispatch.Container
        err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
        c.Check(err, IsNil)
-       c.Check(container.State, Equals, "Complete")
+       if missingFromSqueue {
+               c.Check(container.State, Equals, "Cancelled")
+       } else {
+               c.Check(container.State, Equals, "Complete")
+       }
 }
 
 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
@@ -180,12 +186,10 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
        }
 
        buf := bytes.NewBuffer(nil)
-       log.SetOutput(buf)
+       log.SetOutput(io.MultiWriter(buf, os.Stderr))
        defer log.SetOutput(os.Stderr)
 
        crunchRunCommand = &crunchCmd
-       finishCmd := "/usr/bin/crunch-finish-slurm.sh"
-       finishCommand = &finishCmd
 
        doneProcessing := make(chan struct{})
        dispatcher := dispatch.Dispatcher{
@@ -195,7 +199,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
                        container dispatch.Container,
                        status chan dispatch.Container) {
                        go func() {
-                               time.Sleep(1)
+                               time.Sleep(1 * time.Second)
                                dispatcher.UpdateState(container.UUID, dispatch.Running)
                                dispatcher.UpdateState(container.UUID, dispatch.Complete)
                        }()
diff --git a/services/crunch-dispatch-slurm/crunch-finish-slurm.sh b/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
deleted file mode 100755 (executable)
index 95a37ba..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/sh
-
-# Script to be called by strigger when a job finishes.  This ensures the job
-# record has the correct state "Complete" even if the node running the job
-# failed.
-
-ARVADOS_API_HOST=$1
-ARVADOS_API_TOKEN=$2
-ARVADOS_API_HOST_INSECURE=$3
-uuid=$4
-jobid=$5
-
-# If it is possible to attach metadata to job records we could look up the
-# above information instead of getting it on the command line.  For example,
-# this is the recipe for getting the job name (container uuid) from the job id.
-#uuid=$(squeue --jobs=$jobid --states=all --format=%j --noheader)
-
-export ARVADOS_API_HOST ARVADOS_API_TOKEN ARVADOS_API_HOST_INSECURE
-
-exec arv container update --uuid $uuid --container '{"state": "Complete"}'