def visit(self, srcobj, uploadfiles):
    """Decide how one CWL File or Directory object gets mapped.

    The object is walked recursively: a File's ``secondaryFiles`` and a
    Directory's ``listing`` entries are each passed back through
    ``visit``.  Results are recorded in ``self._pathmap`` (keyed by
    location) or queued in ``uploadfiles``.

    Args:
        srcobj: dict describing the object; must have "class" and
            "location" keys.  "secondaryFiles", "listing" and "contents"
            are read when present.
        uploadfiles: collector with an ``add`` method; receives a
            ``(location, absolute_path, stat_result)`` tuple for every
            file that ``statfile`` reports as an ``UploadFile``.

    Raises:
        WorkflowException: a File location is not already mapped, is not
            classified by ``statfile`` as an UploadFile or ArvFile, and
            is not a ``_:`` literal carrying "contents".
    """
    src = srcobj["location"]
    if srcobj["class"] == "File":
        if "#" in src:
            # Only the part before the fragment identifier is mapped.
            src = src[:src.index("#")]
        if isinstance(src, basestring) and ArvPathMapper.pdh_path.match(src):
            # src[5:] drops a five-character prefix (presumably "keep:";
            # confirm against ArvPathMapper.pdh_path).
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "File")
        if src not in self._pathmap:
            # Local FS ref, may need to be uploaded or may be on keep
            # mount.
            ab = abspath(src, self.input_basedir)
            st = arvados.commands.run.statfile("", ab, fnPattern=self.file_pattern)
            if isinstance(st, arvados.commands.run.UploadFile):
                uploadfiles.add((src, ab, st))
            elif isinstance(st, arvados.commands.run.ArvFile):
                self._pathmap[src] = MapperEnt(ab, st.fn, "File")
            elif src.startswith("_:") and "contents" in srcobj:
                # File literal with inline contents: nothing to map or
                # upload at this stage.
                pass
            else:
                raise WorkflowException("Input file path '%s' is invalid" % st)
        for entry in srcobj.get("secondaryFiles", []):
            self.visit(entry, uploadfiles)
    elif srcobj["class"] == "Directory":
        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "Directory")
        else:
            # "listing" is optional on a Directory object; a directory
            # without one simply has no children to visit.
            for entry in srcobj.get("listing", []):
                self.visit(entry, uploadfiles)
+
def addentry(self, obj, c, path, subdirs):
    """Add one CWL File or Directory object to collection ``c`` under ``path``.

    Exactly one of four cases applies, checked in this order:

    * ``obj["location"]`` is already in ``self._pathmap``: the mapped
      source is copied into ``c`` at ``path/basename`` (overwriting), and
      each of its ``secondaryFiles`` is added under the same ``path``.
    * ``obj`` is a Directory: every ``listing`` entry is added under
      ``path/basename``, then ``(location, path/basename)`` is appended
      to ``subdirs``.
    * ``obj`` is a ``_:`` literal with "contents": the contents are
      written, UTF-8 encoded, to ``path/basename`` in ``c``.
    * Otherwise a WorkflowException is raised.

    Args:
        obj: dict with "location" and "class" keys; "basename" is needed
            in every case except the error case.
        c: destination collection object; its ``copy`` and ``open``
            methods are used.
        path: parent path inside ``c``; joined to the basename with "/".
        subdirs: list that receives one tuple per Directory handled by
            the second case.

    Raises:
        WorkflowException: none of the first three cases matched.
    """
    location = obj["location"]
    if location in self._pathmap:
        source_collection, source_path = self.arvrunner.fs_access.get_collection(
            self._pathmap[location].resolved)
        c.copy(source_path, path + "/" + obj["basename"],
               source_collection=source_collection, overwrite=True)
        for secondary in obj.get("secondaryFiles", []):
            self.addentry(secondary, c, path, subdirs)
    elif obj["class"] == "Directory":
        target = path + "/" + obj["basename"]
        # "listing" is optional on a Directory object; without it the
        # directory is still recorded in subdirs, just with no children.
        for child in obj.get("listing", []):
            self.addentry(child, c, target, subdirs)
        subdirs.append((location, target))
    elif location.startswith("_:") and "contents" in obj:
        with c.open(path + "/" + obj["basename"], "w") as out:
            out.write(obj["contents"].encode("utf-8"))
    else:
        raise WorkflowException("Don't know what to do with '%s'" % obj["location"])
+