+ collection_pattern, file_pattern, name=None, single_collection=False):
+ self.arvrunner = arvrunner
+ self.input_basedir = input_basedir
+ self.collection_pattern = collection_pattern
+ self.file_pattern = file_pattern
+ self.name = name
+ self.referenced_files = [r["location"] for r in referenced_files]
+ self.single_collection = single_collection
+ self.pdh_to_uuid = {}
+ super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
+
def visit(self, srcobj, uploadfiles):
    """Record how one File/Directory object (and its children) maps onto Keep.

    Depending on the object's location (any ``#fragment`` is ignored):

    * locations matching ``ArvPathMapper.pdh_dirpath`` are mapped directly,
      and the collection UUID (if the object carries one) is remembered in
      ``self.pdh_to_uuid`` keyed by portable data hash;
    * ``file:`` locations are stat'ed: paths that need uploading are queued
      in ``uploadfiles`` as ``(location, abspath, stat_result)``, paths that
      are already reachable through Keep are mapped immediately;
    * ``_:`` literals are validated (they must carry ``contents`` or
      ``listing``) but not mapped here;
    * ``http:``/``https:`` locations are fetched into Keep via
      ``http_to_keep`` and mapped to the resulting reference;
    * anything else is mapped to itself.

    Afterwards ``secondaryFiles`` and ``listing`` entries are visited
    recursively.

    :raises WorkflowException: for invalid literals or local paths.
    """
    location = srcobj["location"]
    if "#" in location:
        # Mappings are keyed on the location without its fragment.
        location = location.split("#", 1)[0]

    if isinstance(location, basestring) and ArvPathMapper.pdh_dirpath.match(location):
        target = self.collection_pattern % urllib.parse.unquote(location[5:])
        self._pathmap[location] = MapperEnt(location, target, srcobj["class"], True)
        uuid_field = arvados_cwl.util.collectionUUID
        if uuid_field in srcobj:
            # "keep:<pdh>/..." -> "<pdh>"
            pdh = location.split("/", 1)[0][5:]
            self.pdh_to_uuid[pdh] = srcobj[uuid_field]

    debug = logger.isEnabledFor(logging.DEBUG)

    if location not in self._pathmap:
        if location.startswith("file:"):
            # Local path: it either has to be uploaded, or it is already
            # visible through a Keep mount.
            local_path = abspath(location, self.input_basedir)
            probed = arvados.commands.run.statfile("", local_path,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
            with SourceLine(srcobj, "location", WorkflowException, debug):
                if isinstance(probed, arvados.commands.run.UploadFile):
                    uploadfiles.add((location, local_path, probed))
                elif isinstance(probed, arvados.commands.run.ArvFile):
                    # NOTE(review): the entry type is always "File" here, even
                    # though statfile is also given a dirPattern — confirm this
                    # is intended for directories found on a Keep mount.
                    target = self.collection_pattern % urllib.parse.unquote(probed.fn[5:])
                    self._pathmap[location] = MapperEnt(probed.fn, target, "File", True)
                else:
                    raise WorkflowException("Input file path '%s' is invalid" % probed)
        elif location.startswith("_:"):
            # Literals are only validated here; nothing is mapped for them.
            kind = srcobj["class"]
            if kind == "File" and "contents" not in srcobj:
                raise WorkflowException("File literal '%s' is missing `contents`" % location)
            if kind == "Directory" and "listing" not in srcobj:
                raise WorkflowException("Directory literal '%s' is missing `listing`" % location)
        elif location.startswith(("http:", "https:")):
            keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, location)
            logger.info("%s is %s", location, keepref)
            self._pathmap[location] = MapperEnt(keepref, keepref, srcobj["class"], True)
        else:
            self._pathmap[location] = MapperEnt(location, location, srcobj["class"], True)

    # Recurse into attached secondary files first, then directory entries.
    for field in ("secondaryFiles", "listing"):
        with SourceLine(srcobj, field, WorkflowException, debug):
            for child in srcobj.get(field, []):
                self.visit(child, uploadfiles)
+
def addentry(self, obj, c, path, remap):
    """Stage ``obj`` (and anything attached to it) into collection ``c``.

    The object is placed at ``path + "/" + obj["basename"]``.  Every object
    staged is recorded in ``remap`` as ``(original_location, staged_path)``.

    * Locations already in ``self._pathmap`` are copied out of the
      collection they resolve to, followed by their secondaryFiles.
    * Directories not in the path map are rebuilt entry by entry from their
      ``listing``.
    * File literals (``_:`` locations carrying ``contents``) are written
      out, followed by their secondaryFiles.

    :param obj: CWL File/Directory object to stage.
    :param c: destination collection (needs ``copy`` and ``open``).
    :param path: directory inside ``c`` to stage ``obj`` into.
    :param remap: list that receives ``(location, staged_path)`` tuples.
    :raises WorkflowException: for an object that fits none of the cases.
    """
    location = obj["location"]
    if location in self._pathmap:
        src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[location].resolved)
        if srcpath == "":
            # The reference is to the collection root itself.
            srcpath = "."
        target = path + "/" + obj["basename"]
        c.copy(srcpath, target, source_collection=src, overwrite=True)
        remap.append((location, target))
        # secondaryFiles are staged next to their primary, i.e. in ``path``.
        for sec in obj.get("secondaryFiles", []):
            self.addentry(sec, c, path, remap)
    elif obj["class"] == "Directory":
        target = path + "/" + obj["basename"]
        for entry in obj.get("listing", []):
            self.addentry(entry, c, target, remap)
        remap.append((location, target))
    elif location.startswith("_:") and "contents" in obj:
        target = path + "/" + obj["basename"]
        with c.open(target, "w") as f:
            f.write(obj["contents"])
        remap.append((location, target))
        # A literal can carry secondaryFiles too; stage them alongside it,
        # exactly as for mapped files, instead of dropping them.
        for sec in obj.get("secondaryFiles", []):
            self.addentry(sec, c, path, remap)
    else:
        raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % location)
+
def needs_new_collection(self, srcobj, prefix=""):
    """Check if files need to be staged into a new collection.

    If all the files are in the same collection and in the same
    paths they would be staged to, return False.  Otherwise, a new
    collection is needed with files copied/created in the
    appropriate places.

    An object is "in place" when the directory part of its location equals
    the expected ``prefix`` AND the final component of its location equals
    its ``basename``.  Locations are URL-quoted while basenames are not, so
    comparisons are done on unquoted text.

    :param srcobj: CWL File/Directory object to check.
    :param prefix: expected location prefix (ending in "/") for ``srcobj``;
        empty at the top level, where it is taken from the location itself.
    :returns: True if a new collection is needed, else False.
    """
    loc = srcobj["location"]
    if loc.startswith("_:"):
        # Literals have no existing storage; they must always be written out.
        return True

    i = loc.rfind("/")
    if i > -1:
        loc_prefix = loc[:i + 1]
        loc_name = urllib.parse.unquote(loc[i + 1:])
    else:
        # No "/" at all (e.g. a bare collection reference): there is no
        # final path component to compare against the basename.
        loc_prefix = loc + "/"
        loc_name = None

    if not prefix:
        # Top level: the object is staged into the directory it lives in.
        prefix = loc_prefix
    elif urllib.parse.unquote(prefix) != urllib.parse.unquote(loc_prefix):
        # Not located in the directory it would be staged into.
        return True

    # Would be staged under a different name than it currently has.  This
    # also catches a renamed top-level object, which the prefix check misses.
    if loc_name is not None and srcobj.get("basename", loc_name) != loc_name:
        return True

    if srcobj["class"] == "File" and loc not in self._pathmap:
        return True
    # secondaryFiles are staged next to their primary, so share its prefix.
    for s in srcobj.get("secondaryFiles", []):
        if self.needs_new_collection(s, prefix):
            return True
    if srcobj.get("listing"):
        # Children are expected under "<prefix><basename>/"; quote the
        # basename so it matches the quoted form used by locations.
        child_prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj["basename"]))
        for l in srcobj["listing"]:
            if self.needs_new_collection(l, child_prefix):
                return True
    return False
+
+ def setup(self, referenced_files, basedir):
+ # type: (List[Any], unicode) -> None