10221: Always upload all dependencies up front. Ensures consistent reuse
author Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 12 Oct 2016 14:59:05 +0000 (10:59 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Thu, 13 Oct 2016 12:15:04 +0000 (08:15 -0400)
behavior between --local and --submit.  Fix pathmapping bugs where
previously-uploaded files were added to path map but not actually mapped to
target paths.

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py

index c90f8902684304b400cc7ece97068a8e6b094000..c8ba5a4f50afbbd7658eb2494870433db89788ad 100644 (file)
@@ -24,7 +24,7 @@ import arvados.config
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner
+from. runner import Runner, upload_instance
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess
@@ -64,6 +64,8 @@ class ArvCwlRunner(object):
         self.pipeline = None
         self.final_output_collection = None
         self.output_name = output_name
         self.pipeline = None
         self.final_output_collection = None
         self.output_name = output_name
+        self.project_uuid = None
+
         if keep_client is not None:
             self.keep_client = keep_client
         else:
         if keep_client is not None:
             self.keep_client = keep_client
         else:
@@ -266,6 +268,8 @@ class ArvCwlRunner(object):
             kwargs["docker_outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"
 
             kwargs["docker_outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"
 
+        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
+
         runnerjob = None
         if kwargs.get("submit"):
             if self.work_api == "containers":
         runnerjob = None
         if kwargs.get("submit"):
             if self.work_api == "containers":
index 228d43304af9e4345b1800043b175cb0ec13f594..5051d977b49298266d3554b1a12e1218f31ede46 100644 (file)
@@ -41,7 +41,7 @@ class ArvPathMapper(PathMapper):
                 if isinstance(st, arvados.commands.run.UploadFile):
                     uploadfiles.add((src, ab, st))
                 elif isinstance(st, arvados.commands.run.ArvFile):
                 if isinstance(st, arvados.commands.run.UploadFile):
                     uploadfiles.add((src, ab, st))
                 elif isinstance(st, arvados.commands.run.ArvFile):
-                    self._pathmap[src] = MapperEnt(ab, st.fn, "File")
+                    self._pathmap[src] = MapperEnt("keep:"+ab, self.collection_pattern % ab, "File")
                 elif src.startswith("_:"):
                     if "contents" in srcobj:
                         pass
                 elif src.startswith("_:"):
                     if "contents" in srcobj:
                         pass
@@ -78,9 +78,11 @@ class ArvPathMapper(PathMapper):
 
     def setup(self, referenced_files, basedir):
         # type: (List[Any], unicode) -> None
 
     def setup(self, referenced_files, basedir):
         # type: (List[Any], unicode) -> None
-        self._pathmap = self.arvrunner.get_uploaded()
         uploadfiles = set()
 
         uploadfiles = set()
 
+        for k,v in self.arvrunner.get_uploaded().iteritems():
+            self._pathmap[k] = MapperEnt(v.resolved, self.collection_pattern % v.resolved[5:], "File")
+
         for srcobj in referenced_files:
             self.visit(srcobj, uploadfiles)
 
         for srcobj in referenced_files:
             self.visit(srcobj, uploadfiles)
 
@@ -94,7 +96,7 @@ class ArvPathMapper(PathMapper):
                                              project=self.arvrunner.project_uuid)
 
         for src, ab, st in uploadfiles:
                                              project=self.arvrunner.project_uuid)
 
         for src, ab, st in uploadfiles:
-            self._pathmap[src] = MapperEnt("keep:" + st.keepref, st.fn, "File")
+            self._pathmap[src] = MapperEnt("keep:" + st.keepref, self.collection_pattern % st.keepref, "File")
             self.arvrunner.add_uploaded(src, self._pathmap[src])
 
         for srcobj in referenced_files:
             self.arvrunner.add_uploaded(src, self._pathmap[src])
 
         for srcobj in referenced_files:
index e5b4e006e8cce7cac780436fc06c6dcd79882730..054d3530cfe174b9a5da3025d248097ca1062d7a 100644 (file)
@@ -112,6 +112,30 @@ def upload_docker(arvrunner, tool):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool)
 
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool)
 
+def upload_instance(arvrunner, name, tool, job_order):
+        upload_docker(arvrunner, tool)
+
+        workflowmapper = upload_dependencies(arvrunner,
+                                             name,
+                                             tool.doc_loader,
+                                             tool.tool,
+                                             tool.tool["id"],
+                                             True)
+
+        jobmapper = upload_dependencies(arvrunner,
+                                        os.path.basename(job_order.get("id", "#")),
+                                        tool.doc_loader,
+                                        job_order,
+                                        job_order.get("id", "#"),
+                                        False)
+
+        adjustDirObjs(job_order, trim_listing)
+
+        if "id" in job_order:
+            del job_order["id"]
+
+        return workflowmapper
+
 
 class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse, output_name):
 
 class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse, output_name):
@@ -128,31 +152,8 @@ class Runner(object):
         pass
 
     def arvados_job_spec(self, *args, **kwargs):
         pass
 
     def arvados_job_spec(self, *args, **kwargs):
-        upload_docker(self.arvrunner, self.tool)
-
         self.name = os.path.basename(self.tool.tool["id"])
         self.name = os.path.basename(self.tool.tool["id"])
-
-        workflowmapper = upload_dependencies(self.arvrunner,
-                                             self.name,
-                                             self.tool.doc_loader,
-                                             self.tool.tool,
-                                             self.tool.tool["id"],
-                                             True)
-
-        jobmapper = upload_dependencies(self.arvrunner,
-                                        os.path.basename(self.job_order.get("id", "#")),
-                                        self.tool.doc_loader,
-                                        self.job_order,
-                                        self.job_order.get("id", "#"),
-                                        False)
-
-        adjustDirObjs(self.job_order, trim_listing)
-
-        if "id" in self.job_order:
-            del self.job_order["id"]
-
-        return workflowmapper
-
+        return upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
 
     def done(self, record):
         if record["state"] == "Complete":
 
     def done(self, record):
         if record["state"] == "Complete":