Merge branch 'master' into 11454-wb-federated-search
author Lucas Di Pentima <ldipentima@veritasgenetics.com>
Tue, 30 Jan 2018 18:29:50 +0000 (15:29 -0300)
committer Lucas Di Pentima <ldipentima@veritasgenetics.com>
Tue, 30 Jan 2018 18:29:50 +0000 (15:29 -0300)
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

30 files changed:
sdk/cwl/tests/arvados-tests.sh
sdk/cwl/tests/hg19/hg19.fa [new file with mode: 0644]
sdk/cwl/tests/hg19/hg19.fa.amb [new file with mode: 0644]
sdk/cwl/tests/hg19/hg19.fa.ann [new file with mode: 0644]
sdk/cwl/tests/hg19/hg19.fa.fai [new file with mode: 0644]
sdk/go/arvadostest/fixtures.go
services/api/app/controllers/arvados/v1/schema_controller.rb
services/api/test/fixtures/users.yml
services/api/test/functional/arvados/v1/schema_controller_test.rb
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
services/crunch-dispatch-slurm/slurm.go [new file with mode: 0644]
services/crunch-dispatch-slurm/squeue.go
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/fuse/arvados_fuse/fusedir.py
tools/arvbox/lib/arvbox/docker/Dockerfile.base
tools/arvbox/lib/arvbox/docker/Dockerfile.demo
tools/arvbox/lib/arvbox/docker/crunch-setup.sh
tools/arvbox/lib/arvbox/docker/go-setup.sh [new file with mode: 0644]
tools/arvbox/lib/arvbox/docker/keep-setup.sh
tools/arvbox/lib/arvbox/docker/service/arv-git-httpd/run-service
tools/arvbox/lib/arvbox/docker/service/composer/run-service
tools/arvbox/lib/arvbox/docker/service/crunch-dispatch-local/run-service
tools/arvbox/lib/arvbox/docker/service/keep-web/run-service
tools/arvbox/lib/arvbox/docker/service/keepproxy/run-service
tools/arvbox/lib/arvbox/docker/service/postgres/run
tools/arvbox/lib/arvbox/docker/service/websockets/run-service
tools/sync-groups/sync-groups.go
tools/sync-groups/sync-groups_test.go

diff --git a/sdk/cwl/tests/arvados-tests.sh b/sdk/cwl/tests/arvados-tests.sh
index 26b31615ea41950a7f3f13535571b2755981eb7a..d3c1e90637d5419320b0115b386780d6321d9975 100755 (executable)
@@ -6,4 +6,7 @@
 if ! arv-get d7514270f356df848477718d58308cc4+94 > /dev/null ; then
     arv-put --portable-data-hash testdir/*
 fi
+if ! arv-get f225e6259bdd63bc7240599648dde9f1+97 > /dev/null ; then
+    arv-put --portable-data-hash hg19/*
+fi
 exec cwltest --test arvados-tests.yml --tool arvados-cwl-runner $@ -- --disable-reuse --compute-checksum
diff --git a/sdk/cwl/tests/hg19/hg19.fa b/sdk/cwl/tests/hg19/hg19.fa
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/sdk/cwl/tests/hg19/hg19.fa.amb b/sdk/cwl/tests/hg19/hg19.fa.amb
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/sdk/cwl/tests/hg19/hg19.fa.ann b/sdk/cwl/tests/hg19/hg19.fa.ann
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/sdk/cwl/tests/hg19/hg19.fa.fai b/sdk/cwl/tests/hg19/hg19.fa.fai
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go
index 5e530658abe96534d9ed3488a975a85a778af16d..d057c09b227e9f375d2b3d04e95d9327044c4f33 100644 (file)
@@ -13,6 +13,7 @@ const (
        DataManagerToken        = "320mkve8qkswstz7ff61glpk3mhgghmg67wmic7elw4z41pke1"
        ManagementToken         = "jg3ajndnq63sywcd50gbs5dskdc9ckkysb0nsqmfz08nwf17nl"
        ActiveUserUUID          = "zzzzz-tpzed-xurymjxw79nv3jz"
+       FederatedActiveUserUUID = "zbbbb-tpzed-xurymjxw79nv3jz"
        SpectatorUserUUID       = "zzzzz-tpzed-l1s2piq4t4mps8r"
        UserAgreementCollection = "zzzzz-4zz18-uukreo9rbgwsujr" // user_agreement_in_anonymously_accessible_project
        FooCollection           = "zzzzz-4zz18-fy296fx3hot09f7"
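
Note: the federated fixture uses a different five-character cluster prefix ("zbbbb") than the local test cluster ("zzzzz"); the prefix is simply the part of an Arvados UUID before the first dash. A minimal Go sketch of extracting it (illustrative only, not part of this change):

package main

import (
	"fmt"
	"strings"
)

// clusterPrefix returns the cluster ID that starts every Arvados UUID,
// e.g. "zzzzz-tpzed-..." -> "zzzzz", "zbbbb-tpzed-..." -> "zbbbb".
func clusterPrefix(uuid string) string {
	if i := strings.IndexByte(uuid, '-'); i > 0 {
		return uuid[:i]
	}
	return ""
}

func main() {
	fmt.Println(clusterPrefix("zzzzz-tpzed-xurymjxw79nv3jz")) // local ActiveUserUUID
	fmt.Println(clusterPrefix("zbbbb-tpzed-xurymjxw79nv3jz")) // FederatedActiveUserUUID
}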
diff --git a/services/api/app/controllers/arvados/v1/schema_controller.rb b/services/api/app/controllers/arvados/v1/schema_controller.rb
index d4be3c8093fee71692d5b1ed7b2d5fd57c96e44d..91685f59988bf1a852a7a25a2aa46f47a3f32300 100644 (file)
@@ -44,6 +44,7 @@ class Arvados::V1::SchemaController < ApplicationController
         rootUrl: root_url,
         servicePath: "arvados/v1/",
         batchPath: "batch",
+        uuidPrefix: Rails.application.config.uuid_prefix,
         defaultTrashLifetime: Rails.application.config.default_trash_lifetime,
         blobSignatureTtl: Rails.application.config.blob_signature_ttl,
         maxRequestSize: Rails.application.config.max_request_size,
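
Note: exposing uuidPrefix in the discovery document lets a federated client learn each remote cluster's five-character prefix before issuing cross-cluster queries. A rough client-side sketch, assuming only the standard discovery endpoint path and Go's standard library (the host name is a placeholder):

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// discoveryDoc holds only the field this sketch cares about.
type discoveryDoc struct {
	UUIDPrefix string `json:"uuidPrefix"`
}

func fetchUUIDPrefix(apiHost string) (string, error) {
	resp, err := http.Get("https://" + apiHost + "/discovery/v1/apis/arvados/v1/rest")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	var doc discoveryDoc
	if err := json.NewDecoder(resp.Body).Decode(&doc); err != nil {
		return "", err
	}
	return doc.UUIDPrefix, nil
}

func main() {
	if prefix, err := fetchUUIDPrefix("zzzzz.example.org"); err == nil {
		fmt.Println("cluster prefix:", prefix) // e.g. "zzzzz", as the test below asserts
	}
}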
diff --git a/services/api/test/fixtures/users.yml b/services/api/test/fixtures/users.yml
index 808795213351870986689299f3a9cbcbf087265c..8fb800c5f94f8a93bdc2f3990282ea76df7bb51b 100644 (file)
@@ -84,6 +84,22 @@ active:
       role: Computational biologist
     getting_started_shown: 2015-03-26 12:34:56.789000000 Z
 
+federated_active:
+  owner_uuid: zzzzz-tpzed-000000000000000
+  uuid: zbbbb-tpzed-xurymjxw79nv3jz
+  email: zbbbb-active-user@arvados.local
+  first_name: Active
+  last_name: User
+  identity_url: https://active-user.openid.local
+  is_active: true
+  is_admin: false
+  username: federatedactive
+  prefs:
+    profile:
+      organization: example.com
+      role: Computational biologist
+    getting_started_shown: 2015-03-26 12:34:56.789000000 Z
+
 project_viewer:
   owner_uuid: zzzzz-tpzed-000000000000000
   uuid: zzzzz-tpzed-projectviewer1a
diff --git a/services/api/test/functional/arvados/v1/schema_controller_test.rb b/services/api/test/functional/arvados/v1/schema_controller_test.rb
index 235b78e332f337c0e618da27709517ee9c29af3f..c15060d1a9847cf33f774399b6decf7ff8f96b45 100644 (file)
@@ -33,6 +33,7 @@ class Arvados::V1::SchemaControllerTest < ActionController::TestCase
     assert_match(/^[0-9a-f]+(-modified)?$/, discovery_doc['source_version'])
     assert_equal discovery_doc['websocketUrl'], Rails.application.config.websocket_address
     assert_equal discovery_doc['workbenchUrl'], Rails.application.config.workbench_address
+    assert_equal('zzzzz', discovery_doc['uuidPrefix'])
   end
 
   test "discovery document overrides source_version with config" do
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 3c89103f38dbc1cd094047d173c10bfe0b28f08e..ae2ca58421d3f3a58f0a4a0f28cbdc2beb56e6af 100644 (file)
@@ -7,14 +7,12 @@ package main
 // Dispatcher service for Crunch that submits containers to the slurm queue.
 
 import (
-       "bytes"
        "context"
        "flag"
        "fmt"
        "log"
        "math"
        "os"
-       "os/exec"
        "regexp"
        "strings"
        "time"
@@ -43,9 +41,12 @@ type Config struct {
 
        // Minimum time between two attempts to run the same container
        MinRetryPeriod arvados.Duration
+
+       slurm Slurm
 }
 
 func main() {
+       theConfig.slurm = &slurmCLI{}
        err := doMain()
        if err != nil {
                log.Fatal(err)
@@ -175,8 +176,7 @@ func niceness(priority int) int {
        return (1000 - priority) * 10
 }
 
-// sbatchCmd
-func sbatchFunc(container arvados.Container) *exec.Cmd {
+func sbatchArgs(container arvados.Container) []string {
        mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
 
        var disk int64
@@ -198,61 +198,22 @@ func sbatchFunc(container arvados.Container) *exec.Cmd {
                sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
        }
 
-       return exec.Command("sbatch", sbatchArgs...)
-}
-
-// scancelCmd
-func scancelFunc(container arvados.Container) *exec.Cmd {
-       return exec.Command("scancel", "--name="+container.UUID)
-}
-
-// scontrolCmd
-func scontrolFunc(container arvados.Container) *exec.Cmd {
-       return exec.Command("scontrol", "update", "JobName="+container.UUID, fmt.Sprintf("Nice=%d", niceness(container.Priority)))
+       return sbatchArgs
 }
 
-// Wrap these so that they can be overridden by tests
-var sbatchCmd = sbatchFunc
-var scancelCmd = scancelFunc
-var scontrolCmd = scontrolFunc
-
-// Submit job to slurm using sbatch.
 func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
-       cmd := sbatchCmd(container)
-
-       // Send a tiny script on stdin to execute the crunch-run
-       // command (slurm requires this to be a #! script)
-
        // append() here avoids modifying crunchRunCommand's
        // underlying array, which is shared with other goroutines.
-       args := append([]string(nil), crunchRunCommand...)
-       args = append(args, container.UUID)
-       cmd.Stdin = strings.NewReader(execScript(args))
-
-       var stdout, stderr bytes.Buffer
-       cmd.Stdout = &stdout
-       cmd.Stderr = &stderr
+       crArgs := append([]string(nil), crunchRunCommand...)
+       crArgs = append(crArgs, container.UUID)
+       crScript := strings.NewReader(execScript(crArgs))
 
-       // Mutex between squeue sync and running sbatch or scancel.
        sqCheck.L.Lock()
        defer sqCheck.L.Unlock()
 
-       log.Printf("exec sbatch %+q", cmd.Args)
-       err := cmd.Run()
-
-       switch err.(type) {
-       case nil:
-               log.Printf("sbatch succeeded: %q", strings.TrimSpace(stdout.String()))
-               return nil
-
-       case *exec.ExitError:
-               dispatcher.Unlock(container.UUID)
-               return fmt.Errorf("sbatch %+q failed: %v (stderr: %q)", cmd.Args, err, stderr.Bytes())
-
-       default:
-               dispatcher.Unlock(container.UUID)
-               return fmt.Errorf("exec failed: %v", err)
-       }
+       sbArgs := sbatchArgs(container)
+       log.Printf("running sbatch %+q", sbArgs)
+       return theConfig.slurm.Batch(crScript, sbArgs)
 }
 
 // Submit a container to the slurm queue (or resume monitoring if it's
@@ -313,10 +274,8 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
                        } else if updated.Priority == 0 {
                                log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
                                scancel(ctr)
-                       } else if niceness(updated.Priority) != sqCheck.GetNiceness(ctr.UUID) && sqCheck.GetNiceness(ctr.UUID) != -1 {
-                               // dynamically adjust priority
-                               log.Printf("Container priority %v != %v", niceness(updated.Priority), sqCheck.GetNiceness(ctr.UUID))
-                               scontrolUpdate(updated)
+                       } else {
+                               renice(updated)
                        }
                }
        }
@@ -324,12 +283,11 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
 
 func scancel(ctr arvados.Container) {
        sqCheck.L.Lock()
-       cmd := scancelCmd(ctr)
-       msg, err := cmd.CombinedOutput()
+       err := theConfig.slurm.Cancel(ctr.UUID)
        sqCheck.L.Unlock()
 
        if err != nil {
-               log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+               log.Printf("scancel: %s", err)
                time.Sleep(time.Second)
        } else if sqCheck.HasUUID(ctr.UUID) {
                log.Printf("container %s is still in squeue after scancel", ctr.UUID)
@@ -337,17 +295,24 @@ func scancel(ctr arvados.Container) {
        }
 }
 
-func scontrolUpdate(ctr arvados.Container) {
+func renice(ctr arvados.Container) {
+       nice := niceness(ctr.Priority)
+       oldnice := sqCheck.GetNiceness(ctr.UUID)
+       if nice == oldnice || oldnice == -1 {
+               return
+       }
+       log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
        sqCheck.L.Lock()
-       cmd := scontrolCmd(ctr)
-       msg, err := cmd.CombinedOutput()
+       err := theConfig.slurm.Renice(ctr.UUID, nice)
        sqCheck.L.Unlock()
 
        if err != nil {
-               log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+               log.Printf("renice: %s", err)
                time.Sleep(time.Second)
-       } else if sqCheck.HasUUID(ctr.UUID) {
-               log.Printf("Container %s priority is now %v, niceness is now %v",
+               return
+       }
+       if sqCheck.HasUUID(ctr.UUID) {
+               log.Printf("container %s has arvados priority %d, slurm nice %d",
                        ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
        }
 }
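
Note: the slurm nice values seen throughout this change come from the unchanged niceness() helper, (1000 - priority) * 10, so Arvados priority 1 maps to nice 9990 and priority 600 to nice 4000 (the values the tests below assert). A standalone sketch of that arithmetic:

package main

import "fmt"

// niceness mirrors the dispatcher's mapping from Arvados container
// priority to a slurm nice value: higher priority => lower nice.
func niceness(priority int) int {
	return (1000 - priority) * 10
}

func main() {
	for _, p := range []int{1, 100, 600} {
		fmt.Printf("priority %4d -> nice %d\n", p, niceness(p))
	}
	// priority    1 -> nice 9990
	// priority  100 -> nice 9000
	// priority  600 -> nice 4000
}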
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index a823755379574155420e465b2ec2b72f234a3024..830976d66bdd9e3dbce855936a241b7b83ec8e7b 100644 (file)
@@ -7,6 +7,7 @@ package main
 import (
        "bytes"
        "context"
+       "errors"
        "fmt"
        "io"
        "io/ioutil"
@@ -64,51 +65,55 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
        arvadostest.ResetEnv()
 }
 
-func (s *TestSuite) integrationTest(c *C,
-       newSqueueCmd func() *exec.Cmd,
-       newScancelCmd func(arvados.Container) *exec.Cmd,
-       newSbatchCmd func(arvados.Container) *exec.Cmd,
-       newScontrolCmd func(arvados.Container) *exec.Cmd,
-       sbatchCmdComps []string,
-       runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
-       arvadostest.ResetEnv()
+type slurmFake struct {
+       didBatch  [][]string
+       didCancel []string
+       didRenice [][]string
+       queue     string
+       // If non-nil, run this func during the 2nd+ call to Cancel()
+       onCancel func()
+       // Error returned by Batch()
+       errBatch error
+}
 
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, IsNil)
+func (sf *slurmFake) Batch(script io.Reader, args []string) error {
+       sf.didBatch = append(sf.didBatch, args)
+       return sf.errBatch
+}
 
-       var sbatchCmdLine []string
+func (sf *slurmFake) QueueCommand(args []string) *exec.Cmd {
+       return exec.Command("echo", sf.queue)
+}
 
-       // Override sbatchCmd
-       defer func(orig func(arvados.Container) *exec.Cmd) {
-               sbatchCmd = orig
-       }(sbatchCmd)
+func (sf *slurmFake) Renice(name string, nice int) error {
+       sf.didRenice = append(sf.didRenice, []string{name, fmt.Sprintf("%d", nice)})
+       return nil
+}
 
-       if newSbatchCmd != nil {
-               sbatchCmd = newSbatchCmd
-       } else {
-               sbatchCmd = func(container arvados.Container) *exec.Cmd {
-                       sbatchCmdLine = sbatchFunc(container).Args
-                       return exec.Command("sh")
-               }
+func (sf *slurmFake) Cancel(name string) error {
+       sf.didCancel = append(sf.didCancel, name)
+       if len(sf.didCancel) == 1 {
+               // simulate error on first attempt
+               return errors.New("something terrible happened")
+       }
+       if sf.onCancel != nil {
+               sf.onCancel()
        }
+       return nil
+}
 
-       // Override squeueCmd
-       defer func(orig func() *exec.Cmd) {
-               squeueCmd = orig
-       }(squeueCmd)
-       squeueCmd = newSqueueCmd
+func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
+       expectBatch [][]string,
+       runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
+       arvadostest.ResetEnv()
 
-       // Override scancel
-       defer func(orig func(arvados.Container) *exec.Cmd) {
-               scancelCmd = orig
-       }(scancelCmd)
-       scancelCmd = newScancelCmd
+       arv, err := arvadosclient.MakeArvadosClient()
+       c.Assert(err, IsNil)
 
-       // Override scontrol
-       defer func(orig func(arvados.Container) *exec.Cmd) {
-               scontrolCmd = orig
-       }(scontrolCmd)
-       scontrolCmd = newScontrolCmd
+       defer func(orig Slurm) {
+               theConfig.slurm = orig
+       }(theConfig.slurm)
+       theConfig.slurm = slurm
 
        // There should be one queued container
        params := arvadosclient.Dict{
@@ -130,6 +135,7 @@ func (s *TestSuite) integrationTest(c *C,
                RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
                        go func() {
                                runContainer(disp, ctr)
+                               slurm.queue = ""
                                doneRun <- struct{}{}
                        }()
                        run(disp, ctr, status)
@@ -145,7 +151,7 @@ func (s *TestSuite) integrationTest(c *C,
 
        sqCheck.Stop()
 
-       c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
+       c.Check(slurm.didBatch, DeepEquals, expectBatch)
 
        // There should be no queued containers now
        err = arv.List("containers", params, &containers)
@@ -160,77 +166,47 @@ func (s *TestSuite) integrationTest(c *C,
 }
 
 func (s *TestSuite) TestIntegrationNormal(c *C) {
-       done := false
        container := s.integrationTest(c,
-               func() *exec.Cmd {
-                       if done {
-                               return exec.Command("true")
-                       } else {
-                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
-                       }
-               },
-               nil,
+               &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"},
                nil,
-               nil,
-               []string(nil),
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
                        time.Sleep(3 * time.Second)
                        dispatcher.UpdateState(container.UUID, dispatch.Complete)
-                       done = true
                })
        c.Check(container.State, Equals, arvados.ContainerStateComplete)
 }
 
 func (s *TestSuite) TestIntegrationCancel(c *C) {
-       var cmd *exec.Cmd
-       var scancelCmdLine []string
-       attempt := 0
-
+       slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+       readyToCancel := make(chan bool)
+       slurm.onCancel = func() { <-readyToCancel }
        container := s.integrationTest(c,
-               func() *exec.Cmd {
-                       if cmd != nil && cmd.ProcessState != nil {
-                               return exec.Command("true")
-                       } else {
-                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
-                       }
-               },
-               func(container arvados.Container) *exec.Cmd {
-                       if attempt++; attempt == 1 {
-                               return exec.Command("false")
-                       } else {
-                               scancelCmdLine = scancelFunc(container).Args
-                               cmd = exec.Command("echo")
-                               return cmd
-                       }
-               },
-               nil,
+               slurm,
                nil,
-               []string(nil),
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
-                       time.Sleep(1 * time.Second)
+                       time.Sleep(time.Second)
                        dispatcher.Arv.Update("containers", container.UUID,
                                arvadosclient.Dict{
                                        "container": arvadosclient.Dict{"priority": 0}},
                                nil)
+                       readyToCancel <- true
+                       close(readyToCancel)
                })
        c.Check(container.State, Equals, arvados.ContainerStateCancelled)
-       c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
+       c.Check(len(slurm.didCancel) > 1, Equals, true)
+       c.Check(slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
 }
 
 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
-       container := s.integrationTest(c,
-               func() *exec.Cmd { return exec.Command("echo") },
-               nil,
-               nil,
-               nil,
-               []string{"sbatch",
+       container := s.integrationTest(c, &slurmFake{},
+               [][]string{{
                        fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
                        fmt.Sprintf("--mem=%d", 11445),
                        fmt.Sprintf("--cpus-per-task=%d", 4),
                        fmt.Sprintf("--tmp=%d", 45777),
-                       fmt.Sprintf("--nice=%d", 9990)},
+                       fmt.Sprintf("--nice=%d", 9990)}},
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
                        time.Sleep(3 * time.Second)
@@ -241,13 +217,8 @@ func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
 
 func (s *TestSuite) TestSbatchFail(c *C) {
        container := s.integrationTest(c,
-               func() *exec.Cmd { return exec.Command("echo") },
-               nil,
-               func(container arvados.Container) *exec.Cmd {
-                       return exec.Command("false")
-               },
-               nil,
-               []string(nil),
+               &slurmFake{errBatch: errors.New("something terrible happened")},
+               [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=9990"}},
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
                        dispatcher.UpdateState(container.UUID, dispatch.Complete)
@@ -387,71 +358,47 @@ func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) {
 }
 
 func testSbatchFuncWithArgs(c *C, args []string) {
+       defer func() { theConfig.SbatchArguments = nil }()
        theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)
 
        container := arvados.Container{
                UUID:               "123",
                RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
                Priority:           1}
-       sbatchCmd := sbatchFunc(container)
 
        var expected []string
-       expected = append(expected, "sbatch")
        expected = append(expected, theConfig.SbatchArguments...)
        expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990")
-
-       c.Check(sbatchCmd.Args, DeepEquals, expected)
+       c.Check(sbatchArgs(container), DeepEquals, expected)
 }
 
 func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
-       theConfig.SbatchArguments = nil
        container := arvados.Container{
                UUID:                 "123",
                RuntimeConstraints:   arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1},
                SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}},
                Priority:             1}
-       sbatchCmd := sbatchFunc(container)
 
-       var expected []string
-       expected = append(expected, "sbatch")
-       expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990", "--partition=blurb,b2")
-
-       c.Check(sbatchCmd.Args, DeepEquals, expected)
+       c.Check(sbatchArgs(container), DeepEquals, []string{
+               "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990",
+               "--partition=blurb,b2",
+       })
 }
 
 func (s *TestSuite) TestIntegrationChangePriority(c *C) {
-       var scontrolCmdLine []string
-       step := 0
-
-       container := s.integrationTest(c,
-               func() *exec.Cmd {
-                       if step == 0 {
-                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
-                       } else if step == 1 {
-                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer 4000 100")
-                       } else {
-                               return exec.Command("echo")
-                       }
-               },
-               func(arvados.Container) *exec.Cmd { return exec.Command("true") },
-               nil,
-               func(container arvados.Container) *exec.Cmd {
-                       scontrolCmdLine = scontrolFunc(container).Args
-                       step = 1
-                       return exec.Command("true")
-               },
-               []string(nil),
+       slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+       container := s.integrationTest(c, slurm, nil,
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
-                       time.Sleep(1 * time.Second)
+                       time.Sleep(time.Second)
                        dispatcher.Arv.Update("containers", container.UUID,
                                arvadosclient.Dict{
                                        "container": arvadosclient.Dict{"priority": 600}},
                                nil)
-                       time.Sleep(1 * time.Second)
-                       step = 2
+                       time.Sleep(time.Second)
                        dispatcher.UpdateState(container.UUID, dispatch.Complete)
                })
        c.Check(container.State, Equals, arvados.ContainerStateComplete)
-       c.Check(scontrolCmdLine, DeepEquals, []string{"scontrol", "update", "JobName=zzzzz-dz642-queuedcontainer", "Nice=4000"})
+       c.Assert(len(slurm.didRenice), Not(Equals), 0)
+       c.Check(slurm.didRenice[len(slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
 }
diff --git a/services/crunch-dispatch-slurm/slurm.go b/services/crunch-dispatch-slurm/slurm.go
new file mode 100644 (file)
index 0000000..bd19377
--- /dev/null
@@ -0,0 +1,73 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "fmt"
+       "io"
+       "log"
+       "os/exec"
+       "strings"
+)
+
+type Slurm interface {
+       Cancel(name string) error
+       Renice(name string, nice int) error
+       QueueCommand(args []string) *exec.Cmd
+       Batch(script io.Reader, args []string) error
+}
+
+type slurmCLI struct{}
+
+func (scli *slurmCLI) Batch(script io.Reader, args []string) error {
+       return scli.run(script, "sbatch", args)
+}
+
+func (scli *slurmCLI) Cancel(name string) error {
+       for _, args := range [][]string{
+               // If the slurm job hasn't started yet, remove it from
+               // the queue.
+               {"--state=pending"},
+               // If the slurm job has started, send SIGTERM. If we
+               // cancel a running job without a --signal argument,
+               // slurm will send SIGTERM and then (after some
+               // site-configured interval) SIGKILL. This would kill
+               // crunch-run without stopping the container, which we
+               // don't want.
+               {"--batch", "--signal=TERM", "--state=running"},
+               {"--batch", "--signal=TERM", "--state=suspended"},
+       } {
+               err := scli.run(nil, "scancel", append([]string{"--name=" + name}, args...))
+               if err != nil {
+                       // scancel exits 0 if no job matches the given
+                       // name and state. Any error from scancel here
+                       // really indicates something is wrong.
+                       return err
+               }
+       }
+       return nil
+}
+
+func (scli *slurmCLI) QueueCommand(args []string) *exec.Cmd {
+       return exec.Command("squeue", args...)
+}
+
+func (scli *slurmCLI) Renice(name string, nice int) error {
+       return scli.run(nil, "scontrol", []string{"update", "JobName=" + name, fmt.Sprintf("Nice=%d", nice)})
+}
+
+func (scli *slurmCLI) run(stdin io.Reader, prog string, args []string) error {
+       cmd := exec.Command(prog, args...)
+       cmd.Stdin = stdin
+       out, err := cmd.CombinedOutput()
+       outTrim := strings.TrimSpace(string(out))
+       if err != nil || len(out) > 0 {
+               log.Printf("%q %q: %q", cmd.Path, cmd.Args, outTrim)
+       }
+       if err != nil {
+               err = fmt.Errorf("%s: %s (%q)", cmd.Path, err, outTrim)
+       }
+       return err
+}
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index 819c2d2510f992ab75c18249d8b46d6c13828d6d..5ecfe8ff2fc049201eb27c8e58c7a02eb84cbbb4 100644 (file)
@@ -8,7 +8,6 @@ import (
        "bytes"
        "fmt"
        "log"
-       "os/exec"
        "strings"
        "sync"
        "time"
@@ -29,12 +28,6 @@ type SqueueChecker struct {
        sync.Cond
 }
 
-func squeueFunc() *exec.Cmd {
-       return exec.Command("squeue", "--all", "--format=%j %y %Q")
-}
-
-var squeueCmd = squeueFunc
-
 // HasUUID checks if a given container UUID is in the slurm queue.
 // This does not run squeue directly, but instead blocks until woken
 // up by next successful update of squeue.
@@ -84,7 +77,7 @@ func (sqc *SqueueChecker) check() {
        sqc.L.Lock()
        defer sqc.L.Unlock()
 
-       cmd := squeueCmd()
+       cmd := theConfig.slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
        stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
        cmd.Stdout, cmd.Stderr = stdout, stderr
        if err := cmd.Run(); err != nil {
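
Note: QueueCommand is invoked here with --all --format=%j %y %Q, i.e. squeue prints one "job-name nice priority" triple per line, which is also the shape of the fake queue strings used in the tests above. A rough sketch of parsing a line in that layout (variable names are illustrative):

package main

import (
	"bufio"
	"fmt"
	"strings"
)

func main() {
	// Sample squeue output in the "%j %y %Q" format the dispatcher requests.
	out := "zzzzz-dz642-queuedcontainer 9990 100\n"

	scanner := bufio.NewScanner(strings.NewReader(out))
	for scanner.Scan() {
		var uuid string
		var nice, prio int
		if _, err := fmt.Sscanf(scanner.Text(), "%s %d %d", &uuid, &nice, &prio); err != nil {
			continue // skip malformed lines
		}
		fmt.Printf("job %s: nice=%d priority=%d\n", uuid, nice, prio)
	}
}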
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index b480c068c597ecc178b08be7278af2edbd72bce9..705bea486b21797ea4c6523ba28e03dda4ad269c 100644 (file)
@@ -6,7 +6,6 @@ package main
 
 import (
        "bytes"
-       "context"
        "encoding/json"
        "errors"
        "flag"
@@ -33,6 +32,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
+       "golang.org/x/net/context"
 
        dockertypes "github.com/docker/docker/api/types"
        dockercontainer "github.com/docker/docker/api/types/container"
@@ -75,60 +75,13 @@ type ThinDockerClient interface {
        ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
                networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
        ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
-       ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
+       ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
        ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
        ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
        ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
        ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
 }
 
-// ThinDockerClientProxy is a proxy implementation of ThinDockerClient
-// that executes the docker requests on dockerclient.Client
-type ThinDockerClientProxy struct {
-       Docker *dockerclient.Client
-}
-
-// ContainerAttach invokes dockerclient.Client.ContainerAttach
-func (proxy ThinDockerClientProxy) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
-       return proxy.Docker.ContainerAttach(ctx, container, options)
-}
-
-// ContainerCreate invokes dockerclient.Client.ContainerCreate
-func (proxy ThinDockerClientProxy) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
-       networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
-       return proxy.Docker.ContainerCreate(ctx, config, hostConfig, networkingConfig, containerName)
-}
-
-// ContainerStart invokes dockerclient.Client.ContainerStart
-func (proxy ThinDockerClientProxy) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
-       return proxy.Docker.ContainerStart(ctx, container, options)
-}
-
-// ContainerStop invokes dockerclient.Client.ContainerStop
-func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
-       return proxy.Docker.ContainerStop(ctx, container, timeout)
-}
-
-// ContainerWait invokes dockerclient.Client.ContainerWait
-func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
-       return proxy.Docker.ContainerWait(ctx, container, condition)
-}
-
-// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
-func (proxy ThinDockerClientProxy) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
-       return proxy.Docker.ImageInspectWithRaw(ctx, image)
-}
-
-// ImageLoad invokes dockerclient.Client.ImageLoad
-func (proxy ThinDockerClientProxy) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
-       return proxy.Docker.ImageLoad(ctx, input, quiet)
-}
-
-// ImageRemove invokes dockerclient.Client.ImageRemove
-func (proxy ThinDockerClientProxy) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
-       return proxy.Docker.ImageRemove(ctx, image, options)
-}
-
 // ContainerRunner is the main stateful struct used for a single execution of a
 // container.
 type ContainerRunner struct {
@@ -161,10 +114,12 @@ type ContainerRunner struct {
        ArvMountExit   chan error
        finalState     string
 
-       statLogger   io.WriteCloser
-       statReporter *crunchstat.Reporter
-       statInterval time.Duration
-       cgroupRoot   string
+       statLogger       io.WriteCloser
+       statReporter     *crunchstat.Reporter
+       hoststatLogger   io.WriteCloser
+       hoststatReporter *crunchstat.Reporter
+       statInterval     time.Duration
+       cgroupRoot       string
        // What we expect the container's cgroup parent to be.
        expectCgroupParent string
        // What we tell docker to use as the container's cgroup
@@ -180,7 +135,6 @@ type ContainerRunner struct {
        setCgroupParent string
 
        cStateLock sync.Mutex
-       cStarted   bool // StartContainer() succeeded
        cCancelled bool // StopContainer() invoked
 
        enableNetwork string // one of "default" or "always"
@@ -197,11 +151,10 @@ func (runner *ContainerRunner) setupSignals() {
        signal.Notify(runner.SigChan, syscall.SIGQUIT)
 
        go func(sig chan os.Signal) {
-               s := <-sig
-               if s != nil {
-                       runner.CrunchLog.Printf("Caught signal %v", s)
+               for s := range sig {
+                       runner.CrunchLog.Printf("caught signal: %v", s)
+                       runner.stop()
                }
-               runner.stop()
        }(runner.SigChan)
 }
 
@@ -209,25 +162,20 @@ func (runner *ContainerRunner) setupSignals() {
 func (runner *ContainerRunner) stop() {
        runner.cStateLock.Lock()
        defer runner.cStateLock.Unlock()
-       if runner.cCancelled {
+       if runner.ContainerID == "" {
                return
        }
        runner.cCancelled = true
-       if runner.cStarted {
-               timeout := time.Duration(10)
-               err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout))
-               if err != nil {
-                       runner.CrunchLog.Printf("StopContainer failed: %s", err)
-               }
-               // Suppress multiple calls to stop()
-               runner.cStarted = false
+       runner.CrunchLog.Printf("removing container")
+       err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
+       if err != nil {
+               runner.CrunchLog.Printf("error removing container: %s", err)
        }
 }
 
 func (runner *ContainerRunner) stopSignals() {
        if runner.SigChan != nil {
                signal.Stop(runner.SigChan)
-               close(runner.SigChan)
        }
 }
 
@@ -597,53 +545,74 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
        // Handle docker log protocol
        // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
+       defer close(runner.loggingDone)
 
        header := make([]byte, 8)
-       for {
-               _, readerr := io.ReadAtLeast(containerReader, header, 8)
-
-               if readerr == nil {
-                       readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
-                       if header[0] == 1 {
-                               // stdout
-                               _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
-                       } else {
-                               // stderr
-                               _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
+       var err error
+       for err == nil {
+               _, err = io.ReadAtLeast(containerReader, header, 8)
+               if err != nil {
+                       if err == io.EOF {
+                               err = nil
                        }
+                       break
                }
+               readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
+               if header[0] == 1 {
+                       // stdout
+                       _, err = io.CopyN(runner.Stdout, containerReader, readsize)
+               } else {
+                       // stderr
+                       _, err = io.CopyN(runner.Stderr, containerReader, readsize)
+               }
+       }
 
-               if readerr != nil {
-                       if readerr != io.EOF {
-                               runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
-                       }
-
-                       closeerr := runner.Stdout.Close()
-                       if closeerr != nil {
-                               runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
-                       }
+       if err != nil {
+               runner.CrunchLog.Printf("error reading docker logs: %v", err)
+       }
 
-                       closeerr = runner.Stderr.Close()
-                       if closeerr != nil {
-                               runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
-                       }
+       err = runner.Stdout.Close()
+       if err != nil {
+               runner.CrunchLog.Printf("error closing stdout logs: %v", err)
+       }
 
-                       if runner.statReporter != nil {
-                               runner.statReporter.Stop()
-                               closeerr = runner.statLogger.Close()
-                               if closeerr != nil {
-                                       runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr)
-                               }
-                       }
+       err = runner.Stderr.Close()
+       if err != nil {
+               runner.CrunchLog.Printf("error closing stderr logs: %v", err)
+       }
 
-                       runner.loggingDone <- true
-                       close(runner.loggingDone)
-                       return
+       if runner.statReporter != nil {
+               runner.statReporter.Stop()
+               err = runner.statLogger.Close()
+               if err != nil {
+                       runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
                }
        }
 }
 
-func (runner *ContainerRunner) StartCrunchstat() {
+func (runner *ContainerRunner) stopHoststat() error {
+       if runner.hoststatReporter == nil {
+               return nil
+       }
+       runner.hoststatReporter.Stop()
+       err := runner.hoststatLogger.Close()
+       if err != nil {
+               return fmt.Errorf("error closing hoststat logs: %v", err)
+       }
+       return nil
+}
+
+func (runner *ContainerRunner) startHoststat() {
+       runner.hoststatLogger = NewThrottledLogger(runner.NewLogWriter("hoststat"))
+       runner.hoststatReporter = &crunchstat.Reporter{
+               Logger:     log.New(runner.hoststatLogger, "", 0),
+               CgroupRoot: runner.cgroupRoot,
+               PollPeriod: runner.statInterval,
+       }
+       runner.hoststatReporter.Start()
+}
+
+func (runner *ContainerRunner) startCrunchstat() {
        runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
        runner.statReporter = &crunchstat.Reporter{
                CID:          runner.ContainerID,
@@ -973,46 +942,38 @@ func (runner *ContainerRunner) StartContainer() error {
                }
                return fmt.Errorf("could not start container: %v%s", err, advice)
        }
-       runner.cStarted = true
        return nil
 }
 
 // WaitFinish waits for the container to terminate, capture the exit code, and
 // close the stdout/stderr logging.
-func (runner *ContainerRunner) WaitFinish() (err error) {
+func (runner *ContainerRunner) WaitFinish() error {
        runner.CrunchLog.Print("Waiting for container to finish")
 
-       waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running")
+       waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
+       arvMountExit := runner.ArvMountExit
+       for {
+               select {
+               case waitBody := <-waitOk:
+                       runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
+                       code := int(waitBody.StatusCode)
+                       runner.ExitCode = &code
+
+                       // wait for stdout/stderr to complete
+                       <-runner.loggingDone
+                       return nil
 
-       go func() {
-               <-runner.ArvMountExit
-               if runner.cStarted {
+               case err := <-waitErr:
+                       return fmt.Errorf("container wait: %v", err)
+
+               case <-arvMountExit:
                        runner.CrunchLog.Printf("arv-mount exited while container is still running.  Stopping container.")
                        runner.stop()
+                       // arvMountExit will always be ready now that
+                       // it's closed, but that doesn't interest us.
+                       arvMountExit = nil
                }
-       }()
-
-       var waitBody dockercontainer.ContainerWaitOKBody
-       select {
-       case waitBody = <-waitOk:
-       case err = <-waitErr:
-       }
-
-       // Container isn't running any more
-       runner.cStarted = false
-
-       if err != nil {
-               return fmt.Errorf("container wait: %v", err)
        }
-
-       runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
-       code := int(waitBody.StatusCode)
-       runner.ExitCode = &code
-
-       // wait for stdout/stderr to complete
-       <-runner.loggingDone
-
-       return nil
 }
 
 var ErrNotInOutputDir = fmt.Errorf("Must point to path within the output directory")
@@ -1191,10 +1152,6 @@ func (runner *ContainerRunner) UploadOutputFile(
 
 // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
 func (runner *ContainerRunner) CaptureOutput() error {
-       if runner.finalState != "Complete" {
-               return nil
-       }
-
        if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
                // Output may have been set directly by the container, so
                // refresh the container record to check.
@@ -1579,6 +1536,7 @@ func (runner *ContainerRunner) Run() (err error) {
                }
 
                checkErr(runner.CaptureOutput())
+               checkErr(runner.stopHoststat())
                checkErr(runner.CommitLogs())
                checkErr(runner.UpdateContainerFinal())
        }()
@@ -1587,9 +1545,8 @@ func (runner *ContainerRunner) Run() (err error) {
        if err != nil {
                return
        }
-
-       // setup signal handling
        runner.setupSignals()
+       runner.startHoststat()
 
        // check for and/or load image
        err = runner.LoadImage()
@@ -1638,7 +1595,7 @@ func (runner *ContainerRunner) Run() (err error) {
        }
        runner.finalState = "Cancelled"
 
-       runner.StartCrunchstat()
+       runner.startCrunchstat()
 
        err = runner.StartContainer()
        if err != nil {
@@ -1647,7 +1604,7 @@ func (runner *ContainerRunner) Run() (err error) {
        }
 
        err = runner.WaitFinish()
-       if err == nil {
+       if err == nil && !runner.IsCancelled() {
                runner.finalState = "Complete"
        }
        return
@@ -1739,10 +1696,8 @@ func main() {
        // API version 1.21 corresponds to Docker 1.9, which is currently the
        // minimum version we want to support.
        docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
-       dockerClientProxy := ThinDockerClientProxy{Docker: docker}
-
-       cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
 
+       cr := NewContainerRunner(api, kc, docker, containerId)
        if dockererr != nil {
                cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
                cr.checkBrokenNode(dockererr)
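
Note: ProcessDockerAttach (above) still consumes Docker's multiplexed attach stream, where each frame begins with an 8-byte header: byte 0 selects the stream (1 = stdout, 2 = stderr) and bytes 4-7 hold the payload length, big-endian. A small sketch of decoding one such header under that framing:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// 8-byte multiplexing header: [stream, 0, 0, 0, size (big-endian uint32)]
	header := []byte{1, 0, 0, 0, 0, 0, 0, 12} // stream 1 = stdout, 12 payload bytes follow

	name := "stderr"
	if header[0] == 1 {
		name = "stdout"
	}
	size := binary.BigEndian.Uint32(header[4:8])
	fmt.Printf("%s frame, %d bytes follow\n", name, size)
}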
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 4979cf8a0c801ee5bc56e48933e97f736dc3d00d..22989bb2ece510e6f239151b267968fde25f1203 100644 (file)
@@ -7,7 +7,6 @@ package main
 import (
        "bufio"
        "bytes"
-       "context"
        "crypto/md5"
        "encoding/json"
        "errors"
@@ -30,6 +29,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
+       "golang.org/x/net/context"
 
        dockertypes "github.com/docker/docker/api/types"
        dockercontainer "github.com/docker/docker/api/types/container"
@@ -42,11 +42,17 @@ func TestCrunchExec(t *testing.T) {
        TestingT(t)
 }
 
-type TestSuite struct{}
-
 // Gocheck boilerplate
 var _ = Suite(&TestSuite{})
 
+type TestSuite struct {
+       docker *TestDockerClient
+}
+
+func (s *TestSuite) SetUpTest(c *C) {
+       s.docker = NewTestDockerClient()
+}
+
 type ArvTestClient struct {
        Total   int64
        Calls   int
@@ -88,18 +94,18 @@ type TestDockerClient struct {
        logReader   io.ReadCloser
        logWriter   io.WriteCloser
        fn          func(t *TestDockerClient)
-       finish      int
+       exitCode    int
        stop        chan bool
        cwd         string
        env         []string
        api         *ArvTestClient
        realTemp    string
+       calledWait  bool
 }
 
-func NewTestDockerClient(exitCode int) *TestDockerClient {
+func NewTestDockerClient() *TestDockerClient {
        t := &TestDockerClient{}
        t.logReader, t.logWriter = io.Pipe()
-       t.finish = exitCode
        t.stop = make(chan bool, 1)
        t.cwd = "/"
        return t
@@ -131,16 +137,16 @@ func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockerco
 }
 
 func (t *TestDockerClient) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
-       if t.finish == 3 {
+       if t.exitCode == 3 {
                return errors.New(`Error response from daemon: oci runtime error: container_linux.go:247: starting container process caused "process_linux.go:359: container init caused \"rootfs_linux.go:54: mounting \\\"/tmp/keep453790790/by_id/99999999999999999999999999999999+99999/myGenome\\\" to rootfs \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged\\\" at \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged/keep/99999999999999999999999999999999+99999/myGenome\\\" caused \\\"no such file or directory\\\"\""`)
        }
-       if t.finish == 4 {
+       if t.exitCode == 4 {
                return errors.New(`panic: standard_init_linux.go:175: exec user process caused "no such file or directory"`)
        }
-       if t.finish == 5 {
+       if t.exitCode == 5 {
                return errors.New(`Error response from daemon: Cannot start container 41f26cbc43bcc1280f4323efb1830a394ba8660c9d1c2b564ba42bf7f7694845: [8] System error: no such file or directory`)
        }
-       if t.finish == 6 {
+       if t.exitCode == 6 {
                return errors.New(`Error response from daemon: Cannot start container 58099cd76c834f3dc2a4fb76c8028f049ae6d4fdf0ec373e1f2cfea030670c2d: [8] System error: exec: "foobar": executable file not found in $PATH`)
        }
 
@@ -152,25 +158,24 @@ func (t *TestDockerClient) ContainerStart(ctx context.Context, container string,
        }
 }
 
-func (t *TestDockerClient) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
+func (t *TestDockerClient) ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error {
        t.stop <- true
        return nil
 }
 
 func (t *TestDockerClient) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
-       body := make(chan dockercontainer.ContainerWaitOKBody)
+       t.calledWait = true
+       body := make(chan dockercontainer.ContainerWaitOKBody, 1)
        err := make(chan error)
        go func() {
                t.fn(t)
-               body <- dockercontainer.ContainerWaitOKBody{StatusCode: int64(t.finish)}
-               close(body)
-               close(err)
+               body <- dockercontainer.ContainerWaitOKBody{StatusCode: int64(t.exitCode)}
        }()
        return body, err
 }
 
 func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
-       if t.finish == 2 {
+       if t.exitCode == 2 {
                return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
        }
 
@@ -182,7 +187,7 @@ func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string
 }
 
 func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
-       if t.finish == 2 {
+       if t.exitCode == 2 {
                return dockertypes.ImageLoadResponse{}, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
        }
        _, err := io.Copy(ioutil.Discard, input)
@@ -396,8 +401,7 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
 
 func (s *TestSuite) TestLoadImage(c *C) {
        kc := &KeepTestClient{}
-       docker := NewTestDockerClient(0)
-       cr := NewContainerRunner(&ArvTestClient{}, kc, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr := NewContainerRunner(&ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 
        _, err := cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
 
@@ -512,8 +516,7 @@ func (s *TestSuite) TestLoadImageArvError(c *C) {
 
 func (s *TestSuite) TestLoadImageKeepError(c *C) {
        // (2) Keep error
-       docker := NewTestDockerClient(0)
-       cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.Container.ContainerImage = hwPDH
 
        err := cr.LoadImage()
@@ -531,8 +534,7 @@ func (s *TestSuite) TestLoadImageCollectionError(c *C) {
 
 func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
        // (4) Collection doesn't contain image
-       docker := NewTestDockerClient(0)
-       cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.Container.ContainerImage = hwPDH
 
        err := cr.LoadImage()
@@ -572,12 +574,11 @@ func dockerLog(fd byte, msg string) []byte {
 }
 
 func (s *TestSuite) TestRunContainer(c *C) {
-       docker := NewTestDockerClient(0)
-       docker.fn = func(t *TestDockerClient) {
+       s.docker.fn = func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, "Hello world\n"))
                t.logWriter.Close()
        }
-       cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 
        var logs TestLogs
        cr.NewLogWriter = logs.NewTestLoggingWriter
@@ -667,18 +668,18 @@ func (s *TestSuite) TestUpdateContainerCancelled(c *C) {
 
 // Used by the TestFullRun*() test below to DRY up boilerplate setup to do full
 // dress rehearsal of the Run() function, starting from a JSON container record.
-func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) {
+func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) {
        rec := arvados.Container{}
        err := json.Unmarshal([]byte(record), &rec)
        c.Check(err, IsNil)
 
-       docker := NewTestDockerClient(exitCode)
-       docker.fn = fn
-       docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+       s.docker.exitCode = exitCode
+       s.docker.fn = fn
+       s.docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
 
        api = &ArvTestClient{Container: rec}
-       docker.api = api
-       cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       s.docker.api = api
+       cr = NewContainerRunner(api, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.statInterval = 100 * time.Millisecond
        am := &ArvMountCmdLine{}
        cr.RunArvMount = am.ArvMountTest
@@ -687,7 +688,7 @@ func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn f
        c.Assert(err, IsNil)
        defer os.RemoveAll(realTemp)
 
-       docker.realTemp = realTemp
+       s.docker.realTemp = realTemp
 
        tempcount := 0
        cr.MkTempDir = func(_ string, prefix string) (string, error) {
@@ -730,7 +731,7 @@ func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn f
 }
 
 func (s *TestSuite) TestFullRunHello(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -751,7 +752,7 @@ func (s *TestSuite) TestFullRunHello(c *C) {
 }
 
 func (s *TestSuite) TestCrunchstat(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
                "command": ["sleep", "1"],
                "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
                "cwd": ".",
@@ -784,7 +785,7 @@ func (s *TestSuite) TestCrunchstat(c *C) {
 
 func (s *TestSuite) TestNodeInfoLog(c *C) {
        os.Setenv("SLURMD_NODENAME", "compute2")
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
                "command": ["sleep", "1"],
                "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
                "cwd": ".",
@@ -818,7 +819,7 @@ func (s *TestSuite) TestNodeInfoLog(c *C) {
 }
 
 func (s *TestSuite) TestContainerRecordLog(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
                "command": ["sleep", "1"],
                "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
                "cwd": ".",
@@ -841,7 +842,7 @@ func (s *TestSuite) TestContainerRecordLog(c *C) {
 }
 
 func (s *TestSuite) TestFullRunStderr(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo hello ; echo world 1>&2 ; exit 1"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -866,7 +867,7 @@ func (s *TestSuite) TestFullRunStderr(c *C) {
 }
 
 func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["pwd"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -887,7 +888,7 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
 }
 
 func (s *TestSuite) TestFullRunSetCwd(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["pwd"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": "/bin",
@@ -909,7 +910,7 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
 func (s *TestSuite) TestStopOnSignal(c *C) {
        s.testStopContainer(c, func(cr *ContainerRunner) {
                go func() {
-                       for !cr.cStarted {
+                       for !s.docker.calledWait {
                                time.Sleep(time.Millisecond)
                        }
                        cr.SigChan <- syscall.SIGINT
@@ -943,16 +944,15 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
        err := json.Unmarshal([]byte(record), &rec)
        c.Check(err, IsNil)
 
-       docker := NewTestDockerClient(0)
-       docker.fn = func(t *TestDockerClient) {
+       s.docker.fn = func(t *TestDockerClient) {
                <-t.stop
                t.logWriter.Write(dockerLog(1, "foo\n"))
                t.logWriter.Close()
        }
-       docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+       s.docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
 
        api := &ArvTestClient{Container: rec}
-       cr := NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr := NewContainerRunner(api, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
        setup(cr)
 
@@ -978,7 +978,7 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
 }
 
 func (s *TestSuite) TestFullRunSetEnv(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo $FROBIZ"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": "/bin",
@@ -1359,7 +1359,7 @@ func (s *TestSuite) TestStdout(c *C) {
                "runtime_constraints": {}
        }`
 
-       api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
@@ -1370,17 +1370,16 @@ func (s *TestSuite) TestStdout(c *C) {
 }
 
 // Used by the TestStdoutWithWrongPath*() tests below.
-func StdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) {
+func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) {
        rec := arvados.Container{}
        err = json.Unmarshal([]byte(record), &rec)
        c.Check(err, IsNil)
 
-       docker := NewTestDockerClient(0)
-       docker.fn = fn
-       docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+       s.docker.fn = fn
+       s.docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
 
        api = &ArvTestClient{Container: rec}
-       cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr = NewContainerRunner(api, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        am := &ArvMountCmdLine{}
        cr.RunArvMount = am.ArvMountTest
 
@@ -1389,7 +1388,7 @@ func StdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (ap
 }
 
 func (s *TestSuite) TestStdoutWithWrongPath(c *C) {
-       _, _, err := StdoutErrorRunHelper(c, `{
+       _, _, err := s.stdoutErrorRunHelper(c, `{
     "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path":"/tmpa.out"} },
     "output_path": "/tmp"
 }`, func(t *TestDockerClient) {})
@@ -1399,7 +1398,7 @@ func (s *TestSuite) TestStdoutWithWrongPath(c *C) {
 }
 
 func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) {
-       _, _, err := StdoutErrorRunHelper(c, `{
+       _, _, err := s.stdoutErrorRunHelper(c, `{
     "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "tmp", "path":"/tmp/a.out"} },
     "output_path": "/tmp"
 }`, func(t *TestDockerClient) {})
@@ -1409,7 +1408,7 @@ func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) {
 }
 
 func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) {
-       _, _, err := StdoutErrorRunHelper(c, `{
+       _, _, err := s.stdoutErrorRunHelper(c, `{
     "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "collection", "path":"/tmp/a.out"} },
     "output_path": "/tmp"
 }`, func(t *TestDockerClient) {})
@@ -1421,7 +1420,7 @@ func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) {
 func (s *TestSuite) TestFullRunWithAPI(c *C) {
        os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
        defer os.Unsetenv("ARVADOS_API_HOST")
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": "/bin",
@@ -1444,7 +1443,7 @@ func (s *TestSuite) TestFullRunWithAPI(c *C) {
 func (s *TestSuite) TestFullRunSetOutput(c *C) {
        os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
        defer os.Unsetenv("ARVADOS_API_HOST")
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": "/bin",
@@ -1484,7 +1483,7 @@ func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C
 
        extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
@@ -1519,7 +1518,7 @@ func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) {
                "a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt",
        }
 
-       api, runner, realtemp := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, runner, realtemp := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
@@ -1571,7 +1570,7 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(
                "b0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
        }
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
@@ -1612,7 +1611,7 @@ func (s *TestSuite) TestOutputSymlinkToInput(c *C) {
                "a0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
        }
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                os.Symlink("/keep/foo/sub1file2", t.realTemp+"/2/baz")
                os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/2/baz2")
                os.Symlink("/keep/foo2/subdir1", t.realTemp+"/2/baz3")
@@ -1654,7 +1653,7 @@ func (s *TestSuite) TestOutputError(c *C) {
 
        extraMounts := []string{}
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                os.Symlink("/etc/hosts", t.realTemp+"/2/baz")
                t.logWriter.Close()
        })
@@ -1678,7 +1677,7 @@ func (s *TestSuite) TestOutputSymlinkToOutput(c *C) {
 
        extraMounts := []string{}
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                rf, _ := os.Create(t.realTemp + "/2/realfile")
                rf.Write([]byte("foo"))
                rf.Close()
@@ -1735,7 +1734,7 @@ func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
                "b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt",
        }
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
@@ -1770,7 +1769,7 @@ func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
                "runtime_constraints": {}
        }`
 
-       api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
@@ -1790,7 +1789,7 @@ func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
 }
 
 func (s *TestSuite) TestStderrMount(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo hello;exit 1"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -1887,7 +1886,7 @@ exec echo killme
        ech := tf.Name()
        brokenNodeHook = &ech
 
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -1912,7 +1911,7 @@ func (s *TestSuite) TestFullBrokenDocker2(c *C) {
        ech := ""
        brokenNodeHook = &ech
 
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -1935,7 +1934,7 @@ func (s *TestSuite) TestFullBrokenDocker3(c *C) {
        ech := ""
        brokenNodeHook = &ech
 
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -1957,7 +1956,7 @@ func (s *TestSuite) TestBadCommand1(c *C) {
        ech := ""
        brokenNodeHook = &ech
 
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -1979,7 +1978,7 @@ func (s *TestSuite) TestBadCommand2(c *C) {
        ech := ""
        brokenNodeHook = &ech
 
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -2001,7 +2000,7 @@ func (s *TestSuite) TestBadCommand3(c *C) {
        ech := ""
        brokenNodeHook = &ech
 
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
index becd66975f676a76b09eb8da6761582fd2e94b5e..34fd594be2dd6ce2793eebe1ac884c92aa8fdd58 100644 (file)
@@ -1028,64 +1028,71 @@ class SharedDirectory(Directory):
         self.current_user = api.users().current().execute(num_retries=num_retries)
         self._poll = True
         self._poll_time = poll_time
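+        # Serializes update(): only one thread refreshes this directory at a time.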
+        self._updating_lock = threading.Lock()
 
     @use_counter
     def update(self):
-        with llfuse.lock_released:
-            all_projects = arvados.util.list_all(
-                self.api.groups().list, self.num_retries,
-                filters=[['group_class','=','project']])
-            objects = {}
-            for ob in all_projects:
-                objects[ob['uuid']] = ob
-
-            roots = []
-            root_owners = {}
-            for ob in all_projects:
-                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
-                    roots.append(ob)
-                    root_owners[ob['owner_uuid']] = True
-
-            lusers = arvados.util.list_all(
-                self.api.users().list, self.num_retries,
-                filters=[['uuid','in', list(root_owners)]])
-            lgroups = arvados.util.list_all(
-                self.api.groups().list, self.num_retries,
-                filters=[['uuid','in', list(root_owners)]])
-
-            users = {}
-            groups = {}
-
-            for l in lusers:
-                objects[l["uuid"]] = l
-            for l in lgroups:
-                objects[l["uuid"]] = l
-
-            contents = {}
-            for r in root_owners:
-                if r in objects:
-                    obr = objects[r]
-                    if obr.get("name"):
-                        contents[obr["name"]] = obr
-                    #elif obr.get("username"):
-                    #    contents[obr["username"]] = obr
-                    elif "first_name" in obr:
-                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
-
-
-            for r in roots:
-                if r['owner_uuid'] not in objects:
-                    contents[r['name']] = r
-
-        # end with llfuse.lock_released, re-acquire lock
-
         try:
+            with llfuse.lock_released:
+                self._updating_lock.acquire()
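+                # If another thread finished an update while this one was
+                # waiting for the lock, there is nothing left to do.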
+                if not self.stale():
+                    return
+
+                all_projects = arvados.util.list_all(
+                    self.api.groups().list, self.num_retries,
+                    filters=[['group_class','=','project']],
+                    select=["uuid", "owner_uuid"])
+                objects = {}
+                for ob in all_projects:
+                    objects[ob['uuid']] = ob
+
+                roots = []
+                root_owners = set()
+                current_uuid = self.current_user['uuid']
+                for ob in all_projects:
+                    if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
+                        roots.append(ob['uuid'])
+                        root_owners.add(ob['owner_uuid'])
+
+                lusers = arvados.util.list_all(
+                    self.api.users().list, self.num_retries,
+                    filters=[['uuid','in', list(root_owners)]])
+                lgroups = arvados.util.list_all(
+                    self.api.groups().list, self.num_retries,
+                    filters=[['uuid','in', list(root_owners)+roots]])
+
+                for l in lusers:
+                    objects[l["uuid"]] = l
+                for l in lgroups:
+                    objects[l["uuid"]] = l
+
+                contents = {}
+                for r in root_owners:
+                    if r in objects:
+                        obr = objects[r]
+                        if obr.get("name"):
+                            contents[obr["name"]] = obr
+                        #elif obr.get("username"):
+                        #    contents[obr["username"]] = obr
+                        elif "first_name" in obr:
+                            contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
+
+                for r in roots:
+                    if r in objects:
+                        obr = objects[r]
+                        if obr['owner_uuid'] not in objects:
+                            contents[obr["name"]] = obr
+
+            # end with llfuse.lock_released, re-acquire lock
+
             self.merge(contents.items(),
                        lambda i: i[0],
                        lambda a, i: a.uuid() == i[1]['uuid'],
                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
         except Exception:
-            _logger.exception()
+            _logger.exception("arv-mount shared dir error")
+        finally:
+            self._updating_lock.release()
 
     def want_event_subscribe(self):
         return True
index 83d507b62b4931163b73dded52240705ebd2ae70..18d430dd7cd037b958e592533bdc9e2be53aadd1 100644 (file)
@@ -90,6 +90,7 @@ ADD crunch-setup.sh gitolite.rc \
     keep-setup.sh common.sh createusers.sh \
     logger runsu.sh waitforpostgres.sh \
     application_yml_override.py api-setup.sh \
+    go-setup.sh \
     /usr/local/lib/arvbox/
 
 ADD runit /etc/runit
index 80344c16f2ef9bfcf0f97bf14f07a8c1cb97ce73..7cb51edfdc61ab6c9b67b29353eeb2f79e8774eb 100644 (file)
@@ -24,13 +24,16 @@ RUN echo "production" > /var/lib/arvados/workbench_rails_env
 
 RUN chown -R 1000:1000 /usr/src && /usr/local/lib/arvbox/createusers.sh
 
+RUN sudo -u arvbox /var/lib/arvbox/service/composer/run-service --only-deps
+RUN sudo -u arvbox /var/lib/arvbox/service/keep-web/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/sso/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/api/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/workbench/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/doc/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/vm/run-service --only-deps
-RUN sudo -u arvbox /var/lib/arvbox/service/keep-web/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/keepproxy/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/arv-git-httpd/run-service --only-deps
+RUN sudo -u arvbox /var/lib/arvbox/service/crunch-dispatch-local/run-service --only-deps
+RUN sudo -u arvbox /var/lib/arvbox/service/websockets/run-service --only-deps
 RUN sudo -u arvbox /usr/local/lib/arvbox/keep-setup.sh --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/sdk/run-service
index 30ecafb889c7e38acb45380d68c5f8634bcb9c3c..b3ec5cd10441f695522c50500a2e64fd3f6d8f5d 100755 (executable)
@@ -7,16 +7,11 @@ exec 2>&1
 set -eux -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
 
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
-
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
 flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/crunchstat"
 flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/sdk/go/crunchrunner"
-install bin/crunchstat bin/crunchrunner /usr/local/bin
+install $GOPATH/bin/crunchstat $GOPATH/bin/crunchrunner /usr/local/bin
 
 if test -s /var/lib/arvados/api_rails_env ; then
   RAILS_ENV=$(cat /var/lib/arvados/api_rails_env)
diff --git a/tools/arvbox/lib/arvbox/docker/go-setup.sh b/tools/arvbox/lib/arvbox/docker/go-setup.sh
new file mode 100644 (file)
index 0000000..f068ce6
--- /dev/null
@@ -0,0 +1,16 @@
+#!/bin/bash
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+mkdir -p /var/lib/gopath
+cd /var/lib/gopath
+
+export GOPATH=$PWD
+mkdir -p "$GOPATH/src/git.curoverse.com"
+ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
+
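+# Install govendor, fetch the remaining Go dependencies, and sync the vendor
+# directory so each service's run-service script can build with a plain "go get".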
+flock /var/lib/gopath/gopath.lock go get -t github.com/kardianos/govendor
+cd "$GOPATH/src/git.curoverse.com/arvados.git"
+flock /var/lib/gopath/gopath.lock go get -v -d ...
+flock /var/lib/gopath/gopath.lock "$GOPATH/bin/govendor" sync
index 5da2cfac440c6a4aafa20359ea718478d12816b3..8ef66a60687ce817e46308311dbcd4d80c6691ad 100755 (executable)
@@ -8,15 +8,10 @@ sleep 2
 set -eux -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
 
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
-
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
 flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/keepstore"
-install bin/keepstore /usr/local/bin
+install $GOPATH/bin/keepstore /usr/local/bin
 
 if test "$1" = "--only-deps" ; then
     exit
index 806f9cd37a375e17ffeacc0943d03beb8ded614e..1383f7140f4ed961637d8c8ef160bfb3b575d317 100755 (executable)
@@ -7,15 +7,10 @@ exec 2>&1
 set -ex -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
 
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
-
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
 flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/arv-git-httpd"
-install bin/arv-git-httpd /usr/local/bin
+install $GOPATH/bin/arv-git-httpd /usr/local/bin
 
 if test "$1" = "--only-deps" ; then
     exit
index ac4441de099ab37e1a0a36109807cf9e127db507..abd350f073c0f449b37b25362185b9b24a963136 100755 (executable)
@@ -10,13 +10,13 @@ set -ex -o pipefail
 
 cd /usr/src/composer
 
-npm -d install yarn
-
-PATH=$PATH:/usr/src/composer/node_modules/.bin
+npm -d install --prefix /usr/local --global yarn
 
 yarn install
 
-if test "$1" != "--only-deps" ; then
-    echo "apiEndPoint: https://${localip}:${services[api]}" > /usr/src/composer/src/composer.yml
-    exec ng serve --host 0.0.0.0 --port 4200 --env=webdev
+if test "$1" = "--only-deps" ; then
+    exit
 fi
+
+echo "apiEndPoint: https://${localip}:${services[api]}" > /usr/src/composer/src/composer.yml
+exec node_modules/.bin/ng serve --host 0.0.0.0 --port 4200 --env=webdev
index e7a302682155b751fe46a1151b5673672327f3d7..decbccddeeecce662a0e353da0dd01c26ce91021 100755 (executable)
@@ -4,19 +4,18 @@
 # SPDX-License-Identifier: AGPL-3.0
 
 exec 2>&1
-set -eux -o pipefail
+set -ex -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
 
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
-
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
 flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/crunch-run"
 flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/crunch-dispatch-local"
-install bin/crunch-run bin/crunch-dispatch-local /usr/local/bin
+install $GOPATH/bin/crunch-run $GOPATH/bin/crunch-dispatch-local /usr/local/bin
+
+if test "$1" = "--only-deps" ; then
+    exit
+fi
 
 cat > /usr/local/bin/crunch-run.sh <<EOF
 #!/bin/sh
index ee985a0e51b512d3263dcf7d72b7b4cdc530bd9f..70f2470b9fe7decd8a03efdfb09d5da8ab52f372 100755 (executable)
@@ -7,15 +7,10 @@ exec 2>&1
 set -ex -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
 
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
-
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
 flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/keep-web"
-install bin/keep-web /usr/local/bin
+install $GOPATH/bin/keep-web /usr/local/bin
 
 if test "$1" = "--only-deps" ; then
     exit
index dae2dfdd786589a3023e0143a5f6c7f8b3711c0a..199247b7a0e2bfc6dcabdd929dc5177275f730bc 100755 (executable)
@@ -8,15 +8,10 @@ sleep 2
 set -ex -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
 
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
-
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
 flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/keepproxy"
-install bin/keepproxy /usr/local/bin
+install $GOPATH/bin/keepproxy /usr/local/bin
 
 if test "$1" = "--only-deps" ; then
     exit
index 45407cb9ab48d34c7baa7811aef92376ed759d0b..3ef78ee45575676dc881059efd60cc57bd64cbd9 100755 (executable)
@@ -5,6 +5,8 @@
 
 flock /var/lib/arvados/createusers.lock /usr/local/lib/arvbox/createusers.sh
 
+make-ssl-cert generate-default-snakeoil --force-overwrite
+
 . /usr/local/lib/arvbox/common.sh
 
 chown -R $PGUSER:$PGGROUP /var/lib/postgresql
index cb56ac7f4de5dbb7f3ad6d22c6b8166933a32f81..2d01d907985c0c9ca6e0cf1e39969e1b4ce2d7fd 100755 (executable)
@@ -14,14 +14,10 @@ else
   RAILS_ENV=development
 fi
 
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
+. /usr/local/lib/arvbox/go-setup.sh
 
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
 flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/ws"
-install bin/ws /usr/local/bin/arvados-ws
+install $GOPATH/bin/ws /usr/local/bin/arvados-ws
 
 if test "$1" = "--only-deps" ; then
     exit
index ebc40b13cb8c2ad2ec8152df975d6a0863ffc45b..10569b2e139b89a2904820ab62dab6cbc3a747b5 100644 (file)
@@ -307,7 +307,7 @@ func doMain(cfg *ConfigParams) error {
                }
                userIDToUUID[uID] = u.UUID
                if cfg.Verbose {
-                       log.Printf("Seen user %q (%s)", u.Username, u.Email)
+                       log.Printf("Seen user %q (%s)", u.Username, u.UUID)
                }
        }
 
@@ -317,6 +317,11 @@ func doMain(cfg *ConfigParams) error {
                return err
        }
        log.Printf("Found %d remote groups", len(remoteGroups))
+       if cfg.Verbose {
+               for groupUUID := range remoteGroups {
+                       log.Printf("- Group %q: %d users", remoteGroups[groupUUID].Group.Name, len(remoteGroups[groupUUID].PreviousMembers))
+               }
+       }
 
        membershipsRemoved := 0
 
@@ -504,9 +509,9 @@ func GetRemoteGroups(cfg *ConfigParams, allUsers map[string]arvados.User) (remot
                                Operator: "=",
                                Operand:  group.UUID,
                        }, {
-                               Attr:     "head_kind",
-                               Operator: "=",
-                               Operand:  "arvados#user",
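+				// "-tpzed-" is the infix of user UUIDs, so this selects
+				// links whose head is a user object, whichever cluster
+				// the user belongs to.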
+                               Attr:     "head_uuid",
+                               Operator: "like",
+                               Operand:  "%-tpzed-%",
                        }},
                }
                // User -> Group filter
@@ -528,9 +533,9 @@ func GetRemoteGroups(cfg *ConfigParams, allUsers map[string]arvados.User) (remot
                                Operator: "=",
                                Operand:  group.UUID,
                        }, {
-                               Attr:     "tail_kind",
-                               Operator: "=",
-                               Operand:  "arvados#user",
+                               Attr:     "tail_uuid",
+                               Operator: "like",
+                               Operand:  "%-tpzed-%",
                        }},
                }
                g2uLinks, err := GetAll(cfg.Client, "links", g2uFilter, &LinkList{})
@@ -579,7 +584,7 @@ func GetRemoteGroups(cfg *ConfigParams, allUsers map[string]arvados.User) (remot
 // RemoveMemberFromGroup removes all links related to the membership
 func RemoveMemberFromGroup(cfg *ConfigParams, user arvados.User, group arvados.Group) error {
        if cfg.Verbose {
-               log.Printf("Getting group membership links for user %q (%s) on group %q (%s)", user.Email, user.UUID, group.Name, group.UUID)
+               log.Printf("Getting group membership links for user %q (%s) on group %q (%s)", user.Username, user.UUID, group.Name, group.UUID)
        }
        var links []interface{}
        // Search for all group<->user links (both ways)
index e776648a803736581dcb61724631ac844cdca68f..4a3e470c42f2b56f82199938d857bd295e564e9a 100644 (file)
@@ -83,7 +83,6 @@ func (s *TestSuite) SetUpTest(c *C) {
        c.Assert(len(s.users), Not(Equals), 0)
 }
 
-// Clean any membership link and remote group created by the test
 func (s *TestSuite) TearDownTest(c *C) {
        var dst interface{}
        // Reset database to fixture state after every test run.
@@ -93,7 +92,7 @@ func (s *TestSuite) TearDownTest(c *C) {
 
 var _ = Suite(&TestSuite{})
 
-// MakeTempCVSFile creates a temp file with data as comma separated values
+// MakeTempCSVFile creates a temp file with data as comma-separated values
 func MakeTempCSVFile(data [][]string) (f *os.File, err error) {
        f, err = ioutil.TempFile("", "test_sync_remote_groups")
        if err != nil {
@@ -266,11 +265,15 @@ func (s *TestSuite) TestIgnoreSpaces(c *C) {
 
 // The absence of a user membership in the CSV file implies its removal
 func (s *TestSuite) TestMembershipRemoval(c *C) {
-       activeUserEmail := s.users[arvadostest.ActiveUserUUID].Email
-       activeUserUUID := s.users[arvadostest.ActiveUserUUID].UUID
+       localUserEmail := s.users[arvadostest.ActiveUserUUID].Email
+       localUserUUID := s.users[arvadostest.ActiveUserUUID].UUID
+       remoteUserEmail := s.users[arvadostest.FederatedActiveUserUUID].Email
+       remoteUserUUID := s.users[arvadostest.FederatedActiveUserUUID].UUID
        data := [][]string{
-               {"TestGroup1", activeUserEmail},
-               {"TestGroup2", activeUserEmail},
+               {"TestGroup1", localUserEmail},
+               {"TestGroup1", remoteUserEmail},
+               {"TestGroup2", localUserEmail},
+               {"TestGroup2", remoteUserEmail},
        }
        tmpfile, err := MakeTempCSVFile(data)
        c.Assert(err, IsNil)
@@ -283,11 +286,13 @@ func (s *TestSuite) TestMembershipRemoval(c *C) {
                groupUUID, err := RemoteGroupExists(s.cfg, groupName)
                c.Assert(err, IsNil)
                c.Assert(groupUUID, Not(Equals), "")
-               c.Assert(GroupMembershipExists(s.cfg.Client, activeUserUUID, groupUUID), Equals, true)
+               c.Assert(GroupMembershipExists(s.cfg.Client, localUserUUID, groupUUID), Equals, true)
+               c.Assert(GroupMembershipExists(s.cfg.Client, remoteUserUUID, groupUUID), Equals, true)
        }
-       // New CSV with one previous membership missing
+	// New CSV with some previous memberships missing
        data = [][]string{
-               {"TestGroup1", activeUserEmail},
+               {"TestGroup1", localUserEmail},
+               {"TestGroup2", remoteUserEmail},
        }
        tmpfile2, err := MakeTempCSVFile(data)
        c.Assert(err, IsNil)
@@ -295,16 +300,18 @@ func (s *TestSuite) TestMembershipRemoval(c *C) {
        s.cfg.Path = tmpfile2.Name()
        err = doMain(s.cfg)
        c.Assert(err, IsNil)
-       // Confirm TestGroup1 membership still exist
+	// Confirm TestGroup1 memberships: local user kept, remote user removed
        groupUUID, err := RemoteGroupExists(s.cfg, "TestGroup1")
        c.Assert(err, IsNil)
        c.Assert(groupUUID, Not(Equals), "")
-       c.Assert(GroupMembershipExists(s.cfg.Client, activeUserUUID, groupUUID), Equals, true)
-       // Confirm TestGroup2 membership was removed
+       c.Assert(GroupMembershipExists(s.cfg.Client, localUserUUID, groupUUID), Equals, true)
+       c.Assert(GroupMembershipExists(s.cfg.Client, remoteUserUUID, groupUUID), Equals, false)
+	// Confirm TestGroup2 memberships: local user removed, remote user kept
        groupUUID, err = RemoteGroupExists(s.cfg, "TestGroup2")
        c.Assert(err, IsNil)
        c.Assert(groupUUID, Not(Equals), "")
-       c.Assert(GroupMembershipExists(s.cfg.Client, activeUserUUID, groupUUID), Equals, false)
+       c.Assert(GroupMembershipExists(s.cfg.Client, localUserUUID, groupUUID), Equals, false)
+       c.Assert(GroupMembershipExists(s.cfg.Client, remoteUserUUID, groupUUID), Equals, true)
 }
 
 // If a group doesn't exist on the system, create it before adding users