+ collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
+ self.arvrunner = arvrunner
+ self.input_basedir = input_basedir
+ self.collection_pattern = collection_pattern
+ self.file_pattern = file_pattern
+ self.name = name
+ self.referenced_files = [r["location"] for r in referenced_files]
+ self.single_collection = single_collection
+ super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
+
def visit(self, srcobj, uploadfiles):
    """Work out how one File/Directory object is mapped, then walk its children.

    The object's "location" (with anything from the first "#" onward
    removed) is handled as follows:

    * If it matches ``ArvPathMapper.pdh_dirpath`` it is entered in
      ``self._pathmap`` with a target built from ``self.collection_pattern``.
    * Otherwise, if it is not in ``self._pathmap`` yet:
      - "file:" locations are resolved against ``self.input_basedir`` and
        passed to ``arvados.commands.run.statfile``.  An ``UploadFile``
        result is queued in ``uploadfiles``; an ``ArvFile`` result is mapped
        directly; any other result raises ``WorkflowException``.
      - "_:" locations (literals) are only validated: a File needs
        "contents" and a Directory needs "listing".
      - anything else is mapped to itself.

    Every entry of "secondaryFiles" and "listing" is then visited
    recursively, whether or not this object was already mapped.

    srcobj: dict-like object with at least "location" and "class".
    uploadfiles: collection with an ``add`` method; receives
        ``(location, absolute path, statfile result)`` tuples.
    """
    location = srcobj["location"]
    if "#" in location:
        # Only the part before the first "#" is used as the mapping key.
        location = location.partition("#")[0]

    if isinstance(location, basestring) and ArvPathMapper.pdh_dirpath.match(location):
        # location[5:] drops a five-character prefix -- presumably "keep:";
        # confirm against the pdh_dirpath pattern.
        target = self.collection_pattern % urllib.unquote(location[5:])
        self._pathmap[location] = MapperEnt(location, target, srcobj["class"], True)

    debug = logger.isEnabledFor(logging.DEBUG)

    if location not in self._pathmap:
        if location.startswith("file:"):
            # Local FS ref, may need to be uploaded or may be on keep
            # mount.
            local_path = abspath(location, self.input_basedir)
            stat_result = arvados.commands.run.statfile(
                "", local_path,
                fnPattern="keep:%s/%s",
                dirPattern="keep:%s/%s",
                raiseOSError=True)
            with SourceLine(srcobj, "location", WorkflowException, debug):
                if isinstance(stat_result, arvados.commands.run.UploadFile):
                    uploadfiles.add((location, local_path, stat_result))
                elif isinstance(stat_result, arvados.commands.run.ArvFile):
                    keep_ref = stat_result.fn
                    target = self.collection_pattern % urllib.unquote(keep_ref[5:])
                    self._pathmap[location] = MapperEnt(keep_ref, target, "File", True)
                else:
                    raise WorkflowException("Input file path '%s' is invalid" % stat_result)
        elif location.startswith("_:"):
            # Literals are not mapped here; just make sure they carry the
            # data needed to materialize them.
            if srcobj["class"] == "File" and "contents" not in srcobj:
                raise WorkflowException("File literal '%s' is missing `contents`" % location)
            if srcobj["class"] == "Directory" and "listing" not in srcobj:
                raise WorkflowException("Directory literal '%s' is missing `listing`" % location)
        else:
            self._pathmap[location] = MapperEnt(location, location, srcobj["class"], True)

    # Walk children; SourceLine is given the field being walked so that a
    # WorkflowException raised below can be attributed to it.
    for field in ("secondaryFiles", "listing"):
        with SourceLine(srcobj, field, WorkflowException, debug):
            for child in srcobj.get(field, []):
                self.visit(child, uploadfiles)
+
def addentry(self, obj, c, path, remap):
    """Place one File/Directory object into collection ``c`` under ``path``.

    The destination is always ``path + "/" + obj["basename"]``.

    * Object already in ``self._pathmap``: its resolved location is looked
      up through ``self.arvrunner.fs_access.get_collection`` and copied into
      ``c`` (overwriting), ``(location, destination)`` is appended to
      ``remap``, and each "secondaryFiles" entry is added under the same
      ``path``.
    * Unmapped Directory: each "listing" entry is added beneath the
      destination, then ``(location, destination)`` is appended to ``remap``.
    * "_:" literal with "contents": the contents are written to the
      destination as UTF-8.
    * Anything else raises the error produced by ``SourceLine.makeError``.

    NOTE(review): the literal branch does not append to ``remap`` while the
    other two branches do -- confirm that is intended.

    obj: dict-like object with "location", "class" and "basename".
    c: destination collection; must provide ``copy`` and ``open``.
    path: directory inside ``c`` that receives the entry.
    remap: list collecting ``(location, destination)`` pairs.
    """
    location = obj["location"]

    if location in self._pathmap:
        source, source_path = self.arvrunner.fs_access.get_collection(self._pathmap[location].resolved)
        # An empty path is replaced by "." before being handed to copy().
        source_path = "." if source_path == "" else source_path
        dest = path + "/" + obj["basename"]
        c.copy(source_path, dest, source_collection=source, overwrite=True)
        remap.append((location, dest))
        for secondary in obj.get("secondaryFiles", []):
            self.addentry(secondary, c, path, remap)
        return

    if obj["class"] == "Directory":
        dest = path + "/" + obj["basename"]
        for child in obj.get("listing", []):
            self.addentry(child, c, dest, remap)
        remap.append((location, dest))
        return

    if location.startswith("_:") and "contents" in obj:
        with c.open(path + "/" + obj["basename"], "w") as f:
            f.write(obj["contents"].encode("utf-8"))
        return

    raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
+
+ def setup(self, referenced_files, basedir):
+ # type: (List[Any], unicode) -> None