return dockerRequirement["dockerImageId"]
class CollectionFsAccess(object):
    """File-access helper that resolves paths against Arvados collections.

    Paths have the form ``[keep/]<locator>/<path inside collection>``.  One
    ``arvados.collection.CollectionReader`` is created per locator and kept
    in ``self.collections`` so repeated lookups reuse the same reader.
    """

    def __init__(self):
        # Cache of collection readers, keyed by collection locator.
        self.collections = {}

    def get_collection(self, path):
        """Split *path* into its collection and the path inside it.

        A leading ``keep`` segment is dropped; the next segment is used as
        the collection locator.

        Returns a ``(collection, rest)`` tuple where ``rest`` is the
        remaining segments re-joined with ``/``.
        """
        segments = path.split("/")
        if segments[0] == "keep":
            del segments[0]
        locator = segments[0]
        if locator not in self.collections:
            self.collections[locator] = arvados.collection.CollectionReader(locator)
        return (self.collections[locator], "/".join(segments[1:]))

    def _match(self, collection, patternsegments, parent):
        """Recursively match *patternsegments* against *collection*.

        Iterating *collection* must yield entry names, and indexing it by
        name must yield the child to descend into.  Each name is tested
        with ``fnmatch`` against the first pattern segment; the last
        segment produces results, earlier ones recurse.

        Returns the list of matching paths, each prefixed with *parent*.
        """
        matches = []
        for name in collection:
            if fnmatch.fnmatch(name, patternsegments[0]):
                cur = os.path.join(parent, name)
                if len(patternsegments) == 1:
                    matches.append(cur)
                else:
                    matches.extend(
                        self._match(collection[name], patternsegments[1:], cur))
        return matches

    def glob(self, pattern):
        """Return the paths in the addressed collection matching *pattern*.

        Results are prefixed with the collection's ``manifest_locator()``.
        """
        # The lookup helper is get_collection(); there is no get_path().
        collection, rest = self.get_collection(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, collection.manifest_locator())

    def open(self, fn, mode):
        """Open *fn* through its collection and return the resulting object."""
        collection, rest = self.get_collection(fn)
        return collection.open(rest, mode)

    def exists(self, fn):
        """Return whatever the collection's ``exists`` reports for *fn*."""
        collection, rest = self.get_collection(fn)
        return collection.exists(rest)
class ArvadosJob(object):
def __init__(self, runner):
self.arvrunner.jobs[response["uuid"]] = self
- class CollectionFsAccess(object):
- def __init__(self):
- self.collections = {}
-
- def get_collection(self, path):
- p = path.split("/")
- if p[0] == "keep":
- del p[0]
- if p[0] not in self.collections:
- self.collections[p[0]] = arvados.collection.CollectionReader(p[0])
- return (self.collections[p[0]], "/".join(p[1:]))
-
- def _match(self, collection, patternsegments, parent):
- ret = []
- for i in collection:
- if fnmatch.fnmatch(i, patternsegments[0]):
- cur = os.path.join(parent, i)
- if len(patternsegments) == 1:
- ret.append(cur)
- else:
- ret.extend(self._match(collection[i], patternsegments[1:], cur))
- return ret
-
- def glob(self, pattern):
- collection, rest = self.get_path(pattern)
- patternsegments = rest.split("/")
- return self._match(collection, patternsegments, collection.manifest_locator())
-
- def open(self, fn, mode):
- collection, rest = self.get_path(fn)
- return c.open(rest, mode)
-
- def exists(self, fn):
- collection, rest = self.get_path(fn)
- return c.exists(rest)
-
def done(self, record):
    """Collect the finished job's outputs and report them to the callback.

    *record* is the job record: its ``"output"`` entry is passed to
    ``collect_outputs`` and its ``"state"`` entry decides the status
    handed to ``output_callback``.
    """
    outputs = self.collect_outputs(record["output"], fs_access=CollectionFsAccess())
    # Every state other than "Complete" is reported as a failure; without
    # this branch processStatus would be unbound for such records.
    # NOTE(review): "permanentFail" is assumed to be the status the
    # callback expects for failed jobs -- confirm against cwltool.
    if record["state"] == "Complete":
        processStatus = "success"
    else:
        processStatus = "permanentFail"
    self.output_callback(outputs, processStatus)
class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """Path mapper that classifies input files with ``arvados.commands.run``.

    Every referenced file is passed to ``statfile``; depending on the kind
    of result it is queued for upload, mapped directly, or rejected.
    """

    def __init__(self, arvrunner, referenced_files, basedir):
        """Populate ``self._pathmap`` for *referenced_files*.

        arvrunner: object whose ``api`` attribute is passed to
            ``arvados.commands.run.uploadfiles``.
        referenced_files: iterable of source paths; relative ones are
            resolved against *basedir*.
        basedir: directory used to resolve relative source paths.

        Raises ``workflow.WorkflowException`` when a path cannot be
        classified.
        """
        self._pathmap = {}
        pending_uploads = []

        # Matches "<32 hex digits>+<size>/<path>".
        pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+/(.*)')

        for src in referenced_files:
            ab = src if os.path.isabs(src) else os.path.join(basedir, src)
            st = arvados.commands.run.statfile("", ab)
            if isinstance(st, arvados.commands.run.UploadFile):
                # Mapped after the batch upload below.
                pending_uploads.append((src, ab, st))
            elif isinstance(st, arvados.commands.run.ArvFile):
                self._pathmap[src] = (ab, st.fn)
            elif isinstance(st, basestring) and pdh_path.match(st):
                self._pathmap[src] = (st, "$(file %s)" % st)
            else:
                # The exception must actually be raised, with the path
                # interpolated into the message.
                raise workflow.WorkflowException(
                    "Input file path '%s' is invalid" % st)

        arvados.commands.run.uploadfiles(
            [entry[2] for entry in pending_uploads], arvrunner.api)

        # NOTE(review): presumably uploadfiles() updates each entry's `fn`
        # to its uploaded location -- confirm against the Arvados SDK.
        for src, ab, st in pending_uploads:
            self._pathmap[src] = (ab, st.fn)
+
class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
    """CommandLineTool that creates Arvados job runners and path mappers."""

    def __init__(self, arvrunner, toolpath_object, **kwargs):
        """Initialise the base tool and remember the Arvados runner.

        arvrunner: runner object handed on to ``ArvadosJob`` and
            ``ArvPathMapper``.
        toolpath_object, **kwargs: forwarded unchanged to the base class.
        """
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        # makeJobRunner() and makePathMapper() both read self.arvrunner,
        # so it has to be stored here.
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        """Return a new ``ArvadosJob`` bound to this tool's runner."""
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir):
        """Return an ``ArvPathMapper`` for *reffiles* under *input_basedir*."""
        # ArvPathMapper is a module-level class, not an attribute of
        # ArvadosCommandTool.
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir)
+
class ArvCwlRunner(object):
def __init__(self, api_client):