Merge branch '12764-writable-file' refs #12764
author: Peter Amstutz <pamstutz@veritasgenetics.com>
Wed, 31 Jan 2018 02:22:38 +0000 (21:22 -0500)
committer: Peter Amstutz <pamstutz@veritasgenetics.com>
Wed, 31 Jan 2018 02:22:45 +0000 (21:22 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

doc/_includes/_mount_types.liquid
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvtool.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go

index 734b07c8b7970c00bdd6e99be3815e9d8d31f1f0..fc8a7991b38c65616704d0d096d7500a1b9e1473 100644 (file)
@@ -64,7 +64,7 @@ When a container's output_path is a tmp mount backed by local disk, this output
 
 1. Only mount points of kind @collection@ are supported.
 
-2. Mount points underneath output_path must not use @"writable":true@. If any of them are set as @writable@, the API will refuse to create/update the container request, and crunch-run will fail the container.
+2. Mount points underneath output_path that have @"writable":true@ are copied into output_path during container initialization and may be updated, renamed, or deleted by the running container. The original collection is not modified. On container completion, files remaining in the output are saved to the output collection. The mount at output_path must be large enough to hold copies of the inner writable mounts.
 
 3. If any such mount points are configured as @exclude_from_output":true@, they will be excluded from the output.
 
index e82fd9feef1f3d88c4068e7d0d6cbfee6232c3f2..a55db8d53ebeab4cc0567700945bf124f4d9158b 100644 (file)
@@ -224,13 +224,16 @@ class ArvCwlRunner(object):
 
     def check_features(self, obj):
         if isinstance(obj, dict):
-            if obj.get("writable"):
-                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
+            if obj.get("writable") and self.work_api != "containers":
+                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
             if obj.get("class") == "DockerRequirement":
                 if obj.get("dockerOutputDirectory"):
-                    # TODO: can be supported by containers API, but not jobs API.
-                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
-                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
+                    if self.work_api != "containers":
+                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
+                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
+                    if not obj.get("dockerOutputDirectory").startswith('/'):
+                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
+                            "Option 'dockerOutputDirectory' must be an absolute path.")
             for v in obj.itervalues():
                 self.check_features(v)
         elif isinstance(obj, list):
index 014e1b94aae5b283ea851fd74e480dd5df926f55..abe67c8fb3c552c7e66925093ae1377b1e26b4e9 100644 (file)
@@ -105,17 +105,32 @@ class ArvadosContainer(object):
                 generatemapper = NoFollowPathMapper([self.generatefiles], "", "",
                                                     separateDirs=False)
 
+                logger.debug("generatemapper is %s", generatemapper._pathmap)
+
                 with Perf(metrics, "createfiles %s" % self.name):
                     for f, p in generatemapper.items():
                         if not p.target:
                             pass
-                        elif p.type in ("File", "Directory"):
-                            source, path = self.arvrunner.fs_access.get_collection(p.resolved)
-                            vwd.copy(path, p.target, source_collection=source)
+                        elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
+                            if p.resolved.startswith("_:"):
+                                vwd.mkdirs(p.target)
+                            else:
+                                source, path = self.arvrunner.fs_access.get_collection(p.resolved)
+                                vwd.copy(path, p.target, source_collection=source)
                         elif p.type == "CreateFile":
                             with vwd.open(p.target, "w") as n:
                                 n.write(p.resolved.encode("utf-8"))
 
+                def keepemptydirs(p):
+                    if isinstance(p, arvados.collection.RichCollectionBase):
+                        if len(p) == 0:
+                            p.open(".keep", "w").close()
+                        else:
+                            for c in p:
+                                keepemptydirs(p[c])
+
+                keepemptydirs(vwd)
+
                 with Perf(metrics, "generatefiles.save_new %s" % self.name):
                     vwd.save_new()
 
@@ -126,6 +141,8 @@ class ArvadosContainer(object):
                     mounts[mountpoint] = {"kind": "collection",
                                           "portable_data_hash": vwd.portable_data_hash(),
                                           "path": p.target}
+                    if p.type.startswith("Writable"):
+                        mounts[mountpoint]["writable"] = True
 
         container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
         if self.environment:
index b667dac1ca5cec6f272c390be8fcd17e1628764c..81faff44e6297476d28d84c7f78d7201dff29122 100644 (file)
@@ -36,8 +36,13 @@ class ArvadosCommandTool(CommandLineTool):
 
     def job(self, joborder, output_callback, **kwargs):
         if self.work_api == "containers":
-            kwargs["outdir"] = "/var/spool/cwl"
-            kwargs["docker_outdir"] = "/var/spool/cwl"
+            dockerReq, is_req = self.get_requirement("DockerRequirement")
+            if dockerReq and dockerReq.get("dockerOutputDirectory"):
+                kwargs["outdir"] = dockerReq.get("dockerOutputDirectory")
+                kwargs["docker_outdir"] = dockerReq.get("dockerOutputDirectory")
+            else:
+                kwargs["outdir"] = "/var/spool/cwl"
+                kwargs["docker_outdir"] = "/var/spool/cwl"
         elif self.work_api == "jobs":
             kwargs["outdir"] = "$(task.outdir)"
             kwargs["docker_outdir"] = "$(task.outdir)"
index 914ccaa5a1049868cfe7f840f6bf7d56e957218c..bb95ba9ee4636ff273cd93455f901a2123a62995 100644 (file)
@@ -230,7 +230,10 @@ class StagingPathMapper(PathMapper):
             tgt = "%s_%i%s" % (basetgt, n, baseext)
         self.targets.add(tgt)
         if obj["class"] == "Directory":
-            self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
+            if obj.get("writable"):
+                self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
+            else:
+                self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
             if loc.startswith("_:") or self._follow_dirs:
                 self.visitlisting(obj.get("listing", []), tgt, basedir)
         elif obj["class"] == "File":
@@ -239,7 +242,7 @@ class StagingPathMapper(PathMapper):
             if "contents" in obj and loc.startswith("_:"):
                 self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
             else:
-                if copy:
+                if copy or obj.get("writable"):
                     self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                 else:
                     self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
index 2ca63cfe5048a62f0a1853e2aca06be865ea1fd4..fb5d036e941969df71b6a3062d09bb87d4328739 100644 (file)
@@ -161,7 +161,7 @@ def upload_docker(arvrunner, tool):
     if isinstance(tool, CommandLineTool):
         (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
         if docker_req:
-            if docker_req.get("dockerOutputDirectory"):
+            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                 # TODO: can be supported by containers API, but not jobs API.
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
index 88e3d80db35293a5fab4aec7dbd4db023e26a943..e5484651edcd44d0f2ef67c3bf0dbd1244ffca60 100644 (file)
@@ -41,7 +41,7 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20180116213856',
+          'cwltool==1.0.20180130110340',
           'schema-salad==2.6.20171201034858',
           'typing==3.5.3.0',
           'ruamel.yaml==0.13.7',
index 705bea486b21797ea4c6523ba28e03dda4ad269c..0582e5418fd4776e11a97224d42c58750b90d8da 100644 (file)
@@ -332,6 +332,37 @@ func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
        return
 }
 
+func copyfile(src string, dst string) (err error) {
+       srcfile, err := os.Open(src)
+       if err != nil {
+               return
+       }
+
+       os.MkdirAll(path.Dir(dst), 0770)
+
+       dstfile, err := os.Create(dst)
+       if err != nil {
+               return
+       }
+       _, err = io.Copy(dstfile, srcfile)
+       if err != nil {
+               return
+       }
+
+       err = srcfile.Close()
+       err2 := dstfile.Close()
+
+       if err != nil {
+               return
+       }
+
+       if err2 != nil {
+               return err2
+       }
+
+       return nil
+}
+
 func (runner *ContainerRunner) SetupMounts() (err error) {
        err = runner.SetupArvMountPoint("keep")
        if err != nil {
@@ -359,6 +390,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        runner.Binds = nil
        runner.Volumes = make(map[string]struct{})
        needCertMount := true
+       type copyFile struct {
+               src  string
+               bind string
+       }
+       var copyFiles []copyFile
 
        var binds []string
        for bind := range runner.Container.Mounts {
@@ -414,7 +450,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                pdhOnly = false
                                src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
                        } else if mnt.PortableDataHash != "" {
-                               if mnt.Writable {
+                               if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
                                        return fmt.Errorf("Can never write to a collection specified by portable data hash")
                                }
                                idx := strings.Index(mnt.PortableDataHash, "/")
@@ -441,10 +477,12 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        if mnt.Writable {
                                if bind == runner.Container.OutputPath {
                                        runner.HostOutputDir = src
+                                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
                                } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
-                                       return fmt.Errorf("Writable mount points are not permitted underneath the output_path: %v", bind)
+                                       copyFiles = append(copyFiles, copyFile{src, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
+                               } else {
+                                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
                                }
-                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
                        } else {
                                runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
                        }
@@ -539,6 +577,32 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                }
        }
 
+       for _, cp := range copyFiles {
+               dir, err := os.Stat(cp.src)
+               if err != nil {
+                       return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+               }
+               if dir.IsDir() {
+                       err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
+                               if walkerr != nil {
+                                       return walkerr
+                               }
+                               if walkinfo.Mode().IsRegular() {
+                                       return copyfile(walkpath, path.Join(cp.bind, walkpath[len(cp.src):]))
+                               } else if walkinfo.Mode().IsDir() {
+                                       return os.MkdirAll(path.Join(cp.bind, walkpath[len(cp.src):]), 0770)
+                               } else {
+                                       return fmt.Errorf("Source %q is not a regular file or directory", cp.src)
+                               }
+                       })
+               } else {
+                       err = copyfile(cp.src, cp.bind)
+               }
+               if err != nil {
+                       return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+               }
+       }
+
        return nil
 }
 
@@ -1102,7 +1166,7 @@ func (runner *ContainerRunner) UploadOutputFile(
        // go through mounts and try reverse map to collection reference
        for _, bind := range binds {
                mnt := runner.Container.Mounts[bind]
-               if tgt == bind || strings.HasPrefix(tgt, bind+"/") {
+               if (tgt == bind || strings.HasPrefix(tgt, bind+"/")) && !mnt.Writable {
                        // get path relative to bind
                        targetSuffix := tgt[len(bind):]
 
@@ -1241,7 +1305,7 @@ func (runner *ContainerRunner) CaptureOutput() error {
                        continue
                }
 
-               if mnt.ExcludeFromOutput == true {
+               if mnt.ExcludeFromOutput == true || mnt.Writable {
                        continue
                }
 
index 22989bb2ece510e6f239151b267968fde25f1203..ed31adecf8caa39ec7523076678a46a8e039f6c7 100644 (file)
@@ -1233,20 +1233,36 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                checkEmpty()
        }
 
-       // Writable mount points are not allowed underneath output_dir mount point
+       // Writable mount points copied to output_dir mount point
        {
                i = 0
                cr.ArvMountPoint = ""
                cr.Container.Mounts = make(map[string]arvados.Mount)
                cr.Container.Mounts = map[string]arvados.Mount{
-                       "/tmp":     {Kind: "tmp"},
-                       "/tmp/foo": {Kind: "collection", Writable: true},
+                       "/tmp": {Kind: "tmp"},
+                       "/tmp/foo": {Kind: "collection",
+                               PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53",
+                               Writable:         true},
+                       "/tmp/bar": {Kind: "collection",
+                               PortableDataHash: "59389a8f9ee9d399be35462a0f92541d+53",
+                               Path:             "baz",
+                               Writable:         true},
                }
                cr.OutputPath = "/tmp"
 
+               os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
+               os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541d+53/baz", os.ModePerm)
+
+               rf, _ := os.Create(realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541d+53/baz/quux")
+               rf.Write([]byte("bar"))
+               rf.Close()
+
                err := cr.SetupMounts()
-               c.Check(err, NotNil)
-               c.Check(err, ErrorMatches, `Writable mount points are not permitted underneath the output_path.*`)
+               c.Check(err, IsNil)
+               _, err = os.Stat(cr.HostOutputDir + "/foo")
+               c.Check(err, IsNil)
+               _, err = os.Stat(cr.HostOutputDir + "/bar/quux")
+               c.Check(err, IsNil)
                os.RemoveAll(cr.ArvMountPoint)
                cr.CleanupDirs()
                checkEmpty()