13627: Fix loadingContext used for reloading document.
author: Peter Amstutz <pamstutz@veritasgenetics.com>
Fri, 15 Jun 2018 14:11:01 +0000 (10:11 -0400)
committer: Peter Amstutz <pamstutz@veritasgenetics.com>
Mon, 18 Jun 2018 18:56:08 +0000 (14:56 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/runner.py

index 44873ebbc03dad2497537aee638fcc53b1604e8e..2dd5497355f6891268f8767170b72c7b578493d1 100644 (file)
@@ -71,9 +71,19 @@ class ArvCwlRunner(object):
 
     """
 
-    def __init__(self, api_client, work_api=None, keep_client=None,
-                 output_name=None, output_tags=None, default_storage_classes="default",
-                 num_retries=4, thread_count=4):
+    def __init__(self, api_client,
+                 arvargs=None,
+                 keep_client=None,
+                 num_retries=4,
+                 thread_count=4):
+
+        if arvargs is None:
+            arvargs = argparse.Namespace()
+            arvargs.work_api = None
+            arvargs.output_name = None
+            arvargs.output_tags = None
+            arvargs.thread_count = 1
+
         self.api = api_client
         self.processes = {}
         self.workflow_eval_lock = threading.Condition(threading.RLock())
@@ -85,15 +95,15 @@ class ArvCwlRunner(object):
         self.poll_api = None
         self.pipeline = None
         self.final_output_collection = None
-        self.output_name = output_name
-        self.output_tags = output_tags
+        self.output_name = arvargs.output_name
+        self.output_tags = arvargs.output_tags
         self.project_uuid = None
         self.intermediate_output_ttl = 0
         self.intermediate_output_collections = []
         self.trash_intermediate = False
-        self.thread_count = thread_count
+        self.thread_count = arvargs.thread_count
         self.poll_interval = 12
-        self.default_storage_classes = default_storage_classes
+        self.loadingContext = None
 
         if keep_client is not None:
             self.keep_client = keep_client
@@ -113,18 +123,24 @@ class ArvCwlRunner(object):
             try:
                 methods = self.api._rootDesc.get('resources')[api]['methods']
                 if ('httpMethod' in methods['create'] and
-                    (work_api == api or work_api is None)):
+                    (arvargs.work_api == api or arvargs.work_api is None)):
                     self.work_api = api
                     break
             except KeyError:
                 pass
 
         if not self.work_api:
-            if work_api is None:
+            if arvargs.work_api is None:
                 raise Exception("No supported APIs")
             else:
-                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
+                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
 
+        self.loadingContext = ArvLoadingContext(vars(arvargs))
+        self.loadingContext.fetcher_constructor = self.fetcher_constructor
+        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
+        self.loadingContext.construct_tool_object = self.arv_make_tool
+
+
     def arv_make_tool(self, toolpath_object, loadingContext):
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
             return ArvadosCommandTool(self, toolpath_object, loadingContext)
@@ -406,12 +422,11 @@ class ArvCwlRunner(object):
         # Reload tool object which may have been updated by
         # upload_workflow_deps
         # Don't validate this time because it will just print redundant errors.
-        loadingContext = ArvLoadingContext({
-            "construct_tool_object": self.arv_make_tool,
-            "loader": tool.doc_loader,
-            "avsc_names": tool.doc_schema,
-            "metadata": tool.metadata,
-            "do_validate": False})
+        loadingContext = self.loadingContext.copy()
+        loadingContext.loader = tool.doc_loader
+        loadingContext.avsc_names = tool.doc_schema
+        loadingContext.metadata = tool.metadata
+        loadingContext.do_validate = False
 
         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                   loadingContext)
@@ -485,7 +500,6 @@ class ArvCwlRunner(object):
                                                 submit_runner_image=runtimeContext.submit_runner_image,
                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                 merged_map=merged_map,
-                                                default_storage_classes=self.default_storage_classes,
                                                 priority=runtimeContext.priority,
                                                 secret_store=self.secret_store)
             elif self.work_api == "jobs":
@@ -597,7 +611,7 @@ class ArvCwlRunner(object):
             if self.output_tags is None:
                 self.output_tags = ""
 
-            storage_classes = kwargs.get("storage_classes").strip().split(",")
+            storage_classes = runtimeContext.storage_classes.strip().split(",")
             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
             self.set_crunch_output()
 
@@ -787,6 +801,10 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
         logger.error("Multiple storage classes are not supported currently.")
         return 1
 
+    arvargs.use_container = True
+    arvargs.relax_path_checks = True
+    arvargs.print_supported_versions = False
+
     if install_sig_handlers:
         arv_cmd.install_signal_handlers()
 
@@ -814,10 +832,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
             keep_client = api_client.keep
         if keep_client is None:
             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
-        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
-                              num_retries=4, output_name=arvargs.output_name,
-                              output_tags=arvargs.output_tags, default_storage_classes=parser.get_default("storage_classes"),
-                              thread_count=arvargs.thread_count)
+        runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
     except Exception as e:
         logger.error(e)
         return 1
@@ -846,15 +861,6 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
         if not hasattr(arvargs, key):
             setattr(arvargs, key, val)
 
-    arvargs.use_container = True
-    arvargs.relax_path_checks = True
-    arvargs.print_supported_versions = False
-
-    loadingContext = ArvLoadingContext(vars(arvargs))
-    loadingContext.fetcher_constructor = runner.fetcher_constructor
-    loadingContext.resolver = partial(collectionResolver, api_client, num_retries=runner.num_retries)
-    loadingContext.construct_tool_object = runner.arv_make_tool
-
     runtimeContext = ArvRuntimeContext(vars(arvargs))
     runtimeContext.make_fs_access = partial(CollectionFsAccess,
                              collection_cache=runner.collection_cache)
@@ -867,5 +873,5 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
                              job_order_object=job_order_object,
                              logger_handler=arvados.log_handler,
                              custom_schema_callback=add_arv_hints,
-                             loadingContext=loadingContext,
+                             loadingContext=runner.loadingContext,
                              runtimeContext=runtimeContext)
index 4953afa79743c8e5601fe332fbedc82059800450..cb362187b969d40559eb13d0924333d76b063b0b 100644 (file)
@@ -436,8 +436,8 @@ class RunnerContainer(Runner):
         if runtimeContext.debug:
             command.append("--debug")
 
-        if kwargs.get("storage_classes") and kwargs.get("storage_classes") != self.default_storage_classes:
-            command.append("--storage-classes=" + kwargs.get("storage_classes"))
+        if runtimeContext.storage_classes != "default":
+            command.append("--storage-classes=" + runtimeContext.storage_classes)
 
         if self.on_error:
             command.append("--on-error=" + self.on_error)
index 940ae92e772afc9de7df30697e5ddaa60dda4887..0ab065d7874149296382c94a2e593bc88ab5fb1c 100644 (file)
@@ -278,7 +278,7 @@ class ArvadosJob(JobBase):
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
 
-    def arvados_job_spec(self, runtimeContext):
+    def arvados_job_spec(self, debug=False):
         """Create an Arvados job specification for this workflow.
 
         The returned dict can be used to create a job (i.e., passed as
@@ -308,7 +308,7 @@ class RunnerJob(Runner):
         if self.on_error:
             self.job_order["arv:on_error"] = self.on_error
 
-        if runtimeContext.debug:
+        if debug:
             self.job_order["arv:debug"] = True
 
         return {
@@ -324,7 +324,7 @@ class RunnerJob(Runner):
         }
 
     def run(self, runtimeContext):
-        job_spec = self.arvados_job_spec(runtimeContext)
+        job_spec = self.arvados_job_spec(runtimeContext.debug)
 
         job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
 
index 87acf15d657c89dec9221bf3a6b2fb1d3f612e32..a1c8c1bfb5891c72126c9f28f9d63cc5e972eeb0 100644 (file)
@@ -353,7 +353,7 @@ class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
-                 intermediate_output_ttl=0, merged_map=None, default_storage_classes="default",
+                 intermediate_output_ttl=0, merged_map=None,
                  priority=None, secret_store=None):
         self.arvrunner = runner
         self.tool = tool
@@ -376,7 +376,6 @@ class Runner(object):
         self.intermediate_output_ttl = intermediate_output_ttl
         self.priority = priority
         self.secret_store = secret_store
-        self.default_storage_classes = default_storage_classes
 
         if submit_runner_ram:
             self.submit_runner_ram = submit_runner_ram