9187: Refactor dispatcher support into common library and update to use Locking API.
authorPeter Amstutz <peter.amstutz@curoverse.com>
Thu, 19 May 2016 18:12:42 +0000 (14:12 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Thu, 26 May 2016 20:11:51 +0000 (16:11 -0400)
New dispatcher package in Go SDK provides framework for monitoring list of
queued/locked/running containers.  Try to lock containers in the queue; locked
or running containers are passed to RunContainer goroutine supplied by the
specific dispatcher.  Refactor existing dispatchers (-local and -slurm) to use
this framework.  Dispatchers have crash recovery behavior, can put containers
which are unaccounted in cancelled state.

sdk/go/dispatch/dispatch.go [new file with mode: 0644]
services/crunch-dispatch-local/crunch-dispatch-local.go
services/crunch-dispatch-local/crunch-dispatch-local_test.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
new file mode 100644 (file)
index 0000000..355ed7c
--- /dev/null
@@ -0,0 +1,229 @@
+package dispatch
+
+import (
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "log"
+       "os"
+       "os/signal"
+       "sync"
+       "syscall"
+       "time"
+)
+
+// Constants for container states
+const (
+       Queued    = "Queued"
+       Locked    = "Locked"
+       Running   = "Running"
+       Complete  = "Complete"
+       Cancelled = "Cancelled"
+)
+
+type apiClientAuthorization struct {
+       UUID     string `json:"uuid"`
+       APIToken string `json:"api_token"`
+}
+
+type apiClientAuthorizationList struct {
+       Items []apiClientAuthorization `json:"items"`
+}
+
+// Container data
+type Container struct {
+       UUID               string           `json:"uuid"`
+       State              string           `json:"state"`
+       Priority           int              `json:"priority"`
+       RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
+       LockedByUUID       string           `json:"locked_by_uuid"`
+}
+
+// ContainerList is a list of the containers from api
+type ContainerList struct {
+       Items          []Container `json:"items"`
+       ItemsAvailable int         `json:"items_available"`
+}
+
+// Dispatcher holds the state of the dispatcher
+type Dispatcher struct {
+       Arv            arvadosclient.ArvadosClient
+       RunContainer   func(*Dispatcher, Container, chan Container)
+       PollInterval   time.Duration
+       DoneProcessing chan struct{}
+
+       mineMutex  sync.Mutex
+       mineMap    map[string]chan Container
+       auth       apiClientAuthorization
+       containers chan Container
+}
+
+// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
+// for which this process is actively starting/monitoring.  Returns channel to
+// be used to send container status updates.
+func (dispatcher *Dispatcher) setMine(uuid string) chan Container {
+       dispatcher.mineMutex.Lock()
+       defer dispatcher.mineMutex.Unlock()
+       if ch, ok := dispatcher.mineMap[uuid]; ok {
+               return ch
+       }
+
+       ch := make(chan Container)
+       dispatcher.mineMap[uuid] = ch
+       return ch
+}
+
+// Release a container which is no longer being monitored.
+func (dispatcher *Dispatcher) notMine(uuid string) {
+       dispatcher.mineMutex.Lock()
+       defer dispatcher.mineMutex.Unlock()
+       if ch, ok := dispatcher.mineMap[uuid]; ok {
+               close(ch)
+               delete(dispatcher.mineMap, uuid)
+       }
+}
+
+// Check if there is a channel for updates associated with this container.  If
+// so send the container record on the channel and return true, if not return
+// false.
+func (dispatcher *Dispatcher) updateMine(c Container) bool {
+       dispatcher.mineMutex.Lock()
+       defer dispatcher.mineMutex.Unlock()
+       ch, ok := dispatcher.mineMap[c.UUID]
+       if ok {
+               ch <- c
+               return true
+       }
+       return false
+}
+
+func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
+       var containers ContainerList
+       err := dispatcher.Arv.List("containers", params, &containers)
+       if err != nil {
+               log.Printf("Error getting list of containers: %q", err)
+       } else {
+               if containers.ItemsAvailable > len(containers.Items) {
+                       // TODO: support paging
+                       log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
+                               containers.ItemsAvailable,
+                               len(containers.Items))
+               }
+               for _, container := range containers.Items {
+                       touched[container.UUID] = true
+                       dispatcher.containers <- container
+               }
+       }
+}
+
+func (dispatcher *Dispatcher) pollContainers() {
+       ticker := time.NewTicker(dispatcher.PollInterval)
+
+       paramsQ := arvadosclient.Dict{
+               "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
+               "order":   []string{"priority desc"},
+               "limit":   "1000"}
+       paramsP := arvadosclient.Dict{
+               "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.auth.UUID}},
+               "limit":   "1000"}
+
+       for {
+               select {
+               case <-ticker.C:
+                       touched := make(map[string]bool)
+                       dispatcher.getContainers(paramsQ, touched)
+                       dispatcher.getContainers(paramsP, touched)
+                       dispatcher.mineMutex.Lock()
+                       var monitored []string
+                       for k := range dispatcher.mineMap {
+                               if _, ok := touched[k]; !ok {
+                                       monitored = append(monitored, k)
+                               }
+                       }
+                       dispatcher.mineMutex.Unlock()
+                       if monitored != nil {
+                               dispatcher.getContainers(arvadosclient.Dict{
+                                       "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+                       }
+               case <-dispatcher.DoneProcessing:
+                       close(dispatcher.containers)
+                       ticker.Stop()
+                       return
+               }
+       }
+}
+
+func (dispatcher *Dispatcher) handleUpdate(container Container) {
+       if dispatcher.updateMine(container) {
+               if container.State == Complete || container.State == Cancelled {
+                       log.Printf("Container %v now in state %v", container.UUID, container.State)
+                       dispatcher.notMine(container.UUID)
+               }
+               return
+       }
+
+       if container.State == Queued {
+               // Try to take the lock
+               if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
+                       return
+               }
+               container.State = Locked
+       }
+
+       if container.State == Locked || container.State == Running {
+               go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
+       }
+}
+
+// UpdateState makes an API call to change the state of a container.
+func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
+       err := dispatcher.Arv.Update("containers", uuid,
+               arvadosclient.Dict{
+                       "container": arvadosclient.Dict{"state": newState}},
+               nil)
+       if err != nil {
+               log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+       }
+       return err
+}
+
+// RunDispatcher runs the main loop of the dispatcher until receiving a message
+// on the dispatcher.DoneProcessing channel.  It also installs a signal handler
+// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
+//
+// When a new queued container appears and is successfully locked, the
+// dispatcher will call RunContainer() followed by MonitorContainer().  If a
+// container appears that is Locked or Running but not known to the dispatcher,
+// it will only call monitorContainer().  The monitorContainer() callback is
+// passed a channel over which it will receive updates to the container state.
+// The callback is responsible for draining the channel, if it fails to do so
+// it will deadlock the dispatcher.
+func (dispatcher *Dispatcher) RunDispatcher() (err error) {
+       err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth)
+       if err != nil {
+               log.Printf("Error getting my token UUID: %v", err)
+               return
+       }
+
+       dispatcher.mineMap = make(map[string]chan Container)
+       dispatcher.containers = make(chan Container)
+
+       // Graceful shutdown on signal
+       sigChan := make(chan os.Signal)
+       signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+
+       go func(sig <-chan os.Signal) {
+               for sig := range sig {
+                       log.Printf("Caught signal: %v", sig)
+                       dispatcher.DoneProcessing <- struct{}{}
+               }
+       }(sigChan)
+
+       defer close(sigChan)
+       defer signal.Stop(sigChan)
+
+       go dispatcher.pollContainers()
+       for container := range dispatcher.containers {
+               dispatcher.handleUpdate(container)
+       }
+
+       return nil
+}
index 40238706063c6659f0e39d89ac1af7eca5e85795..cc472a40311adc36f91202512c3f4f7fb52e2b8f 100644 (file)
@@ -1,14 +1,15 @@
 package main
 
+// Dispatcher service for Crunch that runs containers locally.
+
 import (
        "flag"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/dispatch"
        "log"
        "os"
        "os/exec"
-       "os/signal"
        "sync"
-       "syscall"
        "time"
 )
 
@@ -20,12 +21,10 @@ func main() {
 }
 
 var (
-       arv              arvadosclient.ArvadosClient
        runningCmds      map[string]*exec.Cmd
        runningCmdsMutex sync.Mutex
        waitGroup        sync.WaitGroup
-       doneProcessing   chan bool
-       sigChan          chan os.Signal
+       crunchRunCommand *string
 )
 
 func doMain() error {
@@ -36,12 +35,7 @@ func doMain() error {
                10,
                "Interval in seconds to poll for queued containers")
 
-       priorityPollInterval := flags.Int(
-               "container-priority-poll-interval",
-               60,
-               "Interval in seconds to check priority of a dispatched container")
-
-       crunchRunCommand := flags.String(
+       crunchRunCommand = flags.String(
                "crunch-run-command",
                "/usr/bin/crunch-run",
                "Crunch command to run container")
@@ -49,35 +43,32 @@ func doMain() error {
        // Parse args; omit the first arg which is the command name
        flags.Parse(os.Args[1:])
 
-       var err error
-       arv, err = arvadosclient.MakeArvadosClient()
+       runningCmds = make(map[string]*exec.Cmd)
+
+       arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
+               log.Printf("Error making Arvados client: %v", err)
                return err
        }
+       arv.Retries = 25
 
-       // Channel to terminate
-       doneProcessing = make(chan bool)
-
-       // Map of running crunch jobs
-       runningCmds = make(map[string]*exec.Cmd)
-
-       // Graceful shutdown
-       sigChan = make(chan os.Signal, 1)
-       signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-       go func(sig <-chan os.Signal) {
-               for sig := range sig {
-                       log.Printf("Caught signal: %v", sig)
-                       doneProcessing <- true
-               }
-       }(sigChan)
+       dispatcher := dispatch.Dispatcher{
+               Arv:            arv,
+               RunContainer:   run,
+               PollInterval:   time.Duration(*pollInterval) * time.Second,
+               DoneProcessing: make(chan struct{})}
 
-       // Run all queued containers
-       runQueuedContainers(time.Duration(*pollInterval)*time.Second, time.Duration(*priorityPollInterval)*time.Second, *crunchRunCommand)
+       err = dispatcher.RunDispatcher()
+       if err != nil {
+               return err
+       }
 
+       runningCmdsMutex.Lock()
        // Finished dispatching; interrupt any crunch jobs that are still running
        for _, cmd := range runningCmds {
                cmd.Process.Signal(os.Interrupt)
        }
+       runningCmdsMutex.Unlock()
 
        // Wait for all running crunch jobs to complete / terminate
        waitGroup.Wait()
@@ -85,166 +76,98 @@ func doMain() error {
        return nil
 }
 
-// Poll for queued containers using pollInterval.
-// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
-//
-// Any errors encountered are logged but the program would continue to run (not exit).
-// This is because, once one or more crunch jobs are running,
-// we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval time.Duration, crunchRunCommand string) {
-       ticker := time.NewTicker(pollInterval)
-
-       for {
-               select {
-               case <-ticker.C:
-                       dispatchLocal(priorityPollInterval, crunchRunCommand)
-               case <-doneProcessing:
-                       ticker.Stop()
-                       return
-               }
-       }
+func startFunc(container dispatch.Container, cmd *exec.Cmd) error {
+       return cmd.Start()
 }
 
-// Container data
-type Container struct {
-       UUID         string `json:"uuid"`
-       State        string `json:"state"`
-       Priority     int    `json:"priority"`
-       LockedByUUID string `json:"locked_by_uuid"`
-}
+var startCmd = startFunc
 
-// ContainerList is a list of the containers from api
-type ContainerList struct {
-       Items []Container `json:"items"`
-}
-
-// Get the list of queued containers from API server and invoke run for each container.
-func dispatchLocal(pollInterval time.Duration, crunchRunCommand string) {
-       params := arvadosclient.Dict{
-               "filters": [][]string{[]string{"state", "=", "Queued"}},
-       }
+// Run a container.
+//
+// If the container is Locked, start a new crunch-run process and wait until
+// crunch-run completes.  If the priority is set to zero, set an interrupt
+// signal to the crunch-run process.
+//
+// If the container is in any other state, or is not Complete/Cancelled after
+// crunch-run terminates, mark the container as Cancelled.
+func run(dispatcher *dispatch.Dispatcher,
+       container dispatch.Container,
+       status chan dispatch.Container) {
 
-       var containers ContainerList
-       err := arv.List("containers", params, &containers)
-       if err != nil {
-               log.Printf("Error getting list of queued containers: %q", err)
-               return
-       }
+       uuid := container.UUID
 
-       for _, c := range containers.Items {
-               log.Printf("About to run queued container %v", c.UUID)
-               // Run the container
+       if container.State == dispatch.Locked {
                waitGroup.Add(1)
-               go func(c Container) {
-                       run(c.UUID, crunchRunCommand, pollInterval)
-                       waitGroup.Done()
-               }(c)
-       }
-}
-
-func updateState(uuid, newState string) error {
-       err := arv.Update("containers", uuid,
-               arvadosclient.Dict{
-                       "container": arvadosclient.Dict{"state": newState}},
-               nil)
-       if err != nil {
-               log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
-       }
-       return err
-}
-
-// Run queued container:
-// Set container state to Locked
-// Run container using the given crunch-run command
-// Set the container state to Running
-// If the container priority becomes zero while crunch job is still running, terminate it.
-func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
-       if err := updateState(uuid, "Locked"); err != nil {
-               return
-       }
 
-       cmd := exec.Command(crunchRunCommand, uuid)
-       cmd.Stdin = nil
-       cmd.Stderr = os.Stderr
-       cmd.Stdout = os.Stderr
+               cmd := exec.Command(*crunchRunCommand, uuid)
+               cmd.Stdin = nil
+               cmd.Stderr = os.Stderr
+               cmd.Stdout = os.Stderr
 
-       // Add this crunch job to the list of runningCmds only if we
-       // succeed in starting crunch-run.
-       runningCmdsMutex.Lock()
-       if err := cmd.Start(); err != nil {
-               log.Printf("Error starting crunch-run for %v: %q", uuid, err)
-               runningCmdsMutex.Unlock()
-               updateState(uuid, "Queued")
-               return
-       }
-       runningCmds[uuid] = cmd
-       runningCmdsMutex.Unlock()
+               log.Printf("Starting container %v", uuid)
 
-       defer func() {
-               setFinalState(uuid)
+               // Add this crunch job to the list of runningCmds only if we
+               // succeed in starting crunch-run.
 
-               // Remove the crunch job from runningCmds
                runningCmdsMutex.Lock()
-               delete(runningCmds, uuid)
-               runningCmdsMutex.Unlock()
-       }()
-
-       log.Printf("Starting container %v", uuid)
-
-       updateState(uuid, "Running")
+               if err := startCmd(container, cmd); err != nil {
+                       runningCmdsMutex.Unlock()
+                       log.Printf("Error starting %v for %v: %q", *crunchRunCommand, uuid, err)
+                       dispatcher.UpdateState(uuid, dispatch.Cancelled)
+               } else {
+                       runningCmds[uuid] = cmd
+                       runningCmdsMutex.Unlock()
+
+                       // Need to wait for crunch-run to exit
+                       done := make(chan struct{})
+
+                       go func() {
+                               if _, err := cmd.Process.Wait(); err != nil {
+                                       log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
+                               }
+                               log.Printf("sending done")
+                               done <- struct{}{}
+                       }()
+
+               Loop:
+                       for {
+                               select {
+                               case <-done:
+                                       break Loop
+                               case c := <-status:
+                                       // Interrupt the child process if priority changes to 0
+                                       if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
+                                               log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+                                               cmd.Process.Signal(os.Interrupt)
+                                       }
+                               }
+                       }
+                       close(done)
 
-       cmdExited := make(chan struct{})
+                       log.Printf("Finished container run for %v", uuid)
 
-       // Kill the child process if container priority changes to zero
-       go func() {
-               ticker := time.NewTicker(pollInterval)
-               defer ticker.Stop()
-               for {
-                       select {
-                       case <-cmdExited:
-                               return
-                       case <-ticker.C:
-                       }
-                       var container Container
-                       err := arv.Get("containers", uuid, nil, &container)
-                       if err != nil {
-                               log.Printf("Error getting container %v: %q", uuid, err)
-                               continue
-                       }
-                       if container.Priority == 0 {
-                               log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
-                               cmd.Process.Signal(os.Interrupt)
-                       }
+                       // Remove the crunch job from runningCmds
+                       runningCmdsMutex.Lock()
+                       delete(runningCmds, uuid)
+                       runningCmdsMutex.Unlock()
                }
-       }()
-
-       // Wait for crunch-run to exit
-       if _, err := cmd.Process.Wait(); err != nil {
-               log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
+               waitGroup.Done()
        }
-       close(cmdExited)
-
-       log.Printf("Finished container run for %v", uuid)
-}
 
-func setFinalState(uuid string) {
-       // The container state should now be 'Complete' if everything
-       // went well. If it started but crunch-run didn't change its
-       // final state to 'Running', fix that now. If it never even
-       // started, cancel it as unrunnable. (TODO: Requeue instead,
-       // and fix tests so they can tell something happened even if
-       // the final state is Queued.)
-       var container Container
-       err := arv.Get("containers", uuid, nil, &container)
+       // If the container is not finalized, then change it to "Cancelled".
+       err := dispatcher.Arv.Get("containers", uuid, nil, &container)
        if err != nil {
                log.Printf("Error getting final container state: %v", err)
        }
-       fixState := map[string]string{
-               "Running": "Complete",
-               "Locked": "Cancelled",
+       if container.State != dispatch.Complete && container.State != dispatch.Cancelled {
+               log.Printf("After %s process termination, container state for %v is %q.  Updating it to %q",
+                       *crunchRunCommand, container.State, uuid, dispatch.Cancelled)
+               dispatcher.UpdateState(uuid, dispatch.Cancelled)
        }
-       if newState, ok := fixState[container.State]; ok {
-               log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)
-               updateState(uuid, newState)
+
+       // drain any subsequent status changes
+       for _ = range status {
        }
+
+       log.Printf("Finalized container %v", uuid)
 }
index e3ab3a4e1d55ddc5df609fc4273cdfc57e63430a..aca60e9b7b5202702721ef1d172522cc67cec16d 100644 (file)
@@ -1,19 +1,20 @@
 package main
 
 import (
+       "bytes"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
-
-       "bytes"
+       "git.curoverse.com/arvados.git/sdk/go/dispatch"
+       . "gopkg.in/check.v1"
+       "io"
        "log"
        "net/http"
        "net/http/httptest"
        "os"
-       "syscall"
+       "os/exec"
+       "strings"
        "testing"
        "time"
-
-       . "gopkg.in/check.v1"
 )
 
 // Gocheck boilerplate
@@ -32,6 +33,7 @@ var initialArgs []string
 func (s *TestSuite) SetUpSuite(c *C) {
        initialArgs = os.Args
        arvadostest.StartAPI()
+       runningCmds = make(map[string]*exec.Cmd)
 }
 
 func (s *TestSuite) TearDownSuite(c *C) {
@@ -41,12 +43,6 @@ func (s *TestSuite) TearDownSuite(c *C) {
 func (s *TestSuite) SetUpTest(c *C) {
        args := []string{"crunch-dispatch-local"}
        os.Args = args
-
-       var err error
-       arv, err = arvadosclient.MakeArvadosClient()
-       if err != nil {
-               c.Fatalf("Error making arvados client: %s", err)
-       }
 }
 
 func (s *TestSuite) TearDownTest(c *C) {
@@ -58,29 +54,48 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
        arvadostest.ResetEnv()
 }
 
-func (s *TestSuite) Test_doMain(c *C) {
-       args := []string{"-poll-interval", "2", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
-       os.Args = append(os.Args, args...)
+func (s *TestSuite) TestIntegration(c *C) {
+       arv, err := arvadosclient.MakeArvadosClient()
+       c.Assert(err, IsNil)
+
+       echo := "echo"
+       crunchRunCommand = &echo
+
+       doneProcessing := make(chan struct{})
+       dispatcher := dispatch.Dispatcher{
+               Arv:          arv,
+               PollInterval: time.Duration(1) * time.Second,
+               RunContainer: func(dispatcher *dispatch.Dispatcher,
+                       container dispatch.Container,
+                       status chan dispatch.Container) {
+                       run(dispatcher, container, status)
+                       doneProcessing <- struct{}{}
+               },
+               DoneProcessing: doneProcessing}
+
+       startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+               dispatcher.UpdateState(container.UUID, "Running")
+               dispatcher.UpdateState(container.UUID, "Complete")
+               return cmd.Start()
+       }
 
-       go func() {
-               time.Sleep(5 * time.Second)
-               sigChan <- syscall.SIGINT
-       }()
+       err = dispatcher.RunDispatcher()
+       c.Assert(err, IsNil)
 
-       err := doMain()
-       c.Check(err, IsNil)
+       // Wait for all running crunch jobs to complete / terminate
+       waitGroup.Wait()
 
        // There should be no queued containers now
        params := arvadosclient.Dict{
                "filters": [][]string{[]string{"state", "=", "Queued"}},
        }
-       var containers ContainerList
+       var containers dispatch.ContainerList
        err = arv.List("containers", params, &containers)
        c.Check(err, IsNil)
        c.Assert(len(containers.Items), Equals, 0)
 
        // Previously "Queued" container should now be in "Complete" state
-       var container Container
+       var container dispatch.Container
        err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
        c.Check(err, IsNil)
        c.Check(container.State, Equals, "Complete")
@@ -90,13 +105,13 @@ func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
        apiStubResponses := make(map[string]arvadostest.StubResponse)
        apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
 
-       testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+       testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
 }
 
 func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
        apiStubResponses := make(map[string]arvadostest.StubResponse)
        apiStubResponses["/arvados/v1/containers"] =
-               arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1"}]}`)}
+               arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1","State":"Queued"}]}`)}
        apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
                arvadostest.StubResponse{500, string(`{}`)}
 
@@ -106,31 +121,35 @@ func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
 func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
        apiStubResponses := make(map[string]arvadostest.StubResponse)
        apiStubResponses["/arvados/v1/containers"] =
-               arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2"}]}`)}
+               arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2","State":"Queued"}]}`)}
        apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2"] =
                arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1}`)}
 
        testWithServerStub(c, apiStubResponses, "echo",
-               "After crunch-run process termination, the state is still 'Running' for zzzzz-dz642-xxxxxxxxxxxxxx2")
+               `After echo process termination, container state for Running is "zzzzz-dz642-xxxxxxxxxxxxxx2".  Updating it to "Cancelled"`)
 }
 
 func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
        apiStubResponses := make(map[string]arvadostest.StubResponse)
        apiStubResponses["/arvados/v1/containers"] =
-               arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3"}]}`)}
+               arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3","State":"Queued"}]}`)}
+
        apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3"] =
                arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Running", "priority":1}`)}
 
-       testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error starting crunch-run for zzzzz-dz642-xxxxxxxxxxxxxx3")
+       testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error starting nosuchcommand for zzzzz-dz642-xxxxxxxxxxxxxx3")
 }
 
 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+       apiStubResponses["/arvados/v1/api_client_authorizations/current"] =
+               arvadostest.StubResponse{200, string(`{"uuid": "abc", "api_token": "xyz"}`)}
+
        apiStub := arvadostest.ServerStub{apiStubResponses}
 
        api := httptest.NewServer(&apiStub)
        defer api.Close()
 
-       arv = arvadosclient.ArvadosClient{
+       arv := arvadosclient.ArvadosClient{
                Scheme:    "http",
                ApiServer: api.URL[7:],
                ApiToken:  "abc123",
@@ -139,15 +158,38 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
        }
 
        buf := bytes.NewBuffer(nil)
-       log.SetOutput(buf)
+       log.SetOutput(io.MultiWriter(buf, os.Stderr))
        defer log.SetOutput(os.Stderr)
 
+       *crunchRunCommand = crunchCmd
+
+       doneProcessing := make(chan struct{})
+       dispatcher := dispatch.Dispatcher{
+               Arv:          arv,
+               PollInterval: time.Duration(1) * time.Second,
+               RunContainer: func(dispatcher *dispatch.Dispatcher,
+                       container dispatch.Container,
+                       status chan dispatch.Container) {
+                       run(dispatcher, container, status)
+                       doneProcessing <- struct{}{}
+               },
+               DoneProcessing: doneProcessing}
+
+       startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+               dispatcher.UpdateState(container.UUID, "Running")
+               dispatcher.UpdateState(container.UUID, "Complete")
+               return cmd.Start()
+       }
+
        go func() {
-               time.Sleep(2 * time.Second)
-               sigChan <- syscall.SIGTERM
+               for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
+                       time.Sleep(100 * time.Millisecond)
+               }
+               dispatcher.DoneProcessing <- struct{}{}
        }()
 
-       runQueuedContainers(time.Second, time.Second, crunchCmd)
+       err := dispatcher.RunDispatcher()
+       c.Assert(err, IsNil)
 
        // Wait for all running crunch jobs to complete / terminate
        waitGroup.Wait()
index 53e470525385b4117b496e7cebae0bb41b69a9b3..641b4bcd775a3b58573ae529402b7c837d2a2807 100644 (file)
@@ -1,19 +1,19 @@
 package main
 
+// Dispatcher service for Crunch that submits containers to the slurm queue.
+
 import (
        "bufio"
        "flag"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/dispatch"
        "io/ioutil"
        "log"
        "math"
        "os"
        "os/exec"
-       "os/signal"
-       "strconv"
-       "sync"
-       "syscall"
+       "strings"
        "time"
 )
 
@@ -25,12 +25,8 @@ func main() {
 }
 
 var (
-       arv              arvadosclient.ArvadosClient
-       runningCmds      map[string]*exec.Cmd
-       runningCmdsMutex sync.Mutex
-       waitGroup        sync.WaitGroup
-       doneProcessing   chan bool
-       sigChan          chan os.Signal
+       crunchRunCommand *string
+       finishCommand    *string
 )
 
 func doMain() error {
@@ -41,17 +37,12 @@ func doMain() error {
                10,
                "Interval in seconds to poll for queued containers")
 
-       priorityPollInterval := flags.Int(
-               "container-priority-poll-interval",
-               60,
-               "Interval in seconds to check priority of a dispatched container")
-
-       crunchRunCommand := flags.String(
+       crunchRunCommand = flags.String(
                "crunch-run-command",
                "/usr/bin/crunch-run",
                "Crunch command to run container")
 
-       finishCommand := flags.String(
+       finishCommand = flags.String(
                "finish-command",
                "/usr/bin/crunch-finish-slurm.sh",
                "Command to run from strigger when job is finished")
@@ -59,142 +50,56 @@ func doMain() error {
        // Parse args; omit the first arg which is the command name
        flags.Parse(os.Args[1:])
 
-       var err error
-       arv, err = arvadosclient.MakeArvadosClient()
+       arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
+               log.Printf("Error making Arvados client: %v", err)
                return err
        }
+       arv.Retries = 25
 
-       // Channel to terminate
-       doneProcessing = make(chan bool)
-
-       // Graceful shutdown
-       sigChan = make(chan os.Signal, 1)
-       signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-       go func(sig <-chan os.Signal) {
-               for sig := range sig {
-                       log.Printf("Caught signal: %v", sig)
-                       doneProcessing <- true
-               }
-       }(sigChan)
-
-       // Run all queued containers
-       runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand, *finishCommand)
-
-       // Wait for all running crunch jobs to complete / terminate
-       waitGroup.Wait()
-
-       return nil
-}
-
-type apiClientAuthorization struct {
-       UUID     string `json:"uuid"`
-       APIToken string `json:"api_token"`
-}
-
-type apiClientAuthorizationList struct {
-       Items []apiClientAuthorization `json:"items"`
-}
+       dispatcher := dispatch.Dispatcher{
+               Arv:            arv,
+               RunContainer:   run,
+               PollInterval:   time.Duration(*pollInterval) * time.Second,
+               DoneProcessing: make(chan struct{})}
 
-// Poll for queued containers using pollInterval.
-// Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
-//
-// Any errors encountered are logged but the program would continue to run (not exit).
-// This is because, once one or more crunch jobs are running,
-// we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
-       var auth apiClientAuthorization
-       err := arv.Call("GET", "api_client_authorizations", "", "current", nil, &auth)
+       err = dispatcher.RunDispatcher()
        if err != nil {
-               log.Printf("Error getting my token UUID: %v", err)
-               return
-       }
-
-       ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
-       for {
-               select {
-               case <-ticker.C:
-                       dispatchSlurm(auth, time.Duration(priorityPollInterval)*time.Second, crunchRunCommand, finishCommand)
-               case <-doneProcessing:
-                       ticker.Stop()
-                       return
-               }
-       }
-}
-
-// Container data
-type Container struct {
-       UUID               string           `json:"uuid"`
-       State              string           `json:"state"`
-       Priority           int              `json:"priority"`
-       RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
-       LockedByUUID       string           `json:"locked_by_uuid"`
-}
-
-// ContainerList is a list of the containers from api
-type ContainerList struct {
-       Items []Container `json:"items"`
-}
-
-// Get the list of queued containers from API server and invoke run
-// for each container.
-func dispatchSlurm(auth apiClientAuthorization, pollInterval time.Duration, crunchRunCommand, finishCommand string) {
-       params := arvadosclient.Dict{
-               "filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}}},
-       }
-
-       var containers ContainerList
-       err := arv.List("containers", params, &containers)
-       if err != nil {
-               log.Printf("Error getting list of queued containers: %q", err)
-               return
+               return err
        }
 
-       for _, container := range containers.Items {
-               if container.State == "Locked" {
-                       if container.LockedByUUID != auth.UUID {
-                               // Locked by a different dispatcher
-                               continue
-                       } else if checkMine(container.UUID) {
-                               // I already have a goroutine running
-                               // for this container: it just hasn't
-                               // gotten past Locked state yet.
-                               continue
-                       }
-                       log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
-                               "Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
-                               container.UUID, auth.UUID)
-                       setMine(container.UUID, true)
-                       go func() {
-                               waitContainer(container, pollInterval)
-                               setMine(container.UUID, false)
-                       }()
-               }
-               go run(container, crunchRunCommand, finishCommand, pollInterval)
-       }
+       return nil
 }
 
 // sbatchCmd
-func sbatchFunc(container Container) *exec.Cmd {
+func sbatchFunc(container dispatch.Container) *exec.Cmd {
        memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
        return exec.Command("sbatch", "--share", "--parsable",
-               "--job-name="+container.UUID,
-               "--mem-per-cpu="+strconv.Itoa(int(memPerCPU)),
-               "--cpus-per-task="+strconv.Itoa(int(container.RuntimeConstraints["vcpus"])))
+               fmt.Sprintf("--job-name=%s", container.UUID),
+               fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
+               fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints["vcpus"])),
+               fmt.Sprintf("--priority=%d", container.Priority))
 }
 
-var sbatchCmd = sbatchFunc
-
 // striggerCmd
 func striggerFunc(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
        return exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
                fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
 }
 
+// squeueFunc
+func squeueFunc() *exec.Cmd {
+       return exec.Command("squeue", "--format=%j")
+}
+
+// Wrap these so that they can be overridden by tests
 var striggerCmd = striggerFunc
+var sbatchCmd = sbatchFunc
+var squeueCmd = squeueFunc
 
 // Submit job to slurm using sbatch.
-func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
+func submit(dispatcher *dispatch.Dispatcher,
+       container dispatch.Container, crunchRunCommand string) (jobid string, submitErr error) {
        submitErr = nil
 
        defer func() {
@@ -204,7 +109,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
                        // OK, no cleanup needed
                        return
                }
-               err := arv.Update("containers", container.UUID,
+               err := dispatcher.Arv.Update("containers", container.UUID,
                        arvadosclient.Dict{
                                "container": arvadosclient.Dict{"state": "Queued"}},
                        nil)
@@ -244,7 +149,6 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
                b, _ := ioutil.ReadAll(stdoutReader)
                stdoutReader.Close()
                stdoutChan <- b
-               close(stdoutChan)
        }()
 
        stderrChan := make(chan []byte)
@@ -252,7 +156,6 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
                b, _ := ioutil.ReadAll(stderrReader)
                stderrReader.Close()
                stderrChan <- b
-               close(stderrChan)
        }()
 
        // Send a tiny script on stdin to execute the crunch-run command
@@ -265,21 +168,28 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
        stdoutMsg := <-stdoutChan
        stderrmsg := <-stderrChan
 
+       close(stdoutChan)
+       close(stderrChan)
+
        if err != nil {
                submitErr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
                return
        }
 
        // If everything worked out, got the jobid on stdout
-       jobid = string(stdoutMsg)
+       jobid = strings.TrimSpace(string(stdoutMsg))
 
        return
 }
 
 // finalizeRecordOnFinish uses 'strigger' command to register a script that will run on
 // the slurm controller when the job finishes.
-func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
-       cmd := striggerCmd(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure)
+func finalizeRecordOnFinish(jobid, containerUUID, finishCommand string, arv arvadosclient.ArvadosClient) {
+       insecure := "0"
+       if arv.ApiInsecure {
+               insecure = "1"
+       }
+       cmd := striggerCmd(jobid, containerUUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
        cmd.Stdout = os.Stdout
        cmd.Stderr = os.Stderr
        err := cmd.Run()
@@ -291,104 +201,8 @@ func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiTok
        }
 }
 
-// Run a queued container: [1] Set container state to locked. [2]
-// Execute crunch-run as a slurm batch job. [3] waitContainer().
-func run(container Container, crunchRunCommand, finishCommand string, pollInterval time.Duration) {
-       setMine(container.UUID, true)
-       defer setMine(container.UUID, false)
-
-       // Update container status to Locked. This will fail if
-       // another dispatcher (token) has already locked it. It will
-       // succeed if *this* dispatcher has already locked it.
-       err := arv.Update("containers", container.UUID,
-               arvadosclient.Dict{
-                       "container": arvadosclient.Dict{"state": "Locked"}},
-               nil)
-       if err != nil {
-               log.Printf("Error updating container state to 'Locked' for %v: %q", container.UUID, err)
-               return
-       }
-
-       log.Printf("About to submit queued container %v", container.UUID)
-
-       jobid, err := submit(container, crunchRunCommand)
-       if err != nil {
-               log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
-               return
-       }
-
-       insecure := "0"
-       if arv.ApiInsecure {
-               insecure = "1"
-       }
-       finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
-
-       // Update container status to Running. This will fail if
-       // another dispatcher (token) has already locked it. It will
-       // succeed if *this* dispatcher has already locked it.
-       err = arv.Update("containers", container.UUID,
-               arvadosclient.Dict{
-                       "container": arvadosclient.Dict{"state": "Running"}},
-               nil)
-       if err != nil {
-               log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
-       }
-       log.Printf("Submitted container %v to slurm", container.UUID)
-       waitContainer(container, pollInterval)
-}
-
-// Wait for a container to finish. Cancel the slurm job if the
-// container priority changes to zero before it ends.
-func waitContainer(container Container, pollInterval time.Duration) {
-       log.Printf("Monitoring container %v started", container.UUID)
-       defer log.Printf("Monitoring container %v finished", container.UUID)
-
-       pollTicker := time.NewTicker(pollInterval)
-       defer pollTicker.Stop()
-       for _ = range pollTicker.C {
-               var updated Container
-               err := arv.Get("containers", container.UUID, nil, &updated)
-               if err != nil {
-                       log.Printf("Error getting container %s: %q", container.UUID, err)
-                       continue
-               }
-               if updated.State == "Complete" || updated.State == "Cancelled" {
-                       return
-               }
-               if updated.Priority != 0 {
-                       continue
-               }
-
-               // Priority is zero, but state is Running or Locked
-               log.Printf("Canceling container %s", container.UUID)
-
-               err = exec.Command("scancel", "--name="+container.UUID).Run()
-               if err != nil {
-                       log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
-                       if inQ, err := checkSqueue(container.UUID); err != nil {
-                               log.Printf("Error running squeue: %v", err)
-                               continue
-                       } else if inQ {
-                               log.Printf("Container %s is still in squeue; will retry", container.UUID)
-                               continue
-                       }
-               }
-
-               err = arv.Update("containers", container.UUID,
-                       arvadosclient.Dict{
-                               "container": arvadosclient.Dict{"state": "Cancelled"}},
-                       nil)
-               if err != nil {
-                       log.Printf("Error updating state for container %s: %s", container.UUID, err)
-                       continue
-               }
-
-               return
-       }
-}
-
 func checkSqueue(uuid string) (bool, error) {
-       cmd := exec.Command("squeue", "--format=%j")
+       cmd := squeueCmd()
        sq, err := cmd.StdoutPipe()
        if err != nil {
                return false, err
@@ -408,25 +222,67 @@ func checkSqueue(uuid string) (bool, error) {
        return found, nil
 }
 
-var mineMutex sync.RWMutex
-var mineMap = make(map[string]bool)
-
-// Goroutine-safely add/remove uuid to the set of "my" containers,
-// i.e., ones for which this process has a goroutine running.
-func setMine(uuid string, t bool) {
-       mineMutex.Lock()
-       if t {
-               mineMap[uuid] = true
-       } else {
-               delete(mineMap, uuid)
+// Run or monitor a container.
+//
+// If the container is marked as Locked, check if it is already in the slurm
+// queue.  If not, submit it.
+//
+// If the container is marked as Running, check if it is in the slurm queue.
+// If not, mark it as Cancelled.
+//
+// Monitor status updates.  If the priority changes to zero, cancel the
+// container using scancel.
+func run(dispatcher *dispatch.Dispatcher,
+       container dispatch.Container,
+       status chan dispatch.Container) {
+
+       uuid := container.UUID
+
+       if container.State == dispatch.Locked {
+               if inQ, err := checkSqueue(container.UUID); err != nil {
+                       log.Printf("Error running squeue: %v", err)
+                       dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+               } else if !inQ {
+                       log.Printf("About to submit queued container %v", container.UUID)
+
+                       jobid, err := submit(dispatcher, container, *crunchRunCommand)
+                       if err != nil {
+                               log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
+                       } else {
+                               finalizeRecordOnFinish(jobid, container.UUID, *finishCommand, dispatcher.Arv)
+                       }
+               }
+       } else if container.State == dispatch.Running {
+               if inQ, err := checkSqueue(container.UUID); err != nil {
+                       log.Printf("Error running squeue: %v", err)
+                       dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+               } else if !inQ {
+                       log.Printf("Container %s in Running state but not in slurm queue, marking Cancelled.", container.UUID)
+                       dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+               }
+       }
+
+       log.Printf("Monitoring container %v started", uuid)
+
+       for container = range status {
+               if (container.State == dispatch.Locked || container.State == dispatch.Running) && container.Priority == 0 {
+                       log.Printf("Canceling container %s", container.UUID)
+
+                       err := exec.Command("scancel", "--name="+container.UUID).Run()
+                       if err != nil {
+                               log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
+                               if inQ, err := checkSqueue(container.UUID); err != nil {
+                                       log.Printf("Error running squeue: %v", err)
+                                       continue
+                               } else if inQ {
+                                       log.Printf("Container %s is still in squeue after scancel.", container.UUID)
+                                       continue
+                               }
+                       }
+
+                       err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+               }
        }
-       mineMutex.Unlock()
-}
 
-// Check whether there is already a goroutine running for this
-// container.
-func checkMine(uuid string) bool {
-       mineMutex.RLocker().Lock()
-       defer mineMutex.RLocker().Unlock()
-       return mineMap[uuid]
+       log.Printf("Monitoring container %v finished", uuid)
 }
index 3dfb7d5a3e89ae5b9cb5678e545797e4fba47160..348d5e48b87fdf19d26c75423f7f07656f42b1bd 100644 (file)
@@ -3,6 +3,7 @@ package main
 import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/dispatch"
 
        "bytes"
        "fmt"
@@ -12,9 +13,7 @@ import (
        "net/http/httptest"
        "os"
        "os/exec"
-       "strconv"
        "strings"
-       "syscall"
        "testing"
        "time"
 
@@ -47,11 +46,6 @@ func (s *TestSuite) SetUpTest(c *C) {
        args := []string{"crunch-dispatch-slurm"}
        os.Args = args
 
-       var err error
-       arv, err = arvadosclient.MakeArvadosClient()
-       if err != nil {
-               c.Fatalf("Error making arvados client: %s", err)
-       }
        os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
 }
 
@@ -64,18 +58,18 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
        arvadostest.ResetEnv()
 }
 
-func (s *TestSuite) Test_doMain(c *C) {
-       args := []string{"-poll-interval", "2", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
-       os.Args = append(os.Args, args...)
+func (s *TestSuite) TestIntegration(c *C) {
+       arv, err := arvadosclient.MakeArvadosClient()
+       c.Assert(err, IsNil)
 
        var sbatchCmdLine []string
        var striggerCmdLine []string
 
        // Override sbatchCmd
-       defer func(orig func(Container) *exec.Cmd) {
+       defer func(orig func(dispatch.Container) *exec.Cmd) {
                sbatchCmd = orig
        }(sbatchCmd)
-       sbatchCmd = func(container Container) *exec.Cmd {
+       sbatchCmd = func(container dispatch.Container) *exec.Cmd {
                sbatchCmdLine = sbatchFunc(container).Args
                return exec.Command("sh")
        }
@@ -90,41 +84,65 @@ func (s *TestSuite) Test_doMain(c *C) {
                        apiHost, apiToken, apiInsecure).Args
                go func() {
                        time.Sleep(5 * time.Second)
-                       for _, state := range []string{"Running", "Complete"} {
-                               arv.Update("containers", containerUUID,
-                                       arvadosclient.Dict{
-                                               "container": arvadosclient.Dict{"state": state}},
-                                       nil)
-                       }
+                       arv.Update("containers", containerUUID,
+                               arvadosclient.Dict{
+                                       "container": arvadosclient.Dict{"state": dispatch.Complete}},
+                               nil)
                }()
-               return exec.Command("echo", "strigger")
+               return exec.Command("echo", striggerCmdLine...)
        }
 
-       go func() {
-               time.Sleep(8 * time.Second)
-               sigChan <- syscall.SIGINT
-       }()
+       // Override squeueCmd
+       defer func(orig func() *exec.Cmd) {
+               squeueCmd = orig
+       }(squeueCmd)
+       squeueCmd = func() *exec.Cmd {
+               return exec.Command("echo")
+       }
 
        // There should be no queued containers now
        params := arvadosclient.Dict{
                "filters": [][]string{[]string{"state", "=", "Queued"}},
        }
-       var containers ContainerList
-       err := arv.List("containers", params, &containers)
+       var containers dispatch.ContainerList
+       err = arv.List("containers", params, &containers)
        c.Check(err, IsNil)
        c.Check(len(containers.Items), Equals, 1)
 
-       err = doMain()
-       c.Check(err, IsNil)
+       echo := "echo"
+       crunchRunCommand = &echo
+       finishCmd := "/usr/bin/crunch-finish-slurm.sh"
+       finishCommand = &finishCmd
+
+       doneProcessing := make(chan struct{})
+       dispatcher := dispatch.Dispatcher{
+               Arv:          arv,
+               PollInterval: time.Duration(1) * time.Second,
+               RunContainer: func(dispatcher *dispatch.Dispatcher,
+                       container dispatch.Container,
+                       status chan dispatch.Container) {
+                       go func() {
+                               time.Sleep(1)
+                               dispatcher.UpdateState(container.UUID, dispatch.Running)
+                               dispatcher.UpdateState(container.UUID, dispatch.Complete)
+                       }()
+                       run(dispatcher, container, status)
+                       doneProcessing <- struct{}{}
+               },
+               DoneProcessing: doneProcessing}
+
+       err = dispatcher.RunDispatcher()
+       c.Assert(err, IsNil)
 
        item := containers.Items[0]
        sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
                fmt.Sprintf("--job-name=%s", item.UUID),
-               fmt.Sprintf("--mem-per-cpu=%s", strconv.Itoa(int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576))))),
-               fmt.Sprintf("--cpus-per-task=%s", strconv.Itoa(int(item.RuntimeConstraints["vcpus"])))}
+               fmt.Sprintf("--mem-per-cpu=%d", int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576)))),
+               fmt.Sprintf("--cpus-per-task=%d", int(item.RuntimeConstraints["vcpus"])),
+               fmt.Sprintf("--priority=%d", item.Priority)}
        c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
 
-       c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer\n", "--fini",
+       c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer", "--fini",
                "--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
 
        // There should be no queued containers now
@@ -133,7 +151,7 @@ func (s *TestSuite) Test_doMain(c *C) {
        c.Check(len(containers.Items), Equals, 0)
 
        // Previously "Queued" container should now be in "Complete" state
-       var container Container
+       var container dispatch.Container
        err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
        c.Check(err, IsNil)
        c.Check(container.State, Equals, "Complete")
@@ -144,7 +162,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
        apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
        apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
 
-       testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+       testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
 }
 
 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
@@ -153,7 +171,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
        api := httptest.NewServer(&apiStub)
        defer api.Close()
 
-       arv = arvadosclient.ArvadosClient{
+       arv := arvadosclient.ArvadosClient{
                Scheme:    "http",
                ApiServer: api.URL[7:],
                ApiToken:  "abc123",
@@ -165,14 +183,36 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
        log.SetOutput(buf)
        defer log.SetOutput(os.Stderr)
 
+       crunchRunCommand = &crunchCmd
+       finishCmd := "/usr/bin/crunch-finish-slurm.sh"
+       finishCommand = &finishCmd
+
+       doneProcessing := make(chan struct{})
+       dispatcher := dispatch.Dispatcher{
+               Arv:          arv,
+               PollInterval: time.Duration(1) * time.Second,
+               RunContainer: func(dispatcher *dispatch.Dispatcher,
+                       container dispatch.Container,
+                       status chan dispatch.Container) {
+                       go func() {
+                               time.Sleep(1)
+                               dispatcher.UpdateState(container.UUID, dispatch.Running)
+                               dispatcher.UpdateState(container.UUID, dispatch.Complete)
+                       }()
+                       run(dispatcher, container, status)
+                       doneProcessing <- struct{}{}
+               },
+               DoneProcessing: doneProcessing}
+
        go func() {
                for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
                        time.Sleep(100 * time.Millisecond)
                }
-               sigChan <- syscall.SIGTERM
+               dispatcher.DoneProcessing <- struct{}{}
        }()
 
-       runQueuedContainers(2, 1, crunchCmd, crunchCmd)
+       err := dispatcher.RunDispatcher()
+       c.Assert(err, IsNil)
 
        c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
 }