Merge branch '12902-queued-to-cancelled'
author Tom Clegg <tclegg@veritasgenetics.com>
Thu, 1 Feb 2018 16:51:40 +0000 (11:51 -0500)
committer Tom Clegg <tclegg@veritasgenetics.com>
Thu, 1 Feb 2018 16:51:40 +0000 (11:51 -0500)
refs #12902

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

38 files changed:
doc/_includes/_mount_types.liquid
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvtool.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py
sdk/cwl/tests/arvados-tests.sh
sdk/cwl/tests/hg19/hg19.fa [new file with mode: 0644]
sdk/cwl/tests/hg19/hg19.fa.amb [new file with mode: 0644]
sdk/cwl/tests/hg19/hg19.fa.ann [new file with mode: 0644]
sdk/cwl/tests/hg19/hg19.fa.fai [new file with mode: 0644]
sdk/cwl/tests/test_make_output.py
sdk/go/arvadostest/fixtures.go
services/api/test/fixtures/users.yml
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
services/crunch-dispatch-slurm/slurm.go [new file with mode: 0644]
services/crunch-dispatch-slurm/squeue.go
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/fuse/arvados_fuse/fusedir.py
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
tools/arvbox/lib/arvbox/docker/Dockerfile.base
tools/arvbox/lib/arvbox/docker/Dockerfile.demo
tools/arvbox/lib/arvbox/docker/crunch-setup.sh
tools/arvbox/lib/arvbox/docker/go-setup.sh [new file with mode: 0644]
tools/arvbox/lib/arvbox/docker/keep-setup.sh
tools/arvbox/lib/arvbox/docker/service/arv-git-httpd/run-service
tools/arvbox/lib/arvbox/docker/service/composer/run-service
tools/arvbox/lib/arvbox/docker/service/crunch-dispatch-local/run-service
tools/arvbox/lib/arvbox/docker/service/keep-web/run-service
tools/arvbox/lib/arvbox/docker/service/keepproxy/run-service
tools/arvbox/lib/arvbox/docker/service/postgres/run
tools/arvbox/lib/arvbox/docker/service/websockets/run-service
tools/sync-groups/sync-groups.go
tools/sync-groups/sync-groups_test.go

index 734b07c8b7970c00bdd6e99be3815e9d8d31f1f0..fc8a7991b38c65616704d0d096d7500a1b9e1473 100644 (file)
@@ -64,7 +64,7 @@ When a container's output_path is a tmp mount backed by local disk, this output
 
 1. Only mount points of kind @collection@ are supported.
 
-2. Mount points underneath output_path must not use @"writable":true@. If any of them are set as @writable@, the API will refuse to create/update the container request, and crunch-run will fail the container.
+2. Mount points underneath output_path with @"writable":true@ are copied into output_path during container initialization and may be updated, renamed, or deleted by the running container. The original collection is not modified. On container completion, files remaining in the output are saved to the output collection. The mount at output_path must be large enough to accommodate copies of the inner writable mounts.
 
 3. If any such mount points are configured with @"exclude_from_output":true@, they will be excluded from the output.
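
The writable-mount behavior described in point 2 can be illustrated with a hypothetical mount specification (values are illustrative; the portable data hash is borrowed from the test data added in this commit):

    {
      "/out": {
        "kind": "tmp",
        "capacity": 100000000000
      },
      "/out/reference": {
        "kind": "collection",
        "portable_data_hash": "f225e6259bdd63bc7240599648dde9f1+97",
        "writable": true
      }
    }

Here the contents of the collection are copied into /out/reference when the container starts; whatever remains there at completion is saved to the output collection, and the source collection itself is never modified.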
 
index e82fd9feef1f3d88c4068e7d0d6cbfee6232c3f2..71ddd172214c4dac1b20907c8cf5a18bce6c37b2 100644 (file)
@@ -224,13 +224,16 @@ class ArvCwlRunner(object):
 
     def check_features(self, obj):
         if isinstance(obj, dict):
-            if obj.get("writable"):
-                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
+            if obj.get("writable") and self.work_api != "containers":
+                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
             if obj.get("class") == "DockerRequirement":
                 if obj.get("dockerOutputDirectory"):
-                    # TODO: can be supported by containers API, but not jobs API.
-                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
-                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
+                    if self.work_api != "containers":
+                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
+                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
+                    if not obj.get("dockerOutputDirectory").startswith('/'):
+                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
+                            "Option 'dockerOutputDirectory' must be an absolute path.")
             for v in obj.itervalues():
                 self.check_features(v)
         elif isinstance(obj, list):
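
These checks mean that with --api=containers a workflow may now use a writable InitialWorkDir entry and an absolute dockerOutputDirectory. A sketch of a tool fragment exercising both (illustrative names and values, not part of this commit):

    requirements:
      DockerRequirement:
        dockerPull: ubuntu:16.04
        dockerOutputDirectory: /var/spool/cwl-out
      InitialWorkDirRequirement:
        listing:
          - entry: $(inputs.refdir)
            writable: true

With --api=jobs either feature still raises UnsupportedRequirement, and a relative dockerOutputDirectory is rejected with a ValidationException.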
@@ -279,7 +282,7 @@ class ArvCwlRunner(object):
 
         def rewrite(fileobj):
             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
-            for k in ("basename", "listing", "contents", "nameext", "nameroot", "dirname"):
+            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                 if k in fileobj:
                     del fileobj[k]
 
index 014e1b94aae5b283ea851fd74e480dd5df926f55..abe67c8fb3c552c7e66925093ae1377b1e26b4e9 100644 (file)
@@ -105,17 +105,32 @@ class ArvadosContainer(object):
                 generatemapper = NoFollowPathMapper([self.generatefiles], "", "",
                                                     separateDirs=False)
 
+                logger.debug("generatemapper is %s", generatemapper._pathmap)
+
                 with Perf(metrics, "createfiles %s" % self.name):
                     for f, p in generatemapper.items():
                         if not p.target:
                             pass
-                        elif p.type in ("File", "Directory"):
-                            source, path = self.arvrunner.fs_access.get_collection(p.resolved)
-                            vwd.copy(path, p.target, source_collection=source)
+                        elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
+                            if p.resolved.startswith("_:"):
+                                vwd.mkdirs(p.target)
+                            else:
+                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
+                                vwd.copy(path, p.target, source_collection=source)
                         elif p.type == "CreateFile":
                             with vwd.open(p.target, "w") as n:
                                 n.write(p.resolved.encode("utf-8"))
 
+                def keepemptydirs(p):
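+                    # Keep collections cannot represent an empty directory, so write an empty ".keep" file into any empty subcollection before saving.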
+                    if isinstance(p, arvados.collection.RichCollectionBase):
+                        if len(p) == 0:
+                            p.open(".keep", "w").close()
+                        else:
+                            for c in p:
+                                keepemptydirs(p[c])
+
+                keepemptydirs(vwd)
+
                 with Perf(metrics, "generatefiles.save_new %s" % self.name):
                     vwd.save_new()
 
@@ -126,6 +141,8 @@ class ArvadosContainer(object):
                     mounts[mountpoint] = {"kind": "collection",
                                           "portable_data_hash": vwd.portable_data_hash(),
                                           "path": p.target}
+                    if p.type.startswith("Writable"):
+                        mounts[mountpoint]["writable"] = True
 
         container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
         if self.environment:
index b667dac1ca5cec6f272c390be8fcd17e1628764c..81faff44e6297476d28d84c7f78d7201dff29122 100644 (file)
@@ -36,8 +36,13 @@ class ArvadosCommandTool(CommandLineTool):
 
     def job(self, joborder, output_callback, **kwargs):
         if self.work_api == "containers":
-            kwargs["outdir"] = "/var/spool/cwl"
-            kwargs["docker_outdir"] = "/var/spool/cwl"
+            dockerReq, is_req = self.get_requirement("DockerRequirement")
+            if dockerReq and dockerReq.get("dockerOutputDirectory"):
+                kwargs["outdir"] = dockerReq.get("dockerOutputDirectory")
+                kwargs["docker_outdir"] = dockerReq.get("dockerOutputDirectory")
+            else:
+                kwargs["outdir"] = "/var/spool/cwl"
+                kwargs["docker_outdir"] = "/var/spool/cwl"
         elif self.work_api == "jobs":
             kwargs["outdir"] = "$(task.outdir)"
             kwargs["docker_outdir"] = "$(task.outdir)"
index 914ccaa5a1049868cfe7f840f6bf7d56e957218c..998890a31c50acac0513479d0fad9675fd790647 100644 (file)
@@ -225,12 +225,16 @@ class StagingPathMapper(PathMapper):
         tgt = os.path.join(stagedir, obj["basename"])
         basetgt, baseext = os.path.splitext(tgt)
         n = 1
-        while tgt in self.targets:
-            n += 1
-            tgt = "%s_%i%s" % (basetgt, n, baseext)
+        if tgt in self.targets and (self.reversemap(tgt)[0] != loc):
+            while tgt in self.targets:
+                n += 1
+                tgt = "%s_%i%s" % (basetgt, n, baseext)
         self.targets.add(tgt)
         if obj["class"] == "Directory":
-            self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
+            if obj.get("writable"):
+                self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
+            else:
+                self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
             if loc.startswith("_:") or self._follow_dirs:
                 self.visitlisting(obj.get("listing", []), tgt, basedir)
         elif obj["class"] == "File":
@@ -239,7 +243,7 @@ class StagingPathMapper(PathMapper):
             if "contents" in obj and loc.startswith("_:"):
                 self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
             else:
-                if copy:
+                if copy or obj.get("writable"):
                     self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                 else:
                     self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
index 2ca63cfe5048a62f0a1853e2aca06be865ea1fd4..fb5d036e941969df71b6a3062d09bb87d4328739 100644 (file)
@@ -161,7 +161,7 @@ def upload_docker(arvrunner, tool):
     if isinstance(tool, CommandLineTool):
         (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
         if docker_req:
-            if docker_req.get("dockerOutputDirectory"):
+            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                 # TODO: can be supported by containers API, but not jobs API.
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
index 88e3d80db35293a5fab4aec7dbd4db023e26a943..e5484651edcd44d0f2ef67c3bf0dbd1244ffca60 100644 (file)
@@ -41,7 +41,7 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20180116213856',
+          'cwltool==1.0.20180130110340',
           'schema-salad==2.6.20171201034858',
           'typing==3.5.3.0',
           'ruamel.yaml==0.13.7',
index 26b31615ea41950a7f3f13535571b2755981eb7a..d3c1e90637d5419320b0115b386780d6321d9975 100755 (executable)
@@ -6,4 +6,7 @@
 if ! arv-get d7514270f356df848477718d58308cc4+94 > /dev/null ; then
     arv-put --portable-data-hash testdir/*
 fi
+if ! arv-get f225e6259bdd63bc7240599648dde9f1+97 > /dev/null ; then
+    arv-put --portable-data-hash hg19/*
+fi
 exec cwltest --test arvados-tests.yml --tool arvados-cwl-runner $@ -- --disable-reuse --compute-checksum
diff --git a/sdk/cwl/tests/hg19/hg19.fa b/sdk/cwl/tests/hg19/hg19.fa
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/sdk/cwl/tests/hg19/hg19.fa.amb b/sdk/cwl/tests/hg19/hg19.fa.amb
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/sdk/cwl/tests/hg19/hg19.fa.ann b/sdk/cwl/tests/hg19/hg19.fa.ann
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/sdk/cwl/tests/hg19/hg19.fa.fai b/sdk/cwl/tests/hg19/hg19.fa.fai
new file mode 100644 (file)
index 0000000..e69de29
index 05c9ee7410ac7b12ac1ae29770db0617709ce710..806d63ab85f3f1a9a08c73f9ea76f3dc7b3ecc09 100644 (file)
@@ -59,11 +59,13 @@ class TestMakeOutput(unittest.TestCase):
         final.save_new.assert_has_calls([mock.call(ensure_unique_name=True, name='Test output', owner_uuid='zzzzz-j7d0g-zzzzzzzzzzzzzzz')])
         self.assertEqual("""{
     "bar": {
+        "basename": "baz.txt",
         "class": "File",
         "location": "baz.txt",
         "size": 4
     },
     "foo": {
+        "basename": "foo.txt",
         "class": "File",
         "location": "foo.txt",
         "size": 3
index 5e530658abe96534d9ed3488a975a85a778af16d..d057c09b227e9f375d2b3d04e95d9327044c4f33 100644 (file)
@@ -13,6 +13,7 @@ const (
        DataManagerToken        = "320mkve8qkswstz7ff61glpk3mhgghmg67wmic7elw4z41pke1"
        ManagementToken         = "jg3ajndnq63sywcd50gbs5dskdc9ckkysb0nsqmfz08nwf17nl"
        ActiveUserUUID          = "zzzzz-tpzed-xurymjxw79nv3jz"
+       FederatedActiveUserUUID = "zbbbb-tpzed-xurymjxw79nv3jz"
        SpectatorUserUUID       = "zzzzz-tpzed-l1s2piq4t4mps8r"
        UserAgreementCollection = "zzzzz-4zz18-uukreo9rbgwsujr" // user_agreement_in_anonymously_accessible_project
        FooCollection           = "zzzzz-4zz18-fy296fx3hot09f7"
index 808795213351870986689299f3a9cbcbf087265c..8fb800c5f94f8a93bdc2f3990282ea76df7bb51b 100644 (file)
@@ -84,6 +84,22 @@ active:
       role: Computational biologist
     getting_started_shown: 2015-03-26 12:34:56.789000000 Z
 
+federated_active:
+  owner_uuid: zzzzz-tpzed-000000000000000
+  uuid: zbbbb-tpzed-xurymjxw79nv3jz
+  email: zbbbb-active-user@arvados.local
+  first_name: Active
+  last_name: User
+  identity_url: https://active-user.openid.local
+  is_active: true
+  is_admin: false
+  username: federatedactive
+  prefs:
+    profile:
+      organization: example.com
+      role: Computational biologist
+    getting_started_shown: 2015-03-26 12:34:56.789000000 Z
+
 project_viewer:
   owner_uuid: zzzzz-tpzed-000000000000000
   uuid: zzzzz-tpzed-projectviewer1a
index 3c89103f38dbc1cd094047d173c10bfe0b28f08e..ae2ca58421d3f3a58f0a4a0f28cbdc2beb56e6af 100644 (file)
@@ -7,14 +7,12 @@ package main
 // Dispatcher service for Crunch that submits containers to the slurm queue.
 
 import (
-       "bytes"
        "context"
        "flag"
        "fmt"
        "log"
        "math"
        "os"
-       "os/exec"
        "regexp"
        "strings"
        "time"
@@ -43,9 +41,12 @@ type Config struct {
 
        // Minimum time between two attempts to run the same container
        MinRetryPeriod arvados.Duration
+
+       slurm Slurm
 }
 
 func main() {
+       theConfig.slurm = &slurmCLI{}
        err := doMain()
        if err != nil {
                log.Fatal(err)
@@ -175,8 +176,7 @@ func niceness(priority int) int {
        return (1000 - priority) * 10
 }
 
-// sbatchCmd
-func sbatchFunc(container arvados.Container) *exec.Cmd {
+func sbatchArgs(container arvados.Container) []string {
        mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
 
        var disk int64
@@ -198,61 +198,22 @@ func sbatchFunc(container arvados.Container) *exec.Cmd {
                sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
        }
 
-       return exec.Command("sbatch", sbatchArgs...)
-}
-
-// scancelCmd
-func scancelFunc(container arvados.Container) *exec.Cmd {
-       return exec.Command("scancel", "--name="+container.UUID)
-}
-
-// scontrolCmd
-func scontrolFunc(container arvados.Container) *exec.Cmd {
-       return exec.Command("scontrol", "update", "JobName="+container.UUID, fmt.Sprintf("Nice=%d", niceness(container.Priority)))
+       return sbatchArgs
 }
 
-// Wrap these so that they can be overridden by tests
-var sbatchCmd = sbatchFunc
-var scancelCmd = scancelFunc
-var scontrolCmd = scontrolFunc
-
-// Submit job to slurm using sbatch.
 func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
-       cmd := sbatchCmd(container)
-
-       // Send a tiny script on stdin to execute the crunch-run
-       // command (slurm requires this to be a #! script)
-
        // append() here avoids modifying crunchRunCommand's
        // underlying array, which is shared with other goroutines.
-       args := append([]string(nil), crunchRunCommand...)
-       args = append(args, container.UUID)
-       cmd.Stdin = strings.NewReader(execScript(args))
-
-       var stdout, stderr bytes.Buffer
-       cmd.Stdout = &stdout
-       cmd.Stderr = &stderr
+       crArgs := append([]string(nil), crunchRunCommand...)
+       crArgs = append(crArgs, container.UUID)
+       crScript := strings.NewReader(execScript(crArgs))
 
-       // Mutex between squeue sync and running sbatch or scancel.
        sqCheck.L.Lock()
        defer sqCheck.L.Unlock()
 
-       log.Printf("exec sbatch %+q", cmd.Args)
-       err := cmd.Run()
-
-       switch err.(type) {
-       case nil:
-               log.Printf("sbatch succeeded: %q", strings.TrimSpace(stdout.String()))
-               return nil
-
-       case *exec.ExitError:
-               dispatcher.Unlock(container.UUID)
-               return fmt.Errorf("sbatch %+q failed: %v (stderr: %q)", cmd.Args, err, stderr.Bytes())
-
-       default:
-               dispatcher.Unlock(container.UUID)
-               return fmt.Errorf("exec failed: %v", err)
-       }
+       sbArgs := sbatchArgs(container)
+       log.Printf("running sbatch %+q", sbArgs)
+       return theConfig.slurm.Batch(crScript, sbArgs)
 }
 
 // Submit a container to the slurm queue (or resume monitoring if it's
@@ -313,10 +274,8 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
                        } else if updated.Priority == 0 {
                                log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
                                scancel(ctr)
-                       } else if niceness(updated.Priority) != sqCheck.GetNiceness(ctr.UUID) && sqCheck.GetNiceness(ctr.UUID) != -1 {
-                               // dynamically adjust priority
-                               log.Printf("Container priority %v != %v", niceness(updated.Priority), sqCheck.GetNiceness(ctr.UUID))
-                               scontrolUpdate(updated)
+                       } else {
+                               renice(updated)
                        }
                }
        }
@@ -324,12 +283,11 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
 
 func scancel(ctr arvados.Container) {
        sqCheck.L.Lock()
-       cmd := scancelCmd(ctr)
-       msg, err := cmd.CombinedOutput()
+       err := theConfig.slurm.Cancel(ctr.UUID)
        sqCheck.L.Unlock()
 
        if err != nil {
-               log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+               log.Printf("scancel: %s", err)
                time.Sleep(time.Second)
        } else if sqCheck.HasUUID(ctr.UUID) {
                log.Printf("container %s is still in squeue after scancel", ctr.UUID)
@@ -337,17 +295,24 @@ func scancel(ctr arvados.Container) {
        }
 }
 
-func scontrolUpdate(ctr arvados.Container) {
+func renice(ctr arvados.Container) {
+       nice := niceness(ctr.Priority)
+       oldnice := sqCheck.GetNiceness(ctr.UUID)
+       if nice == oldnice || oldnice == -1 {
+               return
+       }
+       log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
        sqCheck.L.Lock()
-       cmd := scontrolCmd(ctr)
-       msg, err := cmd.CombinedOutput()
+       err := theConfig.slurm.Renice(ctr.UUID, nice)
        sqCheck.L.Unlock()
 
        if err != nil {
-               log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+               log.Printf("renice: %s", err)
                time.Sleep(time.Second)
-       } else if sqCheck.HasUUID(ctr.UUID) {
-               log.Printf("Container %s priority is now %v, niceness is now %v",
+               return
+       }
+       if sqCheck.HasUUID(ctr.UUID) {
+               log.Printf("container %s has arvados priority %d, slurm nice %d",
                        ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
        }
 }
index a823755379574155420e465b2ec2b72f234a3024..830976d66bdd9e3dbce855936a241b7b83ec8e7b 100644 (file)
@@ -7,6 +7,7 @@ package main
 import (
        "bytes"
        "context"
+       "errors"
        "fmt"
        "io"
        "io/ioutil"
@@ -64,51 +65,55 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
        arvadostest.ResetEnv()
 }
 
-func (s *TestSuite) integrationTest(c *C,
-       newSqueueCmd func() *exec.Cmd,
-       newScancelCmd func(arvados.Container) *exec.Cmd,
-       newSbatchCmd func(arvados.Container) *exec.Cmd,
-       newScontrolCmd func(arvados.Container) *exec.Cmd,
-       sbatchCmdComps []string,
-       runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
-       arvadostest.ResetEnv()
+type slurmFake struct {
+       didBatch  [][]string
+       didCancel []string
+       didRenice [][]string
+       queue     string
+       // If non-nil, run this func during the 2nd+ call to Cancel()
+       onCancel func()
+       // Error returned by Batch()
+       errBatch error
+}
 
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, IsNil)
+func (sf *slurmFake) Batch(script io.Reader, args []string) error {
+       sf.didBatch = append(sf.didBatch, args)
+       return sf.errBatch
+}
 
-       var sbatchCmdLine []string
+func (sf *slurmFake) QueueCommand(args []string) *exec.Cmd {
+       return exec.Command("echo", sf.queue)
+}
 
-       // Override sbatchCmd
-       defer func(orig func(arvados.Container) *exec.Cmd) {
-               sbatchCmd = orig
-       }(sbatchCmd)
+func (sf *slurmFake) Renice(name string, nice int) error {
+       sf.didRenice = append(sf.didRenice, []string{name, fmt.Sprintf("%d", nice)})
+       return nil
+}
 
-       if newSbatchCmd != nil {
-               sbatchCmd = newSbatchCmd
-       } else {
-               sbatchCmd = func(container arvados.Container) *exec.Cmd {
-                       sbatchCmdLine = sbatchFunc(container).Args
-                       return exec.Command("sh")
-               }
+func (sf *slurmFake) Cancel(name string) error {
+       sf.didCancel = append(sf.didCancel, name)
+       if len(sf.didCancel) == 1 {
+               // simulate error on first attempt
+               return errors.New("something terrible happened")
+       }
+       if sf.onCancel != nil {
+               sf.onCancel()
        }
+       return nil
+}
 
-       // Override squeueCmd
-       defer func(orig func() *exec.Cmd) {
-               squeueCmd = orig
-       }(squeueCmd)
-       squeueCmd = newSqueueCmd
+func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
+       expectBatch [][]string,
+       runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
+       arvadostest.ResetEnv()
 
-       // Override scancel
-       defer func(orig func(arvados.Container) *exec.Cmd) {
-               scancelCmd = orig
-       }(scancelCmd)
-       scancelCmd = newScancelCmd
+       arv, err := arvadosclient.MakeArvadosClient()
+       c.Assert(err, IsNil)
 
-       // Override scontrol
-       defer func(orig func(arvados.Container) *exec.Cmd) {
-               scontrolCmd = orig
-       }(scontrolCmd)
-       scontrolCmd = newScontrolCmd
+       defer func(orig Slurm) {
+               theConfig.slurm = orig
+       }(theConfig.slurm)
+       theConfig.slurm = slurm
 
        // There should be one queued container
        params := arvadosclient.Dict{
@@ -130,6 +135,7 @@ func (s *TestSuite) integrationTest(c *C,
                RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
                        go func() {
                                runContainer(disp, ctr)
+                               slurm.queue = ""
                                doneRun <- struct{}{}
                        }()
                        run(disp, ctr, status)
@@ -145,7 +151,7 @@ func (s *TestSuite) integrationTest(c *C,
 
        sqCheck.Stop()
 
-       c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
+       c.Check(slurm.didBatch, DeepEquals, expectBatch)
 
        // There should be no queued containers now
        err = arv.List("containers", params, &containers)
@@ -160,77 +166,47 @@ func (s *TestSuite) integrationTest(c *C,
 }
 
 func (s *TestSuite) TestIntegrationNormal(c *C) {
-       done := false
        container := s.integrationTest(c,
-               func() *exec.Cmd {
-                       if done {
-                               return exec.Command("true")
-                       } else {
-                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
-                       }
-               },
-               nil,
+               &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"},
                nil,
-               nil,
-               []string(nil),
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
                        time.Sleep(3 * time.Second)
                        dispatcher.UpdateState(container.UUID, dispatch.Complete)
-                       done = true
                })
        c.Check(container.State, Equals, arvados.ContainerStateComplete)
 }
 
 func (s *TestSuite) TestIntegrationCancel(c *C) {
-       var cmd *exec.Cmd
-       var scancelCmdLine []string
-       attempt := 0
-
+       slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+       readyToCancel := make(chan bool)
+       slurm.onCancel = func() { <-readyToCancel }
        container := s.integrationTest(c,
-               func() *exec.Cmd {
-                       if cmd != nil && cmd.ProcessState != nil {
-                               return exec.Command("true")
-                       } else {
-                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
-                       }
-               },
-               func(container arvados.Container) *exec.Cmd {
-                       if attempt++; attempt == 1 {
-                               return exec.Command("false")
-                       } else {
-                               scancelCmdLine = scancelFunc(container).Args
-                               cmd = exec.Command("echo")
-                               return cmd
-                       }
-               },
-               nil,
+               slurm,
                nil,
-               []string(nil),
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
-                       time.Sleep(1 * time.Second)
+                       time.Sleep(time.Second)
                        dispatcher.Arv.Update("containers", container.UUID,
                                arvadosclient.Dict{
                                        "container": arvadosclient.Dict{"priority": 0}},
                                nil)
+                       readyToCancel <- true
+                       close(readyToCancel)
                })
        c.Check(container.State, Equals, arvados.ContainerStateCancelled)
-       c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
+       c.Check(len(slurm.didCancel) > 1, Equals, true)
+       c.Check(slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
 }
 
 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
-       container := s.integrationTest(c,
-               func() *exec.Cmd { return exec.Command("echo") },
-               nil,
-               nil,
-               nil,
-               []string{"sbatch",
+       container := s.integrationTest(c, &slurmFake{},
+               [][]string{{
                        fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
                        fmt.Sprintf("--mem=%d", 11445),
                        fmt.Sprintf("--cpus-per-task=%d", 4),
                        fmt.Sprintf("--tmp=%d", 45777),
-                       fmt.Sprintf("--nice=%d", 9990)},
+                       fmt.Sprintf("--nice=%d", 9990)}},
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
                        time.Sleep(3 * time.Second)
@@ -241,13 +217,8 @@ func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
 
 func (s *TestSuite) TestSbatchFail(c *C) {
        container := s.integrationTest(c,
-               func() *exec.Cmd { return exec.Command("echo") },
-               nil,
-               func(container arvados.Container) *exec.Cmd {
-                       return exec.Command("false")
-               },
-               nil,
-               []string(nil),
+               &slurmFake{errBatch: errors.New("something terrible happened")},
+               [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=9990"}},
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
                        dispatcher.UpdateState(container.UUID, dispatch.Complete)
@@ -387,71 +358,47 @@ func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) {
 }
 
 func testSbatchFuncWithArgs(c *C, args []string) {
+       defer func() { theConfig.SbatchArguments = nil }()
        theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)
 
        container := arvados.Container{
                UUID:               "123",
                RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
                Priority:           1}
-       sbatchCmd := sbatchFunc(container)
 
        var expected []string
-       expected = append(expected, "sbatch")
        expected = append(expected, theConfig.SbatchArguments...)
        expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990")
-
-       c.Check(sbatchCmd.Args, DeepEquals, expected)
+       c.Check(sbatchArgs(container), DeepEquals, expected)
 }
 
 func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
-       theConfig.SbatchArguments = nil
        container := arvados.Container{
                UUID:                 "123",
                RuntimeConstraints:   arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1},
                SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}},
                Priority:             1}
-       sbatchCmd := sbatchFunc(container)
 
-       var expected []string
-       expected = append(expected, "sbatch")
-       expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990", "--partition=blurb,b2")
-
-       c.Check(sbatchCmd.Args, DeepEquals, expected)
+       c.Check(sbatchArgs(container), DeepEquals, []string{
+               "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990",
+               "--partition=blurb,b2",
+       })
 }
 
 func (s *TestSuite) TestIntegrationChangePriority(c *C) {
-       var scontrolCmdLine []string
-       step := 0
-
-       container := s.integrationTest(c,
-               func() *exec.Cmd {
-                       if step == 0 {
-                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
-                       } else if step == 1 {
-                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer 4000 100")
-                       } else {
-                               return exec.Command("echo")
-                       }
-               },
-               func(arvados.Container) *exec.Cmd { return exec.Command("true") },
-               nil,
-               func(container arvados.Container) *exec.Cmd {
-                       scontrolCmdLine = scontrolFunc(container).Args
-                       step = 1
-                       return exec.Command("true")
-               },
-               []string(nil),
+       slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+       container := s.integrationTest(c, slurm, nil,
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
-                       time.Sleep(1 * time.Second)
+                       time.Sleep(time.Second)
                        dispatcher.Arv.Update("containers", container.UUID,
                                arvadosclient.Dict{
                                        "container": arvadosclient.Dict{"priority": 600}},
                                nil)
-                       time.Sleep(1 * time.Second)
-                       step = 2
+                       time.Sleep(time.Second)
                        dispatcher.UpdateState(container.UUID, dispatch.Complete)
                })
        c.Check(container.State, Equals, arvados.ContainerStateComplete)
-       c.Check(scontrolCmdLine, DeepEquals, []string{"scontrol", "update", "JobName=zzzzz-dz642-queuedcontainer", "Nice=4000"})
+       c.Assert(len(slurm.didRenice), Not(Equals), 0)
+       c.Check(slurm.didRenice[len(slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
 }
diff --git a/services/crunch-dispatch-slurm/slurm.go b/services/crunch-dispatch-slurm/slurm.go
new file mode 100644 (file)
index 0000000..bd19377
--- /dev/null
@@ -0,0 +1,73 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "fmt"
+       "io"
+       "log"
+       "os/exec"
+       "strings"
+)
+
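+// Slurm defines the slurm CLI operations used by crunch-dispatch-slurm; tests substitute a fake implementation (see slurmFake in crunch-dispatch-slurm_test.go).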
+type Slurm interface {
+       Cancel(name string) error
+       Renice(name string, nice int) error
+       QueueCommand(args []string) *exec.Cmd
+       Batch(script io.Reader, args []string) error
+}
+
+type slurmCLI struct{}
+
+func (scli *slurmCLI) Batch(script io.Reader, args []string) error {
+       return scli.run(script, "sbatch", args)
+}
+
+func (scli *slurmCLI) Cancel(name string) error {
+       for _, args := range [][]string{
+               // If the slurm job hasn't started yet, remove it from
+               // the queue.
+               {"--state=pending"},
+               // If the slurm job has started, send SIGTERM. If we
+               // cancel a running job without a --signal argument,
+               // slurm will send SIGTERM and then (after some
+               // site-configured interval) SIGKILL. This would kill
+               // crunch-run without stopping the container, which we
+               // don't want.
+               {"--batch", "--signal=TERM", "--state=running"},
+               {"--batch", "--signal=TERM", "--state=suspended"},
+       } {
+               err := scli.run(nil, "scancel", append([]string{"--name=" + name}, args...))
+               if err != nil {
+                       // scancel exits 0 if no job matches the given
+                       // name and state. Any error from scancel here
+                       // really indicates something is wrong.
+                       return err
+               }
+       }
+       return nil
+}
+
+func (scli *slurmCLI) QueueCommand(args []string) *exec.Cmd {
+       return exec.Command("squeue", args...)
+}
+
+func (scli *slurmCLI) Renice(name string, nice int) error {
+       return scli.run(nil, "scontrol", []string{"update", "JobName=" + name, fmt.Sprintf("Nice=%d", nice)})
+}
+
+func (scli *slurmCLI) run(stdin io.Reader, prog string, args []string) error {
+       cmd := exec.Command(prog, args...)
+       cmd.Stdin = stdin
+       out, err := cmd.CombinedOutput()
+       outTrim := strings.TrimSpace(string(out))
+       if err != nil || len(out) > 0 {
+               log.Printf("%q %q: %q", cmd.Path, cmd.Args, outTrim)
+       }
+       if err != nil {
+               err = fmt.Errorf("%s: %s (%q)", cmd.Path, err, outTrim)
+       }
+       return err
+}
index 819c2d2510f992ab75c18249d8b46d6c13828d6d..5ecfe8ff2fc049201eb27c8e58c7a02eb84cbbb4 100644 (file)
@@ -8,7 +8,6 @@ import (
        "bytes"
        "fmt"
        "log"
-       "os/exec"
        "strings"
        "sync"
        "time"
@@ -29,12 +28,6 @@ type SqueueChecker struct {
        sync.Cond
 }
 
-func squeueFunc() *exec.Cmd {
-       return exec.Command("squeue", "--all", "--format=%j %y %Q")
-}
-
-var squeueCmd = squeueFunc
-
 // HasUUID checks if a given container UUID is in the slurm queue.
 // This does not run squeue directly, but instead blocks until woken
 // up by next successful update of squeue.
@@ -84,7 +77,7 @@ func (sqc *SqueueChecker) check() {
        sqc.L.Lock()
        defer sqc.L.Unlock()
 
-       cmd := squeueCmd()
+       cmd := theConfig.slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
        stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
        cmd.Stdout, cmd.Stderr = stdout, stderr
        if err := cmd.Run(); err != nil {
index b480c068c597ecc178b08be7278af2edbd72bce9..a9f1c25d37fa1714868d371c245dc7aa8d041543 100644 (file)
@@ -6,7 +6,6 @@ package main
 
 import (
        "bytes"
-       "context"
        "encoding/json"
        "errors"
        "flag"
@@ -33,6 +32,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
+       "golang.org/x/net/context"
 
        dockertypes "github.com/docker/docker/api/types"
        dockercontainer "github.com/docker/docker/api/types/container"
@@ -75,60 +75,13 @@ type ThinDockerClient interface {
        ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
                networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
        ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
-       ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
+       ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
        ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
        ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
        ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
        ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
 }
 
-// ThinDockerClientProxy is a proxy implementation of ThinDockerClient
-// that executes the docker requests on dockerclient.Client
-type ThinDockerClientProxy struct {
-       Docker *dockerclient.Client
-}
-
-// ContainerAttach invokes dockerclient.Client.ContainerAttach
-func (proxy ThinDockerClientProxy) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
-       return proxy.Docker.ContainerAttach(ctx, container, options)
-}
-
-// ContainerCreate invokes dockerclient.Client.ContainerCreate
-func (proxy ThinDockerClientProxy) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
-       networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
-       return proxy.Docker.ContainerCreate(ctx, config, hostConfig, networkingConfig, containerName)
-}
-
-// ContainerStart invokes dockerclient.Client.ContainerStart
-func (proxy ThinDockerClientProxy) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
-       return proxy.Docker.ContainerStart(ctx, container, options)
-}
-
-// ContainerStop invokes dockerclient.Client.ContainerStop
-func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
-       return proxy.Docker.ContainerStop(ctx, container, timeout)
-}
-
-// ContainerWait invokes dockerclient.Client.ContainerWait
-func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
-       return proxy.Docker.ContainerWait(ctx, container, condition)
-}
-
-// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
-func (proxy ThinDockerClientProxy) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
-       return proxy.Docker.ImageInspectWithRaw(ctx, image)
-}
-
-// ImageLoad invokes dockerclient.Client.ImageLoad
-func (proxy ThinDockerClientProxy) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
-       return proxy.Docker.ImageLoad(ctx, input, quiet)
-}
-
-// ImageRemove invokes dockerclient.Client.ImageRemove
-func (proxy ThinDockerClientProxy) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
-       return proxy.Docker.ImageRemove(ctx, image, options)
-}
-
 // ContainerRunner is the main stateful struct used for a single execution of a
 // container.
 type ContainerRunner struct {
@@ -150,21 +103,23 @@ type ContainerRunner struct {
        LogsPDH       *string
        RunArvMount
        MkTempDir
-       ArvMount       *exec.Cmd
-       ArvMountPoint  string
-       HostOutputDir  string
-       CleanupTempDir []string
-       Binds          []string
-       Volumes        map[string]struct{}
-       OutputPDH      *string
-       SigChan        chan os.Signal
-       ArvMountExit   chan error
-       finalState     string
-
-       statLogger   io.WriteCloser
-       statReporter *crunchstat.Reporter
-       statInterval time.Duration
-       cgroupRoot   string
+       ArvMount      *exec.Cmd
+       ArvMountPoint string
+       HostOutputDir string
+       Binds         []string
+       Volumes       map[string]struct{}
+       OutputPDH     *string
+       SigChan       chan os.Signal
+       ArvMountExit  chan error
+       finalState    string
+       parentTemp    string
+
+       statLogger       io.WriteCloser
+       statReporter     *crunchstat.Reporter
+       hoststatLogger   io.WriteCloser
+       hoststatReporter *crunchstat.Reporter
+       statInterval     time.Duration
+       cgroupRoot       string
        // What we expect the container's cgroup parent to be.
        expectCgroupParent string
        // What we tell docker to use as the container's cgroup
@@ -180,7 +135,6 @@ type ContainerRunner struct {
        setCgroupParent string
 
        cStateLock sync.Mutex
-       cStarted   bool // StartContainer() succeeded
        cCancelled bool // StopContainer() invoked
 
        enableNetwork string // one of "default" or "always"
@@ -197,11 +151,10 @@ func (runner *ContainerRunner) setupSignals() {
        signal.Notify(runner.SigChan, syscall.SIGQUIT)
 
        go func(sig chan os.Signal) {
-               s := <-sig
-               if s != nil {
-                       runner.CrunchLog.Printf("Caught signal %v", s)
+               for s := range sig {
+                       runner.CrunchLog.Printf("caught signal: %v", s)
+                       runner.stop()
                }
-               runner.stop()
        }(runner.SigChan)
 }
 
@@ -209,25 +162,20 @@ func (runner *ContainerRunner) setupSignals() {
 func (runner *ContainerRunner) stop() {
        runner.cStateLock.Lock()
        defer runner.cStateLock.Unlock()
-       if runner.cCancelled {
+       if runner.ContainerID == "" {
                return
        }
        runner.cCancelled = true
-       if runner.cStarted {
-               timeout := time.Duration(10)
-               err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout))
-               if err != nil {
-                       runner.CrunchLog.Printf("StopContainer failed: %s", err)
-               }
-               // Suppress multiple calls to stop()
-               runner.cStarted = false
+       runner.CrunchLog.Printf("removing container")
+       err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
+       if err != nil {
+               runner.CrunchLog.Printf("error removing container: %s", err)
        }
 }
 
 func (runner *ContainerRunner) stopSignals() {
        if runner.SigChan != nil {
                signal.Stop(runner.SigChan)
-               close(runner.SigChan)
        }
 }
 
@@ -379,11 +327,42 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
 
 func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
        if runner.ArvMountPoint == "" {
-               runner.ArvMountPoint, err = runner.MkTempDir("", prefix)
+               runner.ArvMountPoint, err = runner.MkTempDir(runner.parentTemp, prefix)
        }
        return
 }
 
+func copyfile(src string, dst string) (err error) {
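+       // Copy a single regular file from src to dst, creating dst's parent directories as needed.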
+       srcfile, err := os.Open(src)
+       if err != nil {
+               return
+       }
+
+       os.MkdirAll(path.Dir(dst), 0777)
+
+       dstfile, err := os.Create(dst)
+       if err != nil {
+               return
+       }
+       _, err = io.Copy(dstfile, srcfile)
+       if err != nil {
+               return
+       }
+
+       err = srcfile.Close()
+       err2 := dstfile.Close()
+
+       if err != nil {
+               return
+       }
+
+       if err2 != nil {
+               return err2
+       }
+
+       return nil
+}
+
 func (runner *ContainerRunner) SetupMounts() (err error) {
        err = runner.SetupArvMountPoint("keep")
        if err != nil {
@@ -411,6 +390,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        runner.Binds = nil
        runner.Volumes = make(map[string]struct{})
        needCertMount := true
+       type copyFile struct {
+               src  string
+               bind string
+       }
+       var copyFiles []copyFile
 
        var binds []string
        for bind := range runner.Container.Mounts {
@@ -466,7 +450,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                pdhOnly = false
                                src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
                        } else if mnt.PortableDataHash != "" {
-                               if mnt.Writable {
+                               if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
                                        return fmt.Errorf("Can never write to a collection specified by portable data hash")
                                }
                                idx := strings.Index(mnt.PortableDataHash, "/")
@@ -493,10 +477,12 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        if mnt.Writable {
                                if bind == runner.Container.OutputPath {
                                        runner.HostOutputDir = src
+                                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
                                } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
-                                       return fmt.Errorf("Writable mount points are not permitted underneath the output_path: %v", bind)
+                                       copyFiles = append(copyFiles, copyFile{src, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
+                               } else {
+                                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
                                }
-                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
                        } else {
                                runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
                        }
@@ -504,7 +490,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
                case mnt.Kind == "tmp":
                        var tmpdir string
-                       tmpdir, err = runner.MkTempDir("", "")
+                       tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp")
                        if err != nil {
                                return fmt.Errorf("While creating mount temp dir: %v", err)
                        }
@@ -516,7 +502,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        if staterr != nil {
                                return fmt.Errorf("While Chmod temp dir: %v", err)
                        }
-                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
                        runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
                        if bind == runner.Container.OutputPath {
                                runner.HostOutputDir = tmpdir
@@ -532,11 +517,10 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        // can ensure the file is world-readable
                        // inside the container, without having to
                        // make it world-readable on the docker host.
-                       tmpdir, err := runner.MkTempDir("", "")
+                       tmpdir, err := runner.MkTempDir(runner.parentTemp, "json")
                        if err != nil {
                                return fmt.Errorf("creating temp dir: %v", err)
                        }
-                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
                        tmpfn := filepath.Join(tmpdir, "mountdata.json")
                        err = ioutil.WriteFile(tmpfn, jsondata, 0644)
                        if err != nil {
@@ -545,11 +529,10 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
 
                case mnt.Kind == "git_tree":
-                       tmpdir, err := runner.MkTempDir("", "")
+                       tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree")
                        if err != nil {
                                return fmt.Errorf("creating temp dir: %v", err)
                        }
-                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
                        err = gitMount(mnt).extractTree(runner.ArvClient, tmpdir, token)
                        if err != nil {
                                return err
@@ -591,59 +574,118 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                }
        }
 
+       for _, cp := range copyFiles {
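+               // Each entry is a writable mount under output_path: stage a modifiable copy of cp.src at cp.bind inside the host output directory.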
+               st, err := os.Stat(cp.src)
+               if err != nil {
+                       return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+               }
+               if st.IsDir() {
+                       err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
+                               if walkerr != nil {
+                                       return walkerr
+                               }
+                               target := path.Join(cp.bind, walkpath[len(cp.src):])
+                               if walkinfo.Mode().IsRegular() {
+                                       copyerr := copyfile(walkpath, target)
+                                       if copyerr != nil {
+                                               return copyerr
+                                       }
+                                       return os.Chmod(target, walkinfo.Mode()|0777)
+                               } else if walkinfo.Mode().IsDir() {
+                                       mkerr := os.MkdirAll(target, 0777)
+                                       if mkerr != nil {
+                                               return mkerr
+                                       }
+                                       return os.Chmod(target, walkinfo.Mode()|os.ModeSetgid|0777)
+                               } else {
+                                       return fmt.Errorf("Source %q is not a regular file or directory", cp.src)
+                               }
+                       })
+               } else if st.Mode().IsRegular() {
+                       err = copyfile(cp.src, cp.bind)
+                       if err == nil {
+                               err = os.Chmod(cp.bind, st.Mode()|0777)
+                       }
+               }
+               if err != nil {
+                       return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+               }
+       }
+
        return nil
 }
 
 func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
        // Handle docker log protocol
        // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
+       defer close(runner.loggingDone)
 
        header := make([]byte, 8)
-       for {
-               _, readerr := io.ReadAtLeast(containerReader, header, 8)
-
-               if readerr == nil {
-                       readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
-                       if header[0] == 1 {
-                               // stdout
-                               _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
-                       } else {
-                               // stderr
-                               _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
+       var err error
+       for err == nil {
+               _, err = io.ReadAtLeast(containerReader, header, 8)
+               if err != nil {
+                       if err == io.EOF {
+                               err = nil
                        }
+                       break
                }
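+               // The 8-byte header holds the stream type in byte 0 and the payload length in bytes 4-7 (big-endian).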
+               readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
+               if header[0] == 1 {
+                       // stdout
+                       _, err = io.CopyN(runner.Stdout, containerReader, readsize)
+               } else {
+                       // stderr
+                       _, err = io.CopyN(runner.Stderr, containerReader, readsize)
+               }
+       }
 
-               if readerr != nil {
-                       if readerr != io.EOF {
-                               runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
-                       }
-
-                       closeerr := runner.Stdout.Close()
-                       if closeerr != nil {
-                               runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
-                       }
+       if err != nil {
+               runner.CrunchLog.Printf("error reading docker logs: %v", err)
+       }
 
-                       closeerr = runner.Stderr.Close()
-                       if closeerr != nil {
-                               runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
-                       }
+       err = runner.Stdout.Close()
+       if err != nil {
+               runner.CrunchLog.Printf("error closing stdout logs: %v", err)
+       }
 
-                       if runner.statReporter != nil {
-                               runner.statReporter.Stop()
-                               closeerr = runner.statLogger.Close()
-                               if closeerr != nil {
-                                       runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr)
-                               }
-                       }
+       err = runner.Stderr.Close()
+       if err != nil {
+               runner.CrunchLog.Printf("error closing stderr logs: %v", err)
+       }
 
-                       runner.loggingDone <- true
-                       close(runner.loggingDone)
-                       return
+       if runner.statReporter != nil {
+               runner.statReporter.Stop()
+               err = runner.statLogger.Close()
+               if err != nil {
+                       runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
                }
        }
 }
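
ProcessDockerAttach demultiplexes the Docker attach stream: each frame carries an 8-byte header whose first byte selects stdout (1) or stderr (otherwise) and whose last four bytes give the payload length in big-endian order. A minimal standalone sketch of that framing, assuming only the standard library:

    // Simplified demultiplexer for the Docker attach framing used above; a
    // sketch, not the crunch-run implementation.
    package main

    import (
        "bytes"
        "encoding/binary"
        "fmt"
        "io"
        "os"
    )

    func demux(r io.Reader, stdout, stderr io.Writer) error {
        header := make([]byte, 8)
        for {
            if _, err := io.ReadFull(r, header); err != nil {
                if err == io.EOF {
                    return nil // clean end of stream
                }
                return err
            }
            size := int64(binary.BigEndian.Uint32(header[4:8]))
            dst := stderr
            if header[0] == 1 {
                dst = stdout
            }
            if _, err := io.CopyN(dst, r, size); err != nil {
                return err
            }
        }
    }

    func main() {
        // One stdout frame carrying "hi\n" (payload length 3).
        frame := append([]byte{1, 0, 0, 0, 0, 0, 0, 3}, []byte("hi\n")...)
        if err := demux(bytes.NewReader(frame), os.Stdout, os.Stderr); err != nil {
            fmt.Fprintln(os.Stderr, err)
        }
    }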
 
-func (runner *ContainerRunner) StartCrunchstat() {
+func (runner *ContainerRunner) stopHoststat() error {
+       if runner.hoststatReporter == nil {
+               return nil
+       }
+       runner.hoststatReporter.Stop()
+       err := runner.hoststatLogger.Close()
+       if err != nil {
+               return fmt.Errorf("error closing hoststat logs: %v", err)
+       }
+       return nil
+}
+
+func (runner *ContainerRunner) startHoststat() {
+       runner.hoststatLogger = NewThrottledLogger(runner.NewLogWriter("hoststat"))
+       runner.hoststatReporter = &crunchstat.Reporter{
+               Logger:     log.New(runner.hoststatLogger, "", 0),
+               CgroupRoot: runner.cgroupRoot,
+               PollPeriod: runner.statInterval,
+       }
+       runner.hoststatReporter.Start()
+}
+
+func (runner *ContainerRunner) startCrunchstat() {
        runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
        runner.statReporter = &crunchstat.Reporter{
                CID:          runner.ContainerID,
@@ -973,46 +1015,38 @@ func (runner *ContainerRunner) StartContainer() error {
                }
                return fmt.Errorf("could not start container: %v%s", err, advice)
        }
-       runner.cStarted = true
        return nil
 }
 
 // WaitFinish waits for the container to terminate, captures the exit code, and
 // closes the stdout/stderr logging.
-func (runner *ContainerRunner) WaitFinish() (err error) {
+func (runner *ContainerRunner) WaitFinish() error {
        runner.CrunchLog.Print("Waiting for container to finish")
 
-       waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running")
+       waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
+       arvMountExit := runner.ArvMountExit
+       for {
+               select {
+               case waitBody := <-waitOk:
+                       runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
+                       code := int(waitBody.StatusCode)
+                       runner.ExitCode = &code
+
+                       // wait for stdout/stderr to complete
+                       <-runner.loggingDone
+                       return nil
+
+               case err := <-waitErr:
+                       return fmt.Errorf("container wait: %v", err)
 
-       go func() {
-               <-runner.ArvMountExit
-               if runner.cStarted {
+               case <-arvMountExit:
                        runner.CrunchLog.Printf("arv-mount exited while container is still running.  Stopping container.")
                        runner.stop()
+                       // arvMountExit will always be ready now that
+                       // it's closed, but that doesn't interest us.
+                       arvMountExit = nil
                }
-       }()
-
-       var waitBody dockercontainer.ContainerWaitOKBody
-       select {
-       case waitBody = <-waitOk:
-       case err = <-waitErr:
-       }
-
-       // Container isn't running any more
-       runner.cStarted = false
-
-       if err != nil {
-               return fmt.Errorf("container wait: %v", err)
        }
-
-       runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
-       code := int(waitBody.StatusCode)
-       runner.ExitCode = &code
-
-       // wait for stdout/stderr to complete
-       <-runner.loggingDone
-
-       return nil
 }
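
The rewritten WaitFinish relies on a standard Go idiom: once a channel's event has been handled, set the local variable to nil so that select case can never fire again, because receiving from a nil channel blocks forever. A small sketch of the pattern in isolation:

    // Sketch of disabling a select case by nil-ing its channel, as done with
    // arvMountExit above.
    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        done := make(chan struct{})
        side := make(chan struct{})
        close(side) // ready immediately, like a closed ArvMountExit channel

        go func() {
            time.Sleep(10 * time.Millisecond)
            close(done)
        }()

        for {
            select {
            case <-done:
                fmt.Println("finished")
                return
            case <-side:
                fmt.Println("side event handled once")
                side = nil // nil channels never become ready again
            }
        }
    }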
 
 var ErrNotInOutputDir = fmt.Errorf("Must point to path within the output directory")
@@ -1141,7 +1175,7 @@ func (runner *ContainerRunner) UploadOutputFile(
        // go through mounts and try reverse map to collection reference
        for _, bind := range binds {
                mnt := runner.Container.Mounts[bind]
-               if tgt == bind || strings.HasPrefix(tgt, bind+"/") {
+               if (tgt == bind || strings.HasPrefix(tgt, bind+"/")) && !mnt.Writable {
                        // get path relative to bind
                        targetSuffix := tgt[len(bind):]
 
@@ -1191,10 +1225,6 @@ func (runner *ContainerRunner) UploadOutputFile(
 
 // CaptureOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
 func (runner *ContainerRunner) CaptureOutput() error {
-       if runner.finalState != "Complete" {
-               return nil
-       }
-
        if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
                // Output may have been set directly by the container, so
                // refresh the container record to check.
@@ -1284,7 +1314,7 @@ func (runner *ContainerRunner) CaptureOutput() error {
                        continue
                }
 
-               if mnt.ExcludeFromOutput == true {
+               if mnt.ExcludeFromOutput == true || mnt.Writable {
                        continue
                }
 
@@ -1406,10 +1436,8 @@ func (runner *ContainerRunner) CleanupDirs() {
                }
        }
 
-       for _, tmpdir := range runner.CleanupTempDir {
-               if rmerr := os.RemoveAll(tmpdir); rmerr != nil {
-                       runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
-               }
+       if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil {
+               runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", runner.parentTemp, rmerr)
        }
 }
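
CleanupDirs now removes a single parent scratch directory instead of tracking each temporary directory separately; main() creates that parent (see below) and every per-mount directory, such as the tmp2/json2 names seen in the tests further down, lives underneath it. A minimal sketch of the layout, with hypothetical names:

    // Sketch: labeled scratch directories under one parent, cleaned up with a
    // single RemoveAll.
    package main

    import (
        "fmt"
        "io/ioutil"
        "os"
        "path/filepath"
    )

    func main() {
        parent, err := ioutil.TempDir("", "crunch-run-example.")
        if err != nil {
            fmt.Fprintln(os.Stderr, err)
            return
        }
        defer os.RemoveAll(parent) // one cleanup for all children

        for _, label := range []string{"tmp2", "json2"} {
            if err := os.Mkdir(filepath.Join(parent, label), 0700); err != nil {
                fmt.Fprintln(os.Stderr, err)
                return
            }
        }
        fmt.Println("created scratch space under", parent)
    }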
 
@@ -1579,6 +1607,7 @@ func (runner *ContainerRunner) Run() (err error) {
                }
 
                checkErr(runner.CaptureOutput())
+               checkErr(runner.stopHoststat())
                checkErr(runner.CommitLogs())
                checkErr(runner.UpdateContainerFinal())
        }()
@@ -1587,9 +1616,8 @@ func (runner *ContainerRunner) Run() (err error) {
        if err != nil {
                return
        }
-
-       // setup signal handling
        runner.setupSignals()
+       runner.startHoststat()
 
        // check for and/or load image
        err = runner.LoadImage()
@@ -1638,7 +1666,7 @@ func (runner *ContainerRunner) Run() (err error) {
        }
        runner.finalState = "Cancelled"
 
-       runner.StartCrunchstat()
+       runner.startCrunchstat()
 
        err = runner.StartContainer()
        if err != nil {
@@ -1647,7 +1675,7 @@ func (runner *ContainerRunner) Run() (err error) {
        }
 
        err = runner.WaitFinish()
-       if err == nil {
+       if err == nil && !runner.IsCancelled() {
                runner.finalState = "Complete"
        }
        return
@@ -1739,10 +1767,8 @@ func main() {
        // API version 1.21 corresponds to Docker 1.9, which is currently the
        // minimum version we want to support.
        docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
-       dockerClientProxy := ThinDockerClientProxy{Docker: docker}
-
-       cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
 
+       cr := NewContainerRunner(api, kc, docker, containerId)
        if dockererr != nil {
                cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
                cr.checkBrokenNode(dockererr)
@@ -1750,6 +1776,12 @@ func main() {
                os.Exit(1)
        }
 
+       parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".")
+       if tmperr != nil {
+               log.Fatalf("%s: %v", containerId, tmperr)
+       }
+
+       cr.parentTemp = parentTemp
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
        cr.expectCgroupParent = *cgroupParent
index 4979cf8a0c801ee5bc56e48933e97f736dc3d00d..94b713355dd10a13e2563f701589f39874d8ac12 100644 (file)
@@ -7,7 +7,6 @@ package main
 import (
        "bufio"
        "bytes"
-       "context"
        "crypto/md5"
        "encoding/json"
        "errors"
@@ -17,7 +16,6 @@ import (
        "net"
        "os"
        "os/exec"
-       "path/filepath"
        "runtime/pprof"
        "sort"
        "strings"
@@ -30,6 +28,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
+       "golang.org/x/net/context"
 
        dockertypes "github.com/docker/docker/api/types"
        dockercontainer "github.com/docker/docker/api/types/container"
@@ -42,11 +41,17 @@ func TestCrunchExec(t *testing.T) {
        TestingT(t)
 }
 
-type TestSuite struct{}
-
 // Gocheck boilerplate
 var _ = Suite(&TestSuite{})
 
+type TestSuite struct {
+       docker *TestDockerClient
+}
+
+func (s *TestSuite) SetUpTest(c *C) {
+       s.docker = NewTestDockerClient()
+}
+
 type ArvTestClient struct {
        Total   int64
        Calls   int
@@ -88,18 +93,18 @@ type TestDockerClient struct {
        logReader   io.ReadCloser
        logWriter   io.WriteCloser
        fn          func(t *TestDockerClient)
-       finish      int
+       exitCode    int
        stop        chan bool
        cwd         string
        env         []string
        api         *ArvTestClient
        realTemp    string
+       calledWait  bool
 }
 
-func NewTestDockerClient(exitCode int) *TestDockerClient {
+func NewTestDockerClient() *TestDockerClient {
        t := &TestDockerClient{}
        t.logReader, t.logWriter = io.Pipe()
-       t.finish = exitCode
        t.stop = make(chan bool, 1)
        t.cwd = "/"
        return t
@@ -131,16 +136,16 @@ func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockerco
 }
 
 func (t *TestDockerClient) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
-       if t.finish == 3 {
+       if t.exitCode == 3 {
                return errors.New(`Error response from daemon: oci runtime error: container_linux.go:247: starting container process caused "process_linux.go:359: container init caused \"rootfs_linux.go:54: mounting \\\"/tmp/keep453790790/by_id/99999999999999999999999999999999+99999/myGenome\\\" to rootfs \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged\\\" at \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged/keep/99999999999999999999999999999999+99999/myGenome\\\" caused \\\"no such file or directory\\\"\""`)
        }
-       if t.finish == 4 {
+       if t.exitCode == 4 {
                return errors.New(`panic: standard_init_linux.go:175: exec user process caused "no such file or directory"`)
        }
-       if t.finish == 5 {
+       if t.exitCode == 5 {
                return errors.New(`Error response from daemon: Cannot start container 41f26cbc43bcc1280f4323efb1830a394ba8660c9d1c2b564ba42bf7f7694845: [8] System error: no such file or directory`)
        }
-       if t.finish == 6 {
+       if t.exitCode == 6 {
                return errors.New(`Error response from daemon: Cannot start container 58099cd76c834f3dc2a4fb76c8028f049ae6d4fdf0ec373e1f2cfea030670c2d: [8] System error: exec: "foobar": executable file not found in $PATH`)
        }
 
@@ -152,25 +157,24 @@ func (t *TestDockerClient) ContainerStart(ctx context.Context, container string,
        }
 }
 
-func (t *TestDockerClient) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
+func (t *TestDockerClient) ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error {
        t.stop <- true
        return nil
 }
 
 func (t *TestDockerClient) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
-       body := make(chan dockercontainer.ContainerWaitOKBody)
+       t.calledWait = true
+       body := make(chan dockercontainer.ContainerWaitOKBody, 1)
        err := make(chan error)
        go func() {
                t.fn(t)
-               body <- dockercontainer.ContainerWaitOKBody{StatusCode: int64(t.finish)}
-               close(body)
-               close(err)
+               body <- dockercontainer.ContainerWaitOKBody{StatusCode: int64(t.exitCode)}
        }()
        return body, err
 }
 
 func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
-       if t.finish == 2 {
+       if t.exitCode == 2 {
                return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
        }
 
@@ -182,7 +186,7 @@ func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string
 }
 
 func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
-       if t.finish == 2 {
+       if t.exitCode == 2 {
                return dockertypes.ImageLoadResponse{}, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
        }
        _, err := io.Copy(ioutil.Discard, input)
@@ -396,8 +400,7 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
 
 func (s *TestSuite) TestLoadImage(c *C) {
        kc := &KeepTestClient{}
-       docker := NewTestDockerClient(0)
-       cr := NewContainerRunner(&ArvTestClient{}, kc, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr := NewContainerRunner(&ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 
        _, err := cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
 
@@ -512,8 +515,7 @@ func (s *TestSuite) TestLoadImageArvError(c *C) {
 
 func (s *TestSuite) TestLoadImageKeepError(c *C) {
        // (2) Keep error
-       docker := NewTestDockerClient(0)
-       cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.Container.ContainerImage = hwPDH
 
        err := cr.LoadImage()
@@ -531,8 +533,7 @@ func (s *TestSuite) TestLoadImageCollectionError(c *C) {
 
 func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
        // (4) Collection doesn't contain image
-       docker := NewTestDockerClient(0)
-       cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.Container.ContainerImage = hwPDH
 
        err := cr.LoadImage()
@@ -572,12 +573,11 @@ func dockerLog(fd byte, msg string) []byte {
 }
 
 func (s *TestSuite) TestRunContainer(c *C) {
-       docker := NewTestDockerClient(0)
-       docker.fn = func(t *TestDockerClient) {
+       s.docker.fn = func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, "Hello world\n"))
                t.logWriter.Close()
        }
-       cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 
        var logs TestLogs
        cr.NewLogWriter = logs.NewTestLoggingWriter
@@ -667,18 +667,18 @@ func (s *TestSuite) TestUpdateContainerCancelled(c *C) {
 
 // Used by the TestFullRun*() tests below to DRY up the boilerplate setup for a full
 // dress rehearsal of the Run() function, starting from a JSON container record.
-func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) {
+func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) {
        rec := arvados.Container{}
        err := json.Unmarshal([]byte(record), &rec)
        c.Check(err, IsNil)
 
-       docker := NewTestDockerClient(exitCode)
-       docker.fn = fn
-       docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+       s.docker.exitCode = exitCode
+       s.docker.fn = fn
+       s.docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
 
        api = &ArvTestClient{Container: rec}
-       docker.api = api
-       cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       s.docker.api = api
+       cr = NewContainerRunner(api, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.statInterval = 100 * time.Millisecond
        am := &ArvMountCmdLine{}
        cr.RunArvMount = am.ArvMountTest
@@ -687,7 +687,7 @@ func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn f
        c.Assert(err, IsNil)
        defer os.RemoveAll(realTemp)
 
-       docker.realTemp = realTemp
+       s.docker.realTemp = realTemp
 
        tempcount := 0
        cr.MkTempDir = func(_ string, prefix string) (string, error) {
@@ -730,7 +730,7 @@ func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn f
 }
 
 func (s *TestSuite) TestFullRunHello(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -751,7 +751,7 @@ func (s *TestSuite) TestFullRunHello(c *C) {
 }
 
 func (s *TestSuite) TestCrunchstat(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
                "command": ["sleep", "1"],
                "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
                "cwd": ".",
@@ -784,7 +784,7 @@ func (s *TestSuite) TestCrunchstat(c *C) {
 
 func (s *TestSuite) TestNodeInfoLog(c *C) {
        os.Setenv("SLURMD_NODENAME", "compute2")
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
                "command": ["sleep", "1"],
                "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
                "cwd": ".",
@@ -818,7 +818,7 @@ func (s *TestSuite) TestNodeInfoLog(c *C) {
 }
 
 func (s *TestSuite) TestContainerRecordLog(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
                "command": ["sleep", "1"],
                "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
                "cwd": ".",
@@ -841,7 +841,7 @@ func (s *TestSuite) TestContainerRecordLog(c *C) {
 }
 
 func (s *TestSuite) TestFullRunStderr(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo hello ; echo world 1>&2 ; exit 1"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -866,7 +866,7 @@ func (s *TestSuite) TestFullRunStderr(c *C) {
 }
 
 func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["pwd"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -887,7 +887,7 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
 }
 
 func (s *TestSuite) TestFullRunSetCwd(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["pwd"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": "/bin",
@@ -909,7 +909,7 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
 func (s *TestSuite) TestStopOnSignal(c *C) {
        s.testStopContainer(c, func(cr *ContainerRunner) {
                go func() {
-                       for !cr.cStarted {
+                       for !s.docker.calledWait {
                                time.Sleep(time.Millisecond)
                        }
                        cr.SigChan <- syscall.SIGINT
@@ -943,16 +943,15 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
        err := json.Unmarshal([]byte(record), &rec)
        c.Check(err, IsNil)
 
-       docker := NewTestDockerClient(0)
-       docker.fn = func(t *TestDockerClient) {
+       s.docker.fn = func(t *TestDockerClient) {
                <-t.stop
                t.logWriter.Write(dockerLog(1, "foo\n"))
                t.logWriter.Close()
        }
-       docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+       s.docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
 
        api := &ArvTestClient{Container: rec}
-       cr := NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr := NewContainerRunner(api, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
        setup(cr)
 
@@ -978,7 +977,7 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
 }
 
 func (s *TestSuite) TestFullRunSetEnv(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo $FROBIZ"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": "/bin",
@@ -1029,6 +1028,8 @@ func (s *TestSuite) TestSetupMounts(c *C) {
        c.Assert(err, IsNil)
        stubCertPath := stubCert(certTemp)
 
+       cr.parentTemp = realTemp
+
        defer os.RemoveAll(realTemp)
        defer os.RemoveAll(certTemp)
 
@@ -1045,11 +1046,12 @@ func (s *TestSuite) TestSetupMounts(c *C) {
        }
 
        checkEmpty := func() {
-               filepath.Walk(realTemp, func(path string, _ os.FileInfo, err error) error {
-                       c.Check(path, Equals, realTemp)
-                       c.Check(err, IsNil)
-                       return nil
-               })
+               // Should be deleted.
+               _, err := os.Stat(realTemp)
+               c.Assert(os.IsNotExist(err), Equals, true)
+
+               // Now recreate it for the next test.
+               c.Assert(os.Mkdir(realTemp, 0777), IsNil)
        }
 
        {
@@ -1064,7 +1066,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
                        "--read-write", "--crunchstat-interval=5",
                        "--mount-by-pdh", "by_id", realTemp + "/keep1"})
-               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp"})
+               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/tmp"})
                os.RemoveAll(cr.ArvMountPoint)
                cr.CleanupDirs()
                checkEmpty()
@@ -1083,7 +1085,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
                        "--read-write", "--crunchstat-interval=5",
                        "--mount-by-pdh", "by_id", realTemp + "/keep1"})
-               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/out", realTemp + "/3:/tmp"})
+               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/out", realTemp + "/tmp3:/tmp"})
                os.RemoveAll(cr.ArvMountPoint)
                cr.CleanupDirs()
                checkEmpty()
@@ -1104,7 +1106,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
                        "--read-write", "--crunchstat-interval=5",
                        "--mount-by-pdh", "by_id", realTemp + "/keep1"})
-               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp", stubCertPath + ":/etc/arvados/ca-certificates.crt:ro"})
+               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/tmp", stubCertPath + ":/etc/arvados/ca-certificates.crt:ro"})
                os.RemoveAll(cr.ArvMountPoint)
                cr.CleanupDirs()
                checkEmpty()
@@ -1200,8 +1202,8 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                err := cr.SetupMounts()
                c.Check(err, IsNil)
                sort.StringSlice(cr.Binds).Sort()
-               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2/mountdata.json:/mnt/test.json:ro"})
-               content, err := ioutil.ReadFile(realTemp + "/2/mountdata.json")
+               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/json2/mountdata.json:/mnt/test.json:ro"})
+               content, err := ioutil.ReadFile(realTemp + "/json2/mountdata.json")
                c.Check(err, IsNil)
                c.Check(content, DeepEquals, []byte(test.out))
                os.RemoveAll(cr.ArvMountPoint)
@@ -1227,26 +1229,42 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
                        "--read-write", "--crunchstat-interval=5",
                        "--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
-               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp", realTemp + "/keep1/tmp0:/tmp/foo:ro"})
+               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/tmp", realTemp + "/keep1/tmp0:/tmp/foo:ro"})
                os.RemoveAll(cr.ArvMountPoint)
                cr.CleanupDirs()
                checkEmpty()
        }
 
-       // Writable mount points are not allowed underneath output_dir mount point
+       // Writable mount points are copied into the output_dir mount point
        {
                i = 0
                cr.ArvMountPoint = ""
                cr.Container.Mounts = make(map[string]arvados.Mount)
                cr.Container.Mounts = map[string]arvados.Mount{
-                       "/tmp":     {Kind: "tmp"},
-                       "/tmp/foo": {Kind: "collection", Writable: true},
+                       "/tmp": {Kind: "tmp"},
+                       "/tmp/foo": {Kind: "collection",
+                               PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53",
+                               Writable:         true},
+                       "/tmp/bar": {Kind: "collection",
+                               PortableDataHash: "59389a8f9ee9d399be35462a0f92541d+53",
+                               Path:             "baz",
+                               Writable:         true},
                }
                cr.OutputPath = "/tmp"
 
+               os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
+               os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541d+53/baz", os.ModePerm)
+
+               rf, _ := os.Create(realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541d+53/baz/quux")
+               rf.Write([]byte("bar"))
+               rf.Close()
+
                err := cr.SetupMounts()
-               c.Check(err, NotNil)
-               c.Check(err, ErrorMatches, `Writable mount points are not permitted underneath the output_path.*`)
+               c.Check(err, IsNil)
+               _, err = os.Stat(cr.HostOutputDir + "/foo")
+               c.Check(err, IsNil)
+               _, err = os.Stat(cr.HostOutputDir + "/bar/quux")
+               c.Check(err, IsNil)
                os.RemoveAll(cr.ArvMountPoint)
                cr.CleanupDirs()
                checkEmpty()
@@ -1359,7 +1377,7 @@ func (s *TestSuite) TestStdout(c *C) {
                "runtime_constraints": {}
        }`
 
-       api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
@@ -1370,17 +1388,16 @@ func (s *TestSuite) TestStdout(c *C) {
 }
 
 // Used by the TestStdoutWithWrongPath*() tests
-func StdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) {
+func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) {
        rec := arvados.Container{}
        err = json.Unmarshal([]byte(record), &rec)
        c.Check(err, IsNil)
 
-       docker := NewTestDockerClient(0)
-       docker.fn = fn
-       docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+       s.docker.fn = fn
+       s.docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
 
        api = &ArvTestClient{Container: rec}
-       cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr = NewContainerRunner(api, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        am := &ArvMountCmdLine{}
        cr.RunArvMount = am.ArvMountTest
 
@@ -1389,7 +1406,7 @@ func StdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (ap
 }
 
 func (s *TestSuite) TestStdoutWithWrongPath(c *C) {
-       _, _, err := StdoutErrorRunHelper(c, `{
+       _, _, err := s.stdoutErrorRunHelper(c, `{
     "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path":"/tmpa.out"} },
     "output_path": "/tmp"
 }`, func(t *TestDockerClient) {})
@@ -1399,7 +1416,7 @@ func (s *TestSuite) TestStdoutWithWrongPath(c *C) {
 }
 
 func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) {
-       _, _, err := StdoutErrorRunHelper(c, `{
+       _, _, err := s.stdoutErrorRunHelper(c, `{
     "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "tmp", "path":"/tmp/a.out"} },
     "output_path": "/tmp"
 }`, func(t *TestDockerClient) {})
@@ -1409,7 +1426,7 @@ func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) {
 }
 
 func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) {
-       _, _, err := StdoutErrorRunHelper(c, `{
+       _, _, err := s.stdoutErrorRunHelper(c, `{
     "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "collection", "path":"/tmp/a.out"} },
     "output_path": "/tmp"
 }`, func(t *TestDockerClient) {})
@@ -1421,7 +1438,7 @@ func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) {
 func (s *TestSuite) TestFullRunWithAPI(c *C) {
        os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
        defer os.Unsetenv("ARVADOS_API_HOST")
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": "/bin",
@@ -1444,7 +1461,7 @@ func (s *TestSuite) TestFullRunWithAPI(c *C) {
 func (s *TestSuite) TestFullRunSetOutput(c *C) {
        os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
        defer os.Unsetenv("ARVADOS_API_HOST")
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": "/bin",
@@ -1484,7 +1501,7 @@ func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C
 
        extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
@@ -1519,12 +1536,12 @@ func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) {
                "a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt",
        }
 
-       api, runner, realtemp := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, runner, realtemp := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
 
-       c.Check(runner.Binds, DeepEquals, []string{realtemp + "/2:/tmp",
+       c.Check(runner.Binds, DeepEquals, []string{realtemp + "/tmp2:/tmp",
                realtemp + "/keep1/by_id/a0def87f80dd594d4675809e83bd4f15+367/file2_in_main.txt:/tmp/foo/bar:ro",
                realtemp + "/keep1/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt:/tmp/foo/baz/sub2file2:ro",
                realtemp + "/keep1/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1:/tmp/foo/sub1:ro",
@@ -1571,7 +1588,7 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(
                "b0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
        }
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
@@ -1612,12 +1629,12 @@ func (s *TestSuite) TestOutputSymlinkToInput(c *C) {
                "a0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
        }
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
-               os.Symlink("/keep/foo/sub1file2", t.realTemp+"/2/baz")
-               os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/2/baz2")
-               os.Symlink("/keep/foo2/subdir1", t.realTemp+"/2/baz3")
-               os.Mkdir(t.realTemp+"/2/baz4", 0700)
-               os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/2/baz4/baz5")
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+               os.Symlink("/keep/foo/sub1file2", t.realTemp+"/tmp2/baz")
+               os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/tmp2/baz2")
+               os.Symlink("/keep/foo2/subdir1", t.realTemp+"/tmp2/baz3")
+               os.Mkdir(t.realTemp+"/tmp2/baz4", 0700)
+               os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/tmp2/baz4/baz5")
                t.logWriter.Close()
        })
 
@@ -1654,8 +1671,8 @@ func (s *TestSuite) TestOutputError(c *C) {
 
        extraMounts := []string{}
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
-               os.Symlink("/etc/hosts", t.realTemp+"/2/baz")
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+               os.Symlink("/etc/hosts", t.realTemp+"/tmp2/baz")
                t.logWriter.Close()
        })
 
@@ -1678,22 +1695,22 @@ func (s *TestSuite) TestOutputSymlinkToOutput(c *C) {
 
        extraMounts := []string{}
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
-               rf, _ := os.Create(t.realTemp + "/2/realfile")
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+               rf, _ := os.Create(t.realTemp + "/tmp2/realfile")
                rf.Write([]byte("foo"))
                rf.Close()
 
-               os.Mkdir(t.realTemp+"/2/realdir", 0700)
-               rf, _ = os.Create(t.realTemp + "/2/realdir/subfile")
+               os.Mkdir(t.realTemp+"/tmp2/realdir", 0700)
+               rf, _ = os.Create(t.realTemp + "/tmp2/realdir/subfile")
                rf.Write([]byte("bar"))
                rf.Close()
 
-               os.Symlink("/tmp/realfile", t.realTemp+"/2/file1")
-               os.Symlink("realfile", t.realTemp+"/2/file2")
-               os.Symlink("/tmp/file1", t.realTemp+"/2/file3")
-               os.Symlink("file2", t.realTemp+"/2/file4")
-               os.Symlink("realdir", t.realTemp+"/2/dir1")
-               os.Symlink("/tmp/realdir", t.realTemp+"/2/dir2")
+               os.Symlink("/tmp/realfile", t.realTemp+"/tmp2/file1")
+               os.Symlink("realfile", t.realTemp+"/tmp2/file2")
+               os.Symlink("/tmp/file1", t.realTemp+"/tmp2/file3")
+               os.Symlink("file2", t.realTemp+"/tmp2/file4")
+               os.Symlink("realdir", t.realTemp+"/tmp2/dir1")
+               os.Symlink("/tmp/realdir", t.realTemp+"/tmp2/dir2")
                t.logWriter.Close()
        })
 
@@ -1735,7 +1752,7 @@ func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
                "b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt",
        }
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
@@ -1770,7 +1787,7 @@ func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
                "runtime_constraints": {}
        }`
 
-       api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+       api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
        })
@@ -1790,7 +1807,7 @@ func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
 }
 
 func (s *TestSuite) TestStderrMount(c *C) {
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo hello;exit 1"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -1887,7 +1904,7 @@ exec echo killme
        ech := tf.Name()
        brokenNodeHook = &ech
 
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -1912,7 +1929,7 @@ func (s *TestSuite) TestFullBrokenDocker2(c *C) {
        ech := ""
        brokenNodeHook = &ech
 
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -1935,7 +1952,7 @@ func (s *TestSuite) TestFullBrokenDocker3(c *C) {
        ech := ""
        brokenNodeHook = &ech
 
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -1957,7 +1974,7 @@ func (s *TestSuite) TestBadCommand1(c *C) {
        ech := ""
        brokenNodeHook = &ech
 
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -1979,7 +1996,7 @@ func (s *TestSuite) TestBadCommand2(c *C) {
        ech := ""
        brokenNodeHook = &ech
 
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
@@ -2001,7 +2018,7 @@ func (s *TestSuite) TestBadCommand3(c *C) {
        ech := ""
        brokenNodeHook = &ech
 
-       api, _, _ := FullRunHelper(c, `{
+       api, _, _ := s.fullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
index becd66975f676a76b09eb8da6761582fd2e94b5e..34fd594be2dd6ce2793eebe1ac884c92aa8fdd58 100644 (file)
@@ -1028,64 +1028,71 @@ class SharedDirectory(Directory):
         self.current_user = api.users().current().execute(num_retries=num_retries)
         self._poll = True
         self._poll_time = poll_time
+        self._updating_lock = threading.Lock()
 
     @use_counter
     def update(self):
-        with llfuse.lock_released:
-            all_projects = arvados.util.list_all(
-                self.api.groups().list, self.num_retries,
-                filters=[['group_class','=','project']])
-            objects = {}
-            for ob in all_projects:
-                objects[ob['uuid']] = ob
-
-            roots = []
-            root_owners = {}
-            for ob in all_projects:
-                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
-                    roots.append(ob)
-                    root_owners[ob['owner_uuid']] = True
-
-            lusers = arvados.util.list_all(
-                self.api.users().list, self.num_retries,
-                filters=[['uuid','in', list(root_owners)]])
-            lgroups = arvados.util.list_all(
-                self.api.groups().list, self.num_retries,
-                filters=[['uuid','in', list(root_owners)]])
-
-            users = {}
-            groups = {}
-
-            for l in lusers:
-                objects[l["uuid"]] = l
-            for l in lgroups:
-                objects[l["uuid"]] = l
-
-            contents = {}
-            for r in root_owners:
-                if r in objects:
-                    obr = objects[r]
-                    if obr.get("name"):
-                        contents[obr["name"]] = obr
-                    #elif obr.get("username"):
-                    #    contents[obr["username"]] = obr
-                    elif "first_name" in obr:
-                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
-
-
-            for r in roots:
-                if r['owner_uuid'] not in objects:
-                    contents[r['name']] = r
-
-        # end with llfuse.lock_released, re-acquire lock
-
         try:
+            with llfuse.lock_released:
+                self._updating_lock.acquire()
+                if not self.stale():
+                    return
+
+                all_projects = arvados.util.list_all(
+                    self.api.groups().list, self.num_retries,
+                    filters=[['group_class','=','project']],
+                    select=["uuid", "owner_uuid"])
+                objects = {}
+                for ob in all_projects:
+                    objects[ob['uuid']] = ob
+
+                roots = []
+                root_owners = set()
+                current_uuid = self.current_user['uuid']
+                for ob in all_projects:
+                    if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
+                        roots.append(ob['uuid'])
+                        root_owners.add(ob['owner_uuid'])
+
+                lusers = arvados.util.list_all(
+                    self.api.users().list, self.num_retries,
+                    filters=[['uuid','in', list(root_owners)]])
+                lgroups = arvados.util.list_all(
+                    self.api.groups().list, self.num_retries,
+                    filters=[['uuid','in', list(root_owners)+roots]])
+
+                for l in lusers:
+                    objects[l["uuid"]] = l
+                for l in lgroups:
+                    objects[l["uuid"]] = l
+
+                contents = {}
+                for r in root_owners:
+                    if r in objects:
+                        obr = objects[r]
+                        if obr.get("name"):
+                            contents[obr["name"]] = obr
+                        #elif obr.get("username"):
+                        #    contents[obr["username"]] = obr
+                        elif "first_name" in obr:
+                            contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
+
+                for r in roots:
+                    if r in objects:
+                        obr = objects[r]
+                        if obr['owner_uuid'] not in objects:
+                            contents[obr["name"]] = obr
+
+            # end with llfuse.lock_released, re-acquire lock
+
             self.merge(contents.items(),
                        lambda i: i[0],
                        lambda a, i: a.uuid() == i[1]['uuid'],
                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
         except Exception:
-            _logger.exception()
+            _logger.exception("arv-mount shared dir error")
+        finally:
+            self._updating_lock.release()
 
     def want_event_subscribe(self):
         return True
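
The update() refactor takes a lock and re-checks staleness before listing projects, so concurrent FUSE callers do not repeat the same API calls. A minimal sketch of that lock-then-recheck shape (written in Go for consistency with the other sketches; the real code is Python under llfuse, and these names are hypothetical):

    // Sketch of "acquire lock, re-check staleness, then refresh".
    package main

    import (
        "fmt"
        "sync"
    )

    type sharedDir struct {
        mu    sync.Mutex
        stale bool
        data  string
    }

    func (d *sharedDir) update(refresh func() string) {
        d.mu.Lock()
        defer d.mu.Unlock()
        if !d.stale {
            // Another caller refreshed while we waited for the lock.
            return
        }
        d.data = refresh()
        d.stale = false
    }

    func main() {
        d := &sharedDir{stale: true}
        d.update(func() string { return "fresh listing" })
        d.update(func() string { return "should not run twice" })
        fmt.Println(d.data) // "fresh listing"
    }
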
index c64ac7a8fe1e2645b11e0917c3c5b9bcaab54369..62c856da340183d0483503e0a2393816acaf0321 100644 (file)
@@ -402,7 +402,17 @@ func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) err
        }()
        errChan := make(chan error)
        go func() {
-               errChan <- v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), bufr, nil)
+               var body io.Reader = bufr
+               if len(block) == 0 {
+                       // We must send a "Content-Length: 0" header,
+                       // but the http client interprets
+                       // ContentLength==0 as "unknown" unless it can
+                       // confirm by introspection that Body will
+                       // read 0 bytes.
+                       body = http.NoBody
+                       bufr.Close()
+               }
+               errChan <- v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), body, nil)
        }()
        select {
        case <-ctx.Done():
@@ -722,7 +732,9 @@ func (c *azureBlobClient) GetBlobRange(cname, bname, byterange string, hdrs map[
 
 func (c *azureBlobClient) CreateBlockBlobFromReader(cname, bname string, size uint64, rdr io.Reader, hdrs map[string]string) error {
        c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
-       rdr = NewCountingReader(rdr, c.stats.TickOutBytes)
+       if size != 0 {
+               rdr = NewCountingReader(rdr, c.stats.TickOutBytes)
+       }
        err := c.client.CreateBlockBlobFromReader(cname, bname, size, rdr, hdrs)
        c.stats.TickErr(err)
        return err
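
The http.NoBody change matters because the Go HTTP client treats ContentLength==0 with an opaque body as "length unknown" and falls back to chunked encoding, while the comment above notes that an explicit Content-Length: 0 header must be sent for empty blobs. A standalone sketch (using httptest rather than Azure) showing the difference as observed by a server:

    // Sketch: an opaque zero-byte reader yields no declared length; http.NoBody
    // yields an explicit Content-Length: 0.
    package main

    import (
        "bytes"
        "fmt"
        "net/http"
        "net/http/httptest"
    )

    func main() {
        srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            // -1 means the client declared no length (chunked encoding).
            fmt.Println("server saw ContentLength =", r.ContentLength)
        }))
        defer srv.Close()

        // Opaque wrapper hides the fact that the reader is empty.
        opaque := struct{ *bytes.Reader }{bytes.NewReader(nil)}
        req, _ := http.NewRequest("PUT", srv.URL, opaque)
        if resp, err := http.DefaultClient.Do(req); err == nil { // prints -1
            resp.Body.Close()
        }

        req, _ = http.NewRequest("PUT", srv.URL, http.NoBody)
        if resp, err := http.DefaultClient.Do(req); err == nil { // prints 0
            resp.Body.Close()
        }
    }
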
index 4256ec0d0cb599e259ff7cabcc6f3407fd2e6dce..06216edcb82aeddc3617c121cecc4e1a6387be4a 100644 (file)
@@ -124,6 +124,11 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                return
        }
 
+       if (r.Method == "PUT" || r.Method == "POST") && r.Header.Get("Content-Length") == "" {
+               rw.WriteHeader(http.StatusLengthRequired)
+               return
+       }
+
        body, err := ioutil.ReadAll(r.Body)
        if err != nil {
                return
index 83d507b62b4931163b73dded52240705ebd2ae70..18d430dd7cd037b958e592533bdc9e2be53aadd1 100644 (file)
@@ -90,6 +90,7 @@ ADD crunch-setup.sh gitolite.rc \
     keep-setup.sh common.sh createusers.sh \
     logger runsu.sh waitforpostgres.sh \
     application_yml_override.py api-setup.sh \
+    go-setup.sh \
     /usr/local/lib/arvbox/
 
 ADD runit /etc/runit
index 80344c16f2ef9bfcf0f97bf14f07a8c1cb97ce73..7cb51edfdc61ab6c9b67b29353eeb2f79e8774eb 100644 (file)
@@ -24,13 +24,16 @@ RUN echo "production" > /var/lib/arvados/workbench_rails_env
 
 RUN chown -R 1000:1000 /usr/src && /usr/local/lib/arvbox/createusers.sh
 
+RUN sudo -u arvbox /var/lib/arvbox/service/composer/run-service --only-deps
+RUN sudo -u arvbox /var/lib/arvbox/service/keep-web/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/sso/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/api/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/workbench/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/doc/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/vm/run-service --only-deps
-RUN sudo -u arvbox /var/lib/arvbox/service/keep-web/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/keepproxy/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/arv-git-httpd/run-service --only-deps
+RUN sudo -u arvbox /var/lib/arvbox/service/crunch-dispatch-local/run-service --only-deps
+RUN sudo -u arvbox /var/lib/arvbox/service/websockets/run-service --only-deps
 RUN sudo -u arvbox /usr/local/lib/arvbox/keep-setup.sh --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/sdk/run-service
index 30ecafb889c7e38acb45380d68c5f8634bcb9c3c..b3ec5cd10441f695522c50500a2e64fd3f6d8f5d 100755 (executable)
@@ -7,16 +7,11 @@ exec 2>&1
 set -eux -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
 
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
-
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
 flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/crunchstat"
 flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/sdk/go/crunchrunner"
-install bin/crunchstat bin/crunchrunner /usr/local/bin
+install $GOPATH/bin/crunchstat $GOPATH/bin/crunchrunner /usr/local/bin
 
 if test -s /var/lib/arvados/api_rails_env ; then
   RAILS_ENV=$(cat /var/lib/arvados/api_rails_env)
diff --git a/tools/arvbox/lib/arvbox/docker/go-setup.sh b/tools/arvbox/lib/arvbox/docker/go-setup.sh
new file mode 100644 (file)
index 0000000..f068ce6
--- /dev/null
@@ -0,0 +1,16 @@
+#!/bin/bash
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+mkdir -p /var/lib/gopath
+cd /var/lib/gopath
+
+export GOPATH=$PWD
+mkdir -p "$GOPATH/src/git.curoverse.com"
+ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
+
+flock /var/lib/gopath/gopath.lock go get -t github.com/kardianos/govendor
+cd "$GOPATH/src/git.curoverse.com/arvados.git"
+flock /var/lib/gopath/gopath.lock go get -v -d ...
+flock /var/lib/gopath/gopath.lock "$GOPATH/bin/govendor" sync
index 5da2cfac440c6a4aafa20359ea718478d12816b3..8ef66a60687ce817e46308311dbcd4d80c6691ad 100755 (executable)
@@ -8,15 +8,10 @@ sleep 2
 set -eux -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
 
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
-
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
 flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/keepstore"
-install bin/keepstore /usr/local/bin
+install $GOPATH/bin/keepstore /usr/local/bin
 
 if test "$1" = "--only-deps" ; then
     exit
index 806f9cd37a375e17ffeacc0943d03beb8ded614e..1383f7140f4ed961637d8c8ef160bfb3b575d317 100755 (executable)
@@ -7,15 +7,10 @@ exec 2>&1
 set -ex -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
 
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
-
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
 flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/arv-git-httpd"
-install bin/arv-git-httpd /usr/local/bin
+install $GOPATH/bin/arv-git-httpd /usr/local/bin
 
 if test "$1" = "--only-deps" ; then
     exit
index ac4441de099ab37e1a0a36109807cf9e127db507..abd350f073c0f449b37b25362185b9b24a963136 100755 (executable)
@@ -10,13 +10,13 @@ set -ex -o pipefail
 
 cd /usr/src/composer
 
-npm -d install yarn
-
-PATH=$PATH:/usr/src/composer/node_modules/.bin
+npm -d install --prefix /usr/local --global yarn
 
 yarn install
 
-if test "$1" != "--only-deps" ; then
-    echo "apiEndPoint: https://${localip}:${services[api]}" > /usr/src/composer/src/composer.yml
-    exec ng serve --host 0.0.0.0 --port 4200 --env=webdev
+if test "$1" = "--only-deps" ; then
+    exit
 fi
+
+echo "apiEndPoint: https://${localip}:${services[api]}" > /usr/src/composer/src/composer.yml
+exec node_modules/.bin/ng serve --host 0.0.0.0 --port 4200 --env=webdev
index e7a302682155b751fe46a1151b5673672327f3d7..decbccddeeecce662a0e353da0dd01c26ce91021 100755 (executable)
@@ -4,19 +4,18 @@
 # SPDX-License-Identifier: AGPL-3.0
 
 exec 2>&1
-set -eux -o pipefail
+set -ex -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
 
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
-
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
 flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/crunch-run"
 flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/crunch-dispatch-local"
-install bin/crunch-run bin/crunch-dispatch-local /usr/local/bin
+install $GOPATH/bin/crunch-run $GOPATH/bin/crunch-dispatch-local /usr/local/bin
+
+if test "$1" = "--only-deps" ; then
+    exit
+fi
 
 cat > /usr/local/bin/crunch-run.sh <<EOF
 #!/bin/sh
index ee985a0e51b512d3263dcf7d72b7b4cdc530bd9f..70f2470b9fe7decd8a03efdfb09d5da8ab52f372 100755 (executable)
@@ -7,15 +7,10 @@ exec 2>&1
 set -ex -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
 
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
-
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
 flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/keep-web"
-install bin/keep-web /usr/local/bin
+install $GOPATH/bin/keep-web /usr/local/bin
 
 if test "$1" = "--only-deps" ; then
     exit
index dae2dfdd786589a3023e0143a5f6c7f8b3711c0a..199247b7a0e2bfc6dcabdd929dc5177275f730bc 100755 (executable)
@@ -8,15 +8,10 @@ sleep 2
 set -ex -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
 
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
-
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
 flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/keepproxy"
-install bin/keepproxy /usr/local/bin
+install $GOPATH/bin/keepproxy /usr/local/bin
 
 if test "$1" = "--only-deps" ; then
     exit
index 45407cb9ab48d34c7baa7811aef92376ed759d0b..3ef78ee45575676dc881059efd60cc57bd64cbd9 100755 (executable)
@@ -5,6 +5,8 @@
 
 flock /var/lib/arvados/createusers.lock /usr/local/lib/arvbox/createusers.sh
 
+make-ssl-cert generate-default-snakeoil --force-overwrite
+
 . /usr/local/lib/arvbox/common.sh
 
 chown -R $PGUSER:$PGGROUP /var/lib/postgresql
index cb56ac7f4de5dbb7f3ad6d22c6b8166933a32f81..2d01d907985c0c9ca6e0cf1e39969e1b4ce2d7fd 100755 (executable)
@@ -14,14 +14,10 @@ else
   RAILS_ENV=development
 fi
 
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
+. /usr/local/lib/arvbox/go-setup.sh
 
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
 flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/ws"
-install bin/ws /usr/local/bin/arvados-ws
+install $GOPATH/bin/ws /usr/local/bin/arvados-ws
 
 if test "$1" = "--only-deps" ; then
     exit
index ebc40b13cb8c2ad2ec8152df975d6a0863ffc45b..10569b2e139b89a2904820ab62dab6cbc3a747b5 100644 (file)
@@ -307,7 +307,7 @@ func doMain(cfg *ConfigParams) error {
                }
                userIDToUUID[uID] = u.UUID
                if cfg.Verbose {
-                       log.Printf("Seen user %q (%s)", u.Username, u.Email)
+                       log.Printf("Seen user %q (%s)", u.Username, u.UUID)
                }
        }
 
@@ -317,6 +317,11 @@ func doMain(cfg *ConfigParams) error {
                return err
        }
        log.Printf("Found %d remote groups", len(remoteGroups))
+       if cfg.Verbose {
+               for groupUUID := range remoteGroups {
+                       log.Printf("- Group %q: %d users", remoteGroups[groupUUID].Group.Name, len(remoteGroups[groupUUID].PreviousMembers))
+               }
+       }
 
        membershipsRemoved := 0
 
@@ -504,9 +509,9 @@ func GetRemoteGroups(cfg *ConfigParams, allUsers map[string]arvados.User) (remot
                                Operator: "=",
                                Operand:  group.UUID,
                        }, {
-                               Attr:     "head_kind",
-                               Operator: "=",
-                               Operand:  "arvados#user",
+                               Attr:     "head_uuid",
+                               Operator: "like",
+                               Operand:  "%-tpzed-%",
                        }},
                }
                // User -> Group filter
@@ -528,9 +533,9 @@ func GetRemoteGroups(cfg *ConfigParams, allUsers map[string]arvados.User) (remot
                                Operator: "=",
                                Operand:  group.UUID,
                        }, {
-                               Attr:     "tail_kind",
-                               Operator: "=",
-                               Operand:  "arvados#user",
+                               Attr:     "tail_uuid",
+                               Operator: "like",
+                               Operand:  "%-tpzed-%",
                        }},
                }
                g2uLinks, err := GetAll(cfg.Client, "links", g2uFilter, &LinkList{})
@@ -579,7 +584,7 @@ func GetRemoteGroups(cfg *ConfigParams, allUsers map[string]arvados.User) (remot
 // RemoveMemberFromGroup remove all links related to the membership
 func RemoveMemberFromGroup(cfg *ConfigParams, user arvados.User, group arvados.Group) error {
        if cfg.Verbose {
-               log.Printf("Getting group membership links for user %q (%s) on group %q (%s)", user.Email, user.UUID, group.Name, group.UUID)
+               log.Printf("Getting group membership links for user %q (%s) on group %q (%s)", user.Username, user.UUID, group.Name, group.UUID)
        }
        var links []interface{}
        // Search for all group<->user links (both ways)
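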
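The filter change above swaps the "head_kind"/"tail_kind" equality tests for "like" matches on the UUID itself: Arvados user UUIDs carry the "tpzed" object-type infix, so matching head_uuid (or tail_uuid, for the opposite link direction) against "%-tpzed-%" keeps only the links that point at user records. A minimal sketch of the same idea as a direct query against the links API, assuming ARVADOS_API_HOST and ARVADOS_API_TOKEN are set in the environment:

    # Sketch only: list a handful of links whose head is a user object. The
    # "like" operand relies on the "tpzed" type infix that all user UUIDs share.
    curl -sS -G "https://$ARVADOS_API_HOST/arvados/v1/links" \
      -H "Authorization: OAuth2 $ARVADOS_API_TOKEN" \
      --data-urlencode 'filters=[["head_uuid","like","%-tpzed-%"]]' \
      --data-urlencode 'limit=5'
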
index e776648a803736581dcb61724631ac844cdca68f..4a3e470c42f2b56f82199938d857bd295e564e9a 100644 (file)
@@ -83,7 +83,6 @@ func (s *TestSuite) SetUpTest(c *C) {
        c.Assert(len(s.users), Not(Equals), 0)
 }
 
-// Clean any membership link and remote group created by the test
 func (s *TestSuite) TearDownTest(c *C) {
        var dst interface{}
        // Reset database to fixture state after every test run.
@@ -93,7 +92,7 @@ func (s *TestSuite) TearDownTest(c *C) {
 
 var _ = Suite(&TestSuite{})
 
-// MakeTempCVSFile creates a temp file with data as comma separated values
+// MakeTempCSVFile creates a temp file with data as comma separated values
 func MakeTempCSVFile(data [][]string) (f *os.File, err error) {
        f, err = ioutil.TempFile("", "test_sync_remote_groups")
        if err != nil {
@@ -266,11 +265,15 @@ func (s *TestSuite) TestIgnoreSpaces(c *C) {
 
 // The absence of a user membership on the CSV file implies its removal
 func (s *TestSuite) TestMembershipRemoval(c *C) {
-       activeUserEmail := s.users[arvadostest.ActiveUserUUID].Email
-       activeUserUUID := s.users[arvadostest.ActiveUserUUID].UUID
+       localUserEmail := s.users[arvadostest.ActiveUserUUID].Email
+       localUserUUID := s.users[arvadostest.ActiveUserUUID].UUID
+       remoteUserEmail := s.users[arvadostest.FederatedActiveUserUUID].Email
+       remoteUserUUID := s.users[arvadostest.FederatedActiveUserUUID].UUID
        data := [][]string{
-               {"TestGroup1", activeUserEmail},
-               {"TestGroup2", activeUserEmail},
+               {"TestGroup1", localUserEmail},
+               {"TestGroup1", remoteUserEmail},
+               {"TestGroup2", localUserEmail},
+               {"TestGroup2", remoteUserEmail},
        }
        tmpfile, err := MakeTempCSVFile(data)
        c.Assert(err, IsNil)
@@ -283,11 +286,13 @@ func (s *TestSuite) TestMembershipRemoval(c *C) {
                groupUUID, err := RemoteGroupExists(s.cfg, groupName)
                c.Assert(err, IsNil)
                c.Assert(groupUUID, Not(Equals), "")
-               c.Assert(GroupMembershipExists(s.cfg.Client, activeUserUUID, groupUUID), Equals, true)
+               c.Assert(GroupMembershipExists(s.cfg.Client, localUserUUID, groupUUID), Equals, true)
+               c.Assert(GroupMembershipExists(s.cfg.Client, remoteUserUUID, groupUUID), Equals, true)
        }
-       // New CSV with one previous membership missing
+       // New CSV with some previous memberships missing
        data = [][]string{
-               {"TestGroup1", activeUserEmail},
+               {"TestGroup1", localUserEmail},
+               {"TestGroup2", remoteUserEmail},
        }
        tmpfile2, err := MakeTempCSVFile(data)
        c.Assert(err, IsNil)
@@ -295,16 +300,18 @@ func (s *TestSuite) TestMembershipRemoval(c *C) {
        s.cfg.Path = tmpfile2.Name()
        err = doMain(s.cfg)
        c.Assert(err, IsNil)
-       // Confirm TestGroup1 membership still exist
+       // Confirm TestGroup1 memberships
        groupUUID, err := RemoteGroupExists(s.cfg, "TestGroup1")
        c.Assert(err, IsNil)
        c.Assert(groupUUID, Not(Equals), "")
-       c.Assert(GroupMembershipExists(s.cfg.Client, activeUserUUID, groupUUID), Equals, true)
-       // Confirm TestGroup2 membership was removed
+       c.Assert(GroupMembershipExists(s.cfg.Client, localUserUUID, groupUUID), Equals, true)
+       c.Assert(GroupMembershipExists(s.cfg.Client, remoteUserUUID, groupUUID), Equals, false)
+       // Confirm TestGroup2 memberships
        groupUUID, err = RemoteGroupExists(s.cfg, "TestGroup2")
        c.Assert(err, IsNil)
        c.Assert(groupUUID, Not(Equals), "")
-       c.Assert(GroupMembershipExists(s.cfg.Client, activeUserUUID, groupUUID), Equals, false)
+       c.Assert(GroupMembershipExists(s.cfg.Client, localUserUUID, groupUUID), Equals, false)
+       c.Assert(GroupMembershipExists(s.cfg.Client, remoteUserUUID, groupUUID), Equals, true)
 }
 
 // If a group doesn't exist on the system, create it before adding users