6264: First pass complete, ready for testing.
author Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 29 Jun 2015 20:14:43 +0000 (16:14 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 29 Jun 2015 20:14:43 +0000 (16:14 -0400)
sdk/python/arvados/commands/cwl_runner.py
sdk/python/arvados/commands/run.py

index 9db55eeb21c35578b1d3ea634417079b83b2d478..37995071a1103153f21529b09fef937edf3ecfe4 100644 (file)
@@ -33,6 +33,64 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image):
 
     return dockerRequirement["dockerImageId"]
 
+class CollectionFsAccess(object):
+    """Filesystem-style access (glob, open, exists) to files stored in collections.
+
+    Paths have the form "<collection>/<path inside collection>", optionally
+    preceded by a "keep" segment.
+    """
+
+    def __init__(self):
+        # Cache of CollectionReader objects keyed by the collection segment of
+        # the path, so only one reader is created per collection.
+        self.collections = {}
+
+    def get_collection(self, path):
+        """Resolve a path to the collection that contains it.
+
+        Returns a tuple (CollectionReader, remainder of the path inside the collection).
+        """
+        p = path.split("/")
+        if p[0] == "keep":
+            del p[0]
+        if p[0] not in self.collections:
+            self.collections[p[0]] = arvados.collection.CollectionReader(p[0])
+        return (self.collections[p[0]], "/".join(p[1:]))
+
+    def _match(self, collection, patternsegments, parent):
+        """Recursively match fnmatch-style pattern segments against a collection.
+
+        Each name yielded by iterating `collection` is compared with
+        patternsegments[0]; a match is joined onto `parent` with os.path.join and
+        either collected (last segment) or descended into with the remaining segments.
+        """
+        ret = []
+        for i in collection:
+            if fnmatch.fnmatch(i, patternsegments[0]):
+                cur = os.path.join(parent, i)
+                if len(patternsegments) == 1:
+                    ret.append(cur)
+                else:
+                    # NOTE(review): assumes collection[i] is itself iterable by name -- confirm.
+                    ret.extend(self._match(collection[i], patternsegments[1:], cur))
+        return ret
+
+    def glob(self, pattern):
+        """Return the list of paths matching `pattern`, rooted at the collection's manifest locator."""
+        collection, rest = self.get_collection(pattern)
+        patternsegments = rest.split("/")
+        return self._match(collection, patternsegments, collection.manifest_locator())
+
+    def open(self, fn, mode):
+        """Open `fn` through its collection and return the collection's file object."""
+        collection, rest = self.get_collection(fn)
+        return collection.open(rest, mode)
+
+    def exists(self, fn):
+        """Return the result of the collection's exists() check for `fn`."""
+        collection, rest = self.get_collection(fn)
+        return collection.exists(rest)
+
 
 class ArvadosJob(object):
     def __init__(self, runner):
@@ -65,44 +101,8 @@ class ArvadosJob(object):
 
         self.arvrunner.jobs[response["uuid"]] = self
 
-    class CollectionFsAccess(object):
-        def __init__(self):
-            self.collections = {}
-
-        def get_collection(self, path):
-            p = path.split("/")
-            if p[0] == "keep":
-                del p[0]
-            if p[0] not in self.collections:
-                self.collections[p[0]] = arvados.collection.CollectionReader(p[0])
-            return (self.collections[p[0]], "/".join(p[1:]))
-
-        def _match(self, collection, patternsegments, parent):
-            ret = []
-            for i in collection:
-                if fnmatch.fnmatch(i, patternsegments[0]):
-                    cur = os.path.join(parent, i)
-                    if len(patternsegments) == 1:
-                        ret.append(cur)
-                    else:
-                        ret.extend(self._match(collection[i], patternsegments[1:], cur))
-            return ret
-
-        def glob(self, pattern):
-            collection, rest = self.get_path(pattern)
-            patternsegments = rest.split("/")
-            return self._match(collection, patternsegments, collection.manifest_locator())
-
-        def open(self, fn, mode):
-            collection, rest = self.get_path(fn)
-            return c.open(rest, mode)
-
-        def exists(self, fn):
-            collection, rest = self.get_path(fn)
-            return c.exists(rest)
-
     def done(self, record):
-        outputs = self.collect_outputs(record["output"], fs_access=ArvadosJob.CollectionFsAccess())
+        outputs = self.collect_outputs(record["output"], fs_access=CollectionFsAccess())
 
         if record["state"] == "Complete":
             processStatus = "success"
@@ -112,6 +112,50 @@ class ArvadosJob(object):
         self.output_callback(outputs, processStatus)
 
 
+class ArvPathMapper(cwltool.pathmapper.PathMapper):
+    """Path mapper that classifies referenced input files with statfile().
+
+    Entries classified as UploadFile are passed to uploadfiles(); ArvFile entries
+    and strings of the form "<32 hex digits>+<number>/<path>" are mapped directly.
+    """
+
+    def __init__(self, arvrunner, referenced_files, basedir):
+        """Build the path map for referenced_files.
+
+        arvrunner -- runner object; its `api` attribute is passed to uploadfiles()
+        referenced_files -- iterable of file paths; relative paths are
+            joined onto basedir
+        basedir -- directory used to resolve relative paths
+
+        Raises workflow.WorkflowException for a path that cannot be classified.
+        """
+        self._pathmap = {}
+        uploadfiles = []
+
+        # "<32 hex digits>+<decimal number>/<path>"
+        pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+/(.*)')
+
+        for src in referenced_files:
+            ab = src if os.path.isabs(src) else os.path.join(basedir, src)
+            st = arvados.commands.run.statfile("", ab)
+            if isinstance(st, arvados.commands.run.UploadFile):
+                # Deferred: these are handed to uploadfiles() in one batch below.
+                uploadfiles.append((src, ab, st))
+            elif isinstance(st, arvados.commands.run.ArvFile):
+                self._pathmap[src] = (ab, st.fn)
+            elif isinstance(st, basestring) and pdh_path.match(st):
+                self._pathmap[src] = (st, "$(file %s)" % st)
+            else:
+                raise workflow.WorkflowException("Input file path '%s' is invalid" % st)
+
+        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles], arvrunner.api)
+
+        # st.fn is read only after the upload call; presumably uploadfiles()
+        # rewrites it to the uploaded location -- TODO confirm.
+        for src, ab, st in uploadfiles:
+            self._pathmap[src] = (ab, st.fn)
+
+
 class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
     def __init__(self, arvrunner, toolpath_object, **kwargs):
         super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
@@ -120,27 +145,9 @@ class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
     def makeJobRunner(self):
         return ArvadosJob(self.arvrunner)
 
-    class ArvPathMapper(cwltool.pathmapper.PathMapper):
-        def __init__(self, referenced_files, basedir):
-            self._pathmap = {}
-            uploadfiles = []
-
-            pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+/(.*)')
-
-            for src in referenced_files:
-                ab = src if os.path.isabs(src) else os.path.join(basedir, src)
-                st = arvados.commands.run.statfile("", ab)
-                if isinstance(st, arvados.commands.run.UploadFile):
-                    uploadfiles.append((src, ab))
-                elif isinstance(st, arvados.commands.run.ArvFile):
-                    self._pathmap[src] = (ab, st.fn)
-                elif isinstance(st, basestring) and pdh_path.match(st):
-                    self._pathmap[src] = (st, "$(file %s") % st)
-                else:
-                    workflow.WorkflowException("Input file path '%s' is invalid", st)
-
     def makePathMapper(self, reffiles, input_basedir):
-        pass
+        return ArvadosCommandTool.ArvPathMapper(self.arvrunner, reffiles, input_basedir)
+
 
 class ArvCwlRunner(object):
     def __init__(self, api_client):
index c303ef61329c7334fd723a6ce27a04f54a5b8481..6b929ac3e95dbd355f50ca91ed093e7e072be757 100644 (file)
@@ -101,7 +101,7 @@ def statfile(prefix, fn):
 
     return prefix+fn
 
-def uploadfiles(files):
+def uploadfiles(files, api):
     # Find the smallest path prefix that includes all the files that need to be uploaded.
     # This starts at the root and iteratively removes common parent directory prefixes
     # until all file pathes no longer have a common parent.
@@ -237,8 +237,8 @@ def main(arguments=None):
     n = True
     pathprefix = "/"
     files = [c for command in slots[1:] for c in command if isinstance(c, UploadFile)]
-    if len(files) > 0:
-        uploadfiles(files)
+    if files:
+        uploadfiles(files, api)
 
     for i in xrange(1, len(slots)):
         slots[i] = [("%s%s" % (c.prefix, c.fn)) if isinstance(c, ArvFile) else c for c in slots[i]]