1. Only mount points of kind @collection@ are supported.
-2. Mount points underneath output_path must not use @"writable":true@. If any of them are set as @writable@, the API will refuse to create/update the container request, and crunch-run will fail the container.
+2. Mount points underneath output_path which have "writable":true are copied into output_path during container initialization and may be updated, renamed, or deleted by the running container. The original collection is not modified. On container completion, files remaining in the output are saved to the output collection. The mount at output_path must be big enough to accommodate copies of the inner writable mounts.
3. If any such mount points are configured as @"exclude_from_output":true@, they will be excluded from the output.
def check_features(self, obj):
if isinstance(obj, dict):
- if obj.get("writable"):
- raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
+ if obj.get("writable") and self.work_api != "containers":
+ raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
if obj.get("class") == "DockerRequirement":
if obj.get("dockerOutputDirectory"):
- # TODO: can be supported by containers API, but not jobs API.
- raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
- "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
+ if self.work_api != "containers":
+ raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
+ "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
+ if not obj.get("dockerOutputDirectory").startswith('/'):
+ raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
+ "Option 'dockerOutputDirectory' must be an absolute path.")
for v in obj.itervalues():
self.check_features(v)
elif isinstance(obj, list):
generatemapper = NoFollowPathMapper([self.generatefiles], "", "",
separateDirs=False)
+ logger.debug("generatemapper is %s", generatemapper._pathmap)
+
with Perf(metrics, "createfiles %s" % self.name):
for f, p in generatemapper.items():
if not p.target:
pass
- elif p.type in ("File", "Directory"):
- source, path = self.arvrunner.fs_access.get_collection(p.resolved)
- vwd.copy(path, p.target, source_collection=source)
+ elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
+ if p.resolved.startswith("_:"):
+ vwd.mkdirs(p.target)
+ else:
+ source, path = self.arvrunner.fs_access.get_collection(p.resolved)
+ vwd.copy(path, p.target, source_collection=source)
elif p.type == "CreateFile":
with vwd.open(p.target, "w") as n:
n.write(p.resolved.encode("utf-8"))
+ def keepemptydirs(p):
+ if isinstance(p, arvados.collection.RichCollectionBase):
+ if len(p) == 0:
+ p.open(".keep", "w").close()
+ else:
+ for c in p:
+ keepemptydirs(p[c])
+
+ keepemptydirs(vwd)
+
with Perf(metrics, "generatefiles.save_new %s" % self.name):
vwd.save_new()
mounts[mountpoint] = {"kind": "collection",
"portable_data_hash": vwd.portable_data_hash(),
"path": p.target}
+ if p.type.startswith("Writable"):
+ mounts[mountpoint]["writable"] = True
container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
if self.environment:
def job(self, joborder, output_callback, **kwargs):
if self.work_api == "containers":
- kwargs["outdir"] = "/var/spool/cwl"
- kwargs["docker_outdir"] = "/var/spool/cwl"
+ dockerReq, is_req = self.get_requirement("DockerRequirement")
+ if dockerReq and dockerReq.get("dockerOutputDirectory"):
+ kwargs["outdir"] = dockerReq.get("dockerOutputDirectory")
+ kwargs["docker_outdir"] = dockerReq.get("dockerOutputDirectory")
+ else:
+ kwargs["outdir"] = "/var/spool/cwl"
+ kwargs["docker_outdir"] = "/var/spool/cwl"
elif self.work_api == "jobs":
kwargs["outdir"] = "$(task.outdir)"
kwargs["docker_outdir"] = "$(task.outdir)"
tgt = "%s_%i%s" % (basetgt, n, baseext)
self.targets.add(tgt)
if obj["class"] == "Directory":
- self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
+ if obj.get("writable"):
+ self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
+ else:
+ self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
if loc.startswith("_:") or self._follow_dirs:
self.visitlisting(obj.get("listing", []), tgt, basedir)
elif obj["class"] == "File":
if "contents" in obj and loc.startswith("_:"):
self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
else:
- if copy:
+ if copy or obj.get("writable"):
self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
else:
self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
if isinstance(tool, CommandLineTool):
(docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
if docker_req:
- if docker_req.get("dockerOutputDirectory"):
+ if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
# TODO: can be supported by containers API, but not jobs API.
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20180116213856',
+ 'cwltool==1.0.20180130110340',
'schema-salad==2.6.20171201034858',
'typing==3.5.3.0',
'ruamel.yaml==0.13.7',
return
}
// copyfile copies the contents of the file at src into dst.
//
// Any missing parent directories of dst are created with mode 0770, and
// dst itself is created (or truncated if it already exists). Both file
// handles are always closed before returning, including on error paths.
//
// The returned error is the first failure encountered: opening src,
// creating the parent directories, creating dst, copying the data, or
// (if everything else succeeded) closing either file.
func copyfile(src string, dst string) (err error) {
	srcfile, err := os.Open(src)
	if err != nil {
		return err
	}
	// Registered first so it runs last; it only reports a close error when
	// nothing earlier has failed.
	defer func() {
		if cerr := srcfile.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	// Fail here with the real cause rather than letting os.Create report a
	// less specific error afterwards.
	if err = os.MkdirAll(path.Dir(dst), 0770); err != nil {
		return err
	}

	dstfile, err := os.Create(dst)
	if err != nil {
		return err
	}
	// A failed close on the destination can mean buffered data was not
	// written, so it must be surfaced when the copy otherwise succeeded.
	defer func() {
		if cerr := dstfile.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	_, err = io.Copy(dstfile, srcfile)
	return err
}
+
func (runner *ContainerRunner) SetupMounts() (err error) {
err = runner.SetupArvMountPoint("keep")
if err != nil {
runner.Binds = nil
runner.Volumes = make(map[string]struct{})
needCertMount := true
+ type copyFile struct {
+ src string
+ bind string
+ }
+ var copyFiles []copyFile
var binds []string
for bind := range runner.Container.Mounts {
pdhOnly = false
src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
} else if mnt.PortableDataHash != "" {
- if mnt.Writable {
+ if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
return fmt.Errorf("Can never write to a collection specified by portable data hash")
}
idx := strings.Index(mnt.PortableDataHash, "/")
if mnt.Writable {
if bind == runner.Container.OutputPath {
runner.HostOutputDir = src
+ runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
} else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
- return fmt.Errorf("Writable mount points are not permitted underneath the output_path: %v", bind)
+ copyFiles = append(copyFiles, copyFile{src, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
+ } else {
+ runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
}
- runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
} else {
runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
}
}
}
+ for _, cp := range copyFiles {
+ dir, err := os.Stat(cp.src)
+ if err != nil {
+ return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+ }
+ if dir.IsDir() {
+ err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
+ if walkerr != nil {
+ return walkerr
+ }
+ if walkinfo.Mode().IsRegular() {
+ return copyfile(walkpath, path.Join(cp.bind, walkpath[len(cp.src):]))
+ } else if walkinfo.Mode().IsDir() {
+ return os.MkdirAll(path.Join(cp.bind, walkpath[len(cp.src):]), 0770)
+ } else {
+ return fmt.Errorf("Source %q is not a regular file or directory", cp.src)
+ }
+ })
+ } else {
+ err = copyfile(cp.src, cp.bind)
+ }
+ if err != nil {
+ return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+ }
+ }
+
return nil
}
// go through mounts and try reverse map to collection reference
for _, bind := range binds {
mnt := runner.Container.Mounts[bind]
- if tgt == bind || strings.HasPrefix(tgt, bind+"/") {
+ if (tgt == bind || strings.HasPrefix(tgt, bind+"/")) && !mnt.Writable {
// get path relative to bind
targetSuffix := tgt[len(bind):]
continue
}
- if mnt.ExcludeFromOutput == true {
+ if mnt.ExcludeFromOutput == true || mnt.Writable {
continue
}
checkEmpty()
}
- // Writable mount points are not allowed underneath output_dir mount point
+ // Writable mount points copied to output_dir mount point
{
i = 0
cr.ArvMountPoint = ""
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts = map[string]arvados.Mount{
- "/tmp": {Kind: "tmp"},
- "/tmp/foo": {Kind: "collection", Writable: true},
+ "/tmp": {Kind: "tmp"},
+ "/tmp/foo": {Kind: "collection",
+ PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53",
+ Writable: true},
+ "/tmp/bar": {Kind: "collection",
+ PortableDataHash: "59389a8f9ee9d399be35462a0f92541d+53",
+ Path: "baz",
+ Writable: true},
}
cr.OutputPath = "/tmp"
+ os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
+ os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541d+53/baz", os.ModePerm)
+
+ rf, _ := os.Create(realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541d+53/baz/quux")
+ rf.Write([]byte("bar"))
+ rf.Close()
+
err := cr.SetupMounts()
- c.Check(err, NotNil)
- c.Check(err, ErrorMatches, `Writable mount points are not permitted underneath the output_path.*`)
+ c.Check(err, IsNil)
+ _, err = os.Stat(cr.HostOutputDir + "/foo")
+ c.Check(err, IsNil)
+ _, err = os.Stat(cr.HostOutputDir + "/bar/quux")
+ c.Check(err, IsNil)
os.RemoveAll(cr.ArvMountPoint)
cr.CleanupDirs()
checkEmpty()