import arvados.commands.run
import arvados.collection
+from schema_salad.sourceline import SourceLine
+
from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
from cwltool.workflow import WorkflowException
# mount.
ab = abspath(src, self.input_basedir)
st = arvados.commands.run.statfile("", ab, fnPattern="keep:%s/%s")
- if isinstance(st, arvados.commands.run.UploadFile):
- uploadfiles.add((src, ab, st))
- elif isinstance(st, arvados.commands.run.ArvFile):
- self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % st.fn[5:], "File")
- elif src.startswith("_:"):
- if "contents" in srcobj:
- pass
+ with SourceLine(srcobj, "location", WorkflowException):
+ if isinstance(st, arvados.commands.run.UploadFile):
+ uploadfiles.add((src, ab, st))
+ elif isinstance(st, arvados.commands.run.ArvFile):
+ self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % st.fn[5:], "File")
+ elif src.startswith("_:"):
+ if "contents" in srcobj:
+ pass
+ else:
+ raise WorkflowException("File literal '%s' is missing contents" % src)
+ elif src.startswith("arvwf:"):
+ self._pathmap[src] = MapperEnt(src, src, "File")
else:
- raise WorkflowException("File literal '%s' is missing contents" % src)
- elif src.startswith("arvwf:"):
- self._pathmap[src] = MapperEnt(src, src, "File")
- else:
- raise WorkflowException("Input file path '%s' is invalid" % st)
+ raise WorkflowException("Input file path '%s' is invalid" % st)
if "secondaryFiles" in srcobj:
for l in srcobj["secondaryFiles"]:
self.visit(l, uploadfiles)
with c.open(path + "/" + obj["basename"], "w") as f:
f.write(obj["contents"].encode("utf-8"))
else:
- raise WorkflowException("Don't know what to do with '%s'" % obj["location"])
+ raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
def setup(self, referenced_files, basedir):
# type: (List[Any], unicode) -> None
uploadfiles = set()
- for k,v in self.arvrunner.get_uploaded().iteritems():
- self._pathmap[k] = MapperEnt(v.resolved, self.collection_pattern % v.resolved[5:], "File")
+ already_uploaded = self.arvrunner.get_uploaded()
+ for k in referenced_files:
+ loc = k["location"]
+ if loc in already_uploaded:
+ v = already_uploaded[loc]
+ self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % v.resolved[5:], "File")
for srcobj in referenced_files:
self.visit(srcobj, uploadfiles)
self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
-class InitialWorkDirPathMapper(StagingPathMapper):
+class VwdPathMapper(StagingPathMapper):
def setup(self, referenced_files, basedir):
# type: (List[Any], unicode) -> None
self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type)
-class FinalOutputPathMapper(StagingPathMapper):
+class NoFollowPathMapper(StagingPathMapper):
_follow_dirs = False
def setup(self, referenced_files, basedir):
# type: (List[Any], unicode) -> None