API fixup
author    Peter Amstutz <pamstutz@veritasgenetics.com>
          Thu, 14 Jun 2018 21:56:13 +0000 (17:56 -0400)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
          Mon, 18 Jun 2018 18:46:42 +0000 (14:46 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>
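
Port arvados-cwl-runner to the refactored cwltool API: options that
previously travelled as **kwargs are now carried on
cwltool.context.LoadingContext and RuntimeContext objects, the removed
cwltool.process.get_feature() helper is replaced by the
Process.get_requirement() method, and cwltool is pinned to
1.0.20180614214548.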

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/arvtool.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/crunch_script.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py

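For reviewers unfamiliar with the cwltool refactor: a minimal sketch of
the context-object convention this commit adopts (constructor-from-dict
and copy() as used in the hunks below; the attribute values are
illustrative, not arvados defaults):

    from cwltool.context import LoadingContext, RuntimeContext

    # Options that used to travel as **kwargs are now fields on two
    # context objects; the constructor copies matching keys from a dict.
    loadingContext = LoadingContext({"do_validate": False})
    runtimeContext = RuntimeContext({"use_container": True,
                                     "tmpdir_prefix": "tmp"})

    # A callee that needs different settings copies the context rather
    # than mutating the caller's object.
    childContext = runtimeContext.copy()
    childContext.tmpdir_prefix = "tmp2"
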
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index b2b93bf9e7870e553eab1ec3086f16e2f795b29b..d6ffe7d1612c487c38d132adbb2f8215153f2d46 100644
@@ -28,6 +28,7 @@ import cwltool.workflow
 import cwltool.process
 from schema_salad.sourceline import SourceLine
 import schema_salad.validate as validate
+import cwltool.argparser
 
 import arvados
 import arvados.config
@@ -122,16 +123,13 @@ class ArvCwlRunner(object):
             else:
                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
 
-    def arv_make_tool(self, toolpath_object, **kwargs):
-        kwargs["work_api"] = self.work_api
-        kwargs["fetcher_constructor"] = self.fetcher_constructor
-        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
+    def arv_make_tool(self, toolpath_object, loadingContext):
+        loadingContext.work_api = self.work_api
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
-            return ArvadosCommandTool(self, toolpath_object, **kwargs)
+            return ArvadosCommandTool(self, toolpath_object, loadingContext)
         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
-            return ArvadosWorkflow(self, toolpath_object, **kwargs)
+            return ArvadosWorkflow(self, toolpath_object, loadingContext)
         else:
-            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
+            return cwltool.workflow.defaultMakeTool(toolpath_object, loadingContext)
 
     def output_callback(self, out, processStatus):
         with self.workflow_eval_lock:
@@ -150,8 +148,8 @@ class ArvCwlRunner(object):
             self.workflow_eval_lock.notifyAll()
 
 
-    def start_run(self, runnable, kwargs):
-        self.task_queue.add(partial(runnable.run, **kwargs))
+    def start_run(self, runnable, runtimeContext):
+        self.task_queue.add(partial(runnable.run, runtimeContext))
 
     def process_submitted(self, container):
         with self.workflow_eval_lock:
@@ -373,33 +371,31 @@ class ArvCwlRunner(object):
                                        'progress':1.0
                                    }).execute(num_retries=self.num_retries)
 
-    def arv_executor(self, tool, job_order, **kwargs):
-        self.debug = kwargs.get("debug")
+    def arv_executor(self, tool, job_order, runtimeContext):
+        self.debug = runtimeContext.debug
 
         tool.visit(self.check_features)
 
-        self.project_uuid = kwargs.get("project_uuid")
+        self.project_uuid = runtimeContext.project_uuid
         self.pipeline = None
-        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
-                                                                 collection_cache=self.collection_cache)
-        self.fs_access = make_fs_access(kwargs["basedir"])
-        self.secret_store = kwargs.get("secret_store")
+        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
+        self.secret_store = runtimeContext.secret_store
 
-        self.trash_intermediate = kwargs["trash_intermediate"]
+        self.trash_intermediate = runtimeContext.trash_intermediate
         if self.trash_intermediate and self.work_api != "containers":
             raise Exception("--trash-intermediate is only supported with --api=containers.")
 
-        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
+        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
         if self.intermediate_output_ttl and self.work_api != "containers":
             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
         if self.intermediate_output_ttl < 0:
             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
 
-        if kwargs.get("submit_request_uuid") and self.work_api != "containers":
+        if runtimeContext.submit_request_uuid and self.work_api != "containers":
             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
 
-        if not kwargs.get("name"):
-            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+        if not runtimeContext.name:
+            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
 
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
@@ -409,25 +405,25 @@ class ArvCwlRunner(object):
         # upload_workflow_deps
         # Don't validate this time because it will just print redundant errors.
-        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
-                                  makeTool=self.arv_make_tool,
-                                  loader=tool.doc_loader,
-                                  avsc_names=tool.doc_schema,
-                                  metadata=tool.metadata,
-                                  do_validate=False)
+        loadingContext = LoadingContext({"construct_tool_object": self.arv_make_tool,
+                                         "loader": tool.doc_loader,
+                                         "avsc_names": tool.doc_schema,
+                                         "metadata": tool.metadata,
+                                         "do_validate": False})
+        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
+                                  loadingContext)
 
         # Upload local file references in the job order.
-        job_order = upload_job_order(self, "%s input" % kwargs["name"],
+        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                      tool, job_order)
 
-        existing_uuid = kwargs.get("update_workflow")
-        if existing_uuid or kwargs.get("create_workflow"):
+        existing_uuid = runtimeContext.update_workflow
+        if existing_uuid or runtimeContext.create_workflow:
             # Create a pipeline template or workflow record and exit.
             if self.work_api == "jobs":
                 tmpl = RunnerTemplate(self, tool, job_order,
-                                      kwargs.get("enable_reuse"),
+                                      runtimeContext.enable_reuse,
                                       uuid=existing_uuid,
-                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                      name=kwargs["name"],
+                                      submit_runner_ram=runtimeContext.submit_runner_ram,
+                                      name=runtimeContext.name,
                                       merged_map=merged_map)
                 tmpl.save()
                 # cwltool.main will write our return value to stdout.
@@ -436,82 +432,80 @@ class ArvCwlRunner(object):
                 return (upload_workflow(self, tool, job_order,
                                         self.project_uuid,
                                         uuid=existing_uuid,
-                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                        name=kwargs["name"],
+                                        submit_runner_ram=runtimeContext.submit_runner_ram,
+                                        name=runtimeContext.name,
                                         merged_map=merged_map),
                         "success")
 
-        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
-        self.eval_timeout = kwargs.get("eval_timeout")
+        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
+        self.eval_timeout = runtimeContext.eval_timeout
 
-        kwargs["make_fs_access"] = make_fs_access
-        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
-        kwargs["use_container"] = True
-        kwargs["tmpdir_prefix"] = "tmp"
-        kwargs["compute_checksum"] = kwargs.get("compute_checksum")
+        runtimeContext = runtimeContext.copy()
+        runtimeContext.use_container = True
+        runtimeContext.tmpdir_prefix = "tmp"
 
         if self.work_api == "containers":
             if self.ignore_docker_for_reuse:
                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
-            kwargs["outdir"] = "/var/spool/cwl"
-            kwargs["docker_outdir"] = "/var/spool/cwl"
-            kwargs["tmpdir"] = "/tmp"
-            kwargs["docker_tmpdir"] = "/tmp"
+            runtimeContext.outdir = "/var/spool/cwl"
+            runtimeContext.docker_outdir = "/var/spool/cwl"
+            runtimeContext.tmpdir = "/tmp"
+            runtimeContext.docker_tmpdir = "/tmp"
         elif self.work_api == "jobs":
-            if kwargs["priority"] != DEFAULT_PRIORITY:
+            if runtimeContext.priority != DEFAULT_PRIORITY:
                 raise Exception("--priority not implemented for jobs API.")
-            kwargs["outdir"] = "$(task.outdir)"
-            kwargs["docker_outdir"] = "$(task.outdir)"
-            kwargs["tmpdir"] = "$(task.tmpdir)"
+            runtimeContext.outdir = "$(task.outdir)"
+            runtimeContext.docker_outdir = "$(task.outdir)"
+            runtimeContext.tmpdir = "$(task.tmpdir)"
 
-        if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
+        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
             raise Exception("--priority must be in the range 1..1000.")
 
         runnerjob = None
-        if kwargs.get("submit"):
+        if runtimeContext.submit:
             # Submit a runner job to run the workflow for us.
             if self.work_api == "containers":
-                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
-                    kwargs["runnerjob"] = tool.tool["id"]
+                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
+                    runtimeContext.runnerjob = tool.tool["id"]
                     runnerjob = tool.job(job_order,
                                          self.output_callback,
-                                         **kwargs).next()
+                                         runtimeContext).next()
                 else:
-                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
+                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
                                                 self.output_name,
                                                 self.output_tags,
-                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                                name=kwargs.get("name"),
-                                                on_error=kwargs.get("on_error"),
-                                                submit_runner_image=kwargs.get("submit_runner_image"),
-                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
+                                                submit_runner_ram=runtimeContext.submit_runner_ram,
+                                                name=runtimeContext.name,
+                                                on_error=runtimeContext.on_error,
+                                                submit_runner_image=runtimeContext.submit_runner_image,
+                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                 merged_map=merged_map,
                                                 default_storage_classes=self.default_storage_classes,
-                                                priority=kwargs.get("priority"),
+                                                priority=runtimeContext.priority,
                                                 secret_store=self.secret_store)
             elif self.work_api == "jobs":
-                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
+                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
                                       self.output_name,
                                       self.output_tags,
-                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                      name=kwargs.get("name"),
-                                      on_error=kwargs.get("on_error"),
-                                      submit_runner_image=kwargs.get("submit_runner_image"),
+                                      submit_runner_ram=runtimeContext.submit_runner_ram,
+                                      name=runtimeContext.name,
+                                      on_error=runtimeContext.on_error,
+                                      submit_runner_image=runtimeContext.submit_runner_image,
                                       merged_map=merged_map)
-        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
+        elif getattr(runtimeContext, "cwl_runner_job", None) is None and self.work_api == "jobs":
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
                 body={
                     "owner_uuid": self.project_uuid,
-                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
+                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                     "components": {},
                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
-        if runnerjob and not kwargs.get("wait"):
-            submitargs = kwargs.copy()
-            submitargs['submit'] = False
-            runnerjob.run(**submitargs)
+        if runnerjob and not runtimeContext.wait:
+            submitargs = runtimeContext.copy()
+            submitargs.submit = False
+            runnerjob.run(submitargs)
             return (runnerjob.uuid, "success")
 
         self.poll_api = arvados.api('v1')
@@ -523,11 +517,11 @@ class ArvCwlRunner(object):
         if runnerjob:
             jobiter = iter((runnerjob,))
         else:
-            if "cwl_runner_job" in kwargs:
-                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
+            if getattr(runtimeContext, "cwl_runner_job", None) is not None:
+                self.uuid = runtimeContext.cwl_runner_job.get('uuid')
             jobiter = tool.job(job_order,
                                self.output_callback,
-                               **kwargs)
+                               runtimeContext)
 
         try:
             self.workflow_eval_lock.acquire()
@@ -549,7 +543,7 @@ class ArvCwlRunner(object):
 
                 if runnable:
                     with Perf(metrics, "run"):
-                        self.start_run(runnable, kwargs)
+                        self.start_run(runnable, runtimeContext)
                 else:
                     if (self.task_queue.in_flight + len(self.processes)) > 0:
                         self.workflow_eval_lock.wait(3)
@@ -590,8 +584,7 @@ class ArvCwlRunner(object):
         if self.final_output is None:
             raise WorkflowException("Workflow did not return a result.")
 
-
-        if kwargs.get("submit") and isinstance(runnerjob, Runner):
+        if runtimeContext.submit and isinstance(runnerjob, Runner):
             logger.info("Final output collection %s", runnerjob.final_output)
         else:
             if self.output_name is None:
@@ -603,7 +596,7 @@ class ArvCwlRunner(object):
             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
             self.set_crunch_output()
 
-        if kwargs.get("compute_checksum"):
+        if runtimeContext.compute_checksum:
             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
@@ -844,26 +837,28 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
     else:
         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
 
-    arvargs.conformance_test = None
+    for key, val in six.iteritems(cwltool.argparser.get_default_args()):
+        if not hasattr(arvargs, key):
+            setattr(arvargs, key, val)
+
     arvargs.use_container = True
     arvargs.relax_path_checks = True
     arvargs.print_supported_versions = False
 
-    make_fs_access = partial(CollectionFsAccess,
-                           collection_cache=runner.collection_cache)
+    loadingContext = LoadingContext(vars(arvargs))
+    loadingContext.fetcher_constructor = runner.fetcher_constructor
+    loadingContext.resolver = partial(collectionResolver, api_client, num_retries=runner.num_retries)
+    loadingContext.construct_tool_object = runner.arv_make_tool
+
+    runtimeContext = RuntimeContext(vars(arvargs))
+    runtimeContext.make_fs_access = partial(CollectionFsAccess,
+                                            collection_cache=runner.collection_cache)
 
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
                              executor=runner.arv_executor,
-                             makeTool=runner.arv_make_tool,
                              versionfunc=versionstring,
                             job_order_object=job_order_object,
+                             loadingContext=loadingContext,
+                             runtimeContext=runtimeContext,
-                             make_fs_access=make_fs_access,
-                             fetcher_constructor=partial(CollectionFetcher,
-                                                         api_client=api_client,
-                                                         fs_access=make_fs_access(""),
-                                                         num_retries=runner.num_retries),
-                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                              logger_handler=arvados.log_handler,
                              custom_schema_callback=add_arv_hints)
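
The get_default_args() backfill in main() exists because cwltool.main
now reads options directly off the argument namespace, so any attribute
arvados-cwl-runner's own parser does not define must still be present
with cwltool's default. A standalone sketch of the pattern (the
namespace contents are hypothetical):

    import argparse
    import cwltool.argparser

    arvargs = argparse.Namespace(debug=True)  # hypothetical parsed args
    # Backfill cwltool-only options the local parser doesn't define.
    for key, val in cwltool.argparser.get_default_args().items():
        if not hasattr(arvargs, key):
            setattr(arvargs, key, val)
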
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 05c0212fc6eeae41b2a10279f04ebd71000f1eaf..b2ed0a0c8764541aca95313dfa0abef93e080038 100644
@@ -14,7 +14,7 @@ import uuid
 import ruamel.yaml as yaml
 
 from cwltool.errors import WorkflowException
-from cwltool.process import get_feature, UnsupportedRequirement, shortname
+from cwltool.process import UnsupportedRequirement, shortname
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.utils import aslist
 
@@ -41,7 +41,7 @@ class ArvadosContainer(object):
     def update_pipeline_component(self, r):
         pass
 
-    def run(self, dry_run=False, pull_image=True, **kwargs):
+    def run(self, runtimeContext):
         # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
         # which calls makeJobRunner() to get a new ArvadosContainer
         # object.  The fields that define execution such as
@@ -54,7 +54,7 @@ class ArvadosContainer(object):
             "name": self.name,
             "output_path": self.outdir,
             "cwd": self.outdir,
-            "priority": kwargs.get("priority"),
+            "priority": runtimeContext.priority,
             "state": "Committed",
             "properties": {},
         }
@@ -190,7 +190,7 @@ class ArvadosContainer(object):
             mounts["stdout"] = {"kind": "file",
                                 "path": "%s/%s" % (self.outdir, self.stdout)}
 
-        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
         if not docker_req:
             docker_req = {"dockerImageId": "arvados/jobs"}
 
@@ -199,11 +199,11 @@ class ArvadosContainer(object):
-                                                                     pull_image,
+                                                                     runtimeContext.pull_image,
                                                                      self.arvrunner.project_uuid)
 
-        api_req, _ = get_feature(self, "http://arvados.org/cwl#APIRequirement")
+        api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
         if api_req:
             runtime_constraints["API"] = True
 
-        runtime_req, _ = get_feature(self, "http://arvados.org/cwl#RuntimeConstraints")
+        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
         if runtime_req:
             if "keep_cache" in runtime_req:
                 runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"] * 2**20
@@ -217,11 +217,11 @@ class ArvadosContainer(object):
                         "writable": True
                     }
 
-        partition_req, _ = get_feature(self, "http://arvados.org/cwl#PartitionRequirement")
+        partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
         if partition_req:
             scheduling_parameters["partitions"] = aslist(partition_req["partition"])
 
-        intermediate_output_req, _ = get_feature(self, "http://arvados.org/cwl#IntermediateOutput")
+        intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
         if intermediate_output_req:
             self.output_ttl = intermediate_output_req["outputTTL"]
         else:
@@ -236,15 +236,15 @@ class ArvadosContainer(object):
         container_request["runtime_constraints"] = runtime_constraints
         container_request["scheduling_parameters"] = scheduling_parameters
 
-        enable_reuse = kwargs.get("enable_reuse", True)
+        enable_reuse = runtimeContext.enable_reuse
         if enable_reuse:
-            reuse_req, _ = get_feature(self, "http://arvados.org/cwl#ReuseRequirement")
+            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
             if reuse_req:
                 enable_reuse = reuse_req["enableReuse"]
         container_request["use_existing"] = enable_reuse
 
-        if kwargs.get("runnerjob", "").startswith("arvwf:"):
-            wfuuid = kwargs["runnerjob"][6:kwargs["runnerjob"].index("#")]
+        if getattr(runtimeContext, "runnerjob", "").startswith("arvwf:"):
+            wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
             wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
             if container_request["name"] == "main":
                 container_request["name"] = wfrecord["name"]
@@ -253,9 +253,9 @@ class ArvadosContainer(object):
         self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
 
         try:
-            if kwargs.get("submit_request_uuid"):
+            if runtimeContext.submit_request_uuid:
                 response = self.arvrunner.api.container_requests().update(
-                    uuid=kwargs["submit_request_uuid"],
+                    uuid=runtimeContext.submit_request_uuid,
                     body=container_request
                 ).execute(num_retries=self.arvrunner.num_retries)
             else:
@@ -329,7 +329,7 @@ class ArvadosContainer(object):
 class RunnerContainer(Runner):
     """Submit and manage a container that runs arvados-cwl-runner."""
 
-    def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+    def arvados_job_spec(self, runtimeContext):
         """Create an Arvados container request for this workflow.
 
         The returned dict can be used to create a container passed as
@@ -424,7 +424,7 @@ class RunnerContainer(Runner):
         if self.output_tags:
             command.append("--output-tags=" + self.output_tags)
 
-        if kwargs.get("debug"):
+        if runtimeContext.debug:
             command.append("--debug")
 
         if kwargs.get("storage_classes") and kwargs.get("storage_classes") != self.default_storage_classes:
@@ -449,15 +449,15 @@ class RunnerContainer(Runner):
         return container_req
 
 
-    def run(self, **kwargs):
-        kwargs["keepprefix"] = "keep:"
-        job_spec = self.arvados_job_spec(**kwargs)
+    def run(self, runtimeContext):
+        runtimeContext.keepprefix = "keep:"
+        job_spec = self.arvados_job_spec(runtimeContext)
         if self.arvrunner.project_uuid:
             job_spec["owner_uuid"] = self.arvrunner.project_uuid
 
-        if kwargs.get("submit_request_uuid"):
+        if runtimeContext.submit_request_uuid:
             response = self.arvrunner.api.container_requests().update(
-                uuid=kwargs["submit_request_uuid"],
+                uuid=runtimeContext.submit_request_uuid,
                 body=job_spec
             ).execute(num_retries=self.arvrunner.num_retries)
         else:
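
get_feature() is gone from cwltool.process; callers switch to the
get_requirement() method that tools inherit from cwltool's Process. A
simplified sketch of the lookup it performs (ignores override ordering;
the stub class is hypothetical):

    def get_requirement(process, feature):
        # Requirements take precedence over hints; the boolean reports
        # whether the match was a hard requirement.
        for item in process.requirements:
            if item["class"] == feature:
                return (item, True)
        for item in process.hints:
            if item["class"] == feature:
                return (item, False)
        return (None, None)

    class Stub(object):
        requirements = [{"class": "DockerRequirement",
                         "dockerPull": "arvados/jobs"}]
        hints = []

    docker_req, is_req = get_requirement(Stub(), "DockerRequirement")
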
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 04256c68f8b10f47ede2fefcabb0172948c2ff00..4973c8a8c61bc2c9362d1b8fffba058af282cd15 100644
@@ -8,7 +8,7 @@ import copy
 import json
 import time
 
-from cwltool.process import get_feature, shortname, UnsupportedRequirement
+from cwltool.process import shortname, UnsupportedRequirement
 from cwltool.errors import WorkflowException
 from cwltool.command_line_tool import revmap_file, CommandLineTool
 from cwltool.load_tool import fetch_document
@@ -44,7 +44,7 @@ class ArvadosJob(object):
         self.running = False
         self.uuid = None
 
-    def run(self, dry_run=False, pull_image=True, **kwargs):
+    def run(self, runtimeContext):
         script_parameters = {
             "command": self.command_line
         }
@@ -96,8 +96,8 @@ class ArvadosJob(object):
             script_parameters["task.permanentFailCodes"] = self.permanentFailCodes
 
         with Perf(metrics, "arv_docker_get_image %s" % self.name):
-            (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
-            if docker_req and kwargs.get("use_container") is not False:
+            (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
+            if docker_req and runtimeContext.use_container is not False:
                 if docker_req.get("dockerOutputDirectory"):
                     raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                         "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
@@ -111,7 +111,7 @@ class ArvadosJob(object):
             runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
             runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
 
-        runtime_req, _ = get_feature(self, "http://arvados.org/cwl#RuntimeConstraints")
+        runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
         if runtime_req:
             if "keep_cache" in runtime_req:
                 runtime_constraints["keep_cache_mb_per_task"] = runtime_req["keep_cache"]
@@ -128,9 +128,9 @@ class ArvadosJob(object):
         if not self.arvrunner.ignore_docker_for_reuse:
             filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
 
-        enable_reuse = kwargs.get("enable_reuse", True)
+        enable_reuse = runtimeContext.enable_reuse
         if enable_reuse:
-            reuse_req, _ = get_feature(self, "http://arvados.org/cwl#ReuseRequirement")
+            reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
             if reuse_req:
                 enable_reuse = reuse_req["enableReuse"]
 
@@ -269,7 +269,7 @@ class ArvadosJob(object):
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
 
-    def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+    def arvados_job_spec(self, runtimeContext):
         """Create an Arvados job specification for this workflow.
 
         The returned dict can be used to create a job (i.e., passed as
@@ -299,7 +299,7 @@ class RunnerJob(Runner):
         if self.on_error:
             self.job_order["arv:on_error"] = self.on_error
 
-        if kwargs.get("debug"):
+        if runtimeContext.debug:
             self.job_order["arv:debug"] = True
 
         return {
@@ -314,8 +314,8 @@ class RunnerJob(Runner):
             }
         }
 
-    def run(self, **kwargs):
-        job_spec = self.arvados_job_spec(**kwargs)
+    def run(self, runtimeContext):
+        job_spec = self.arvados_job_spec(runtimeContext)
 
         job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
 
@@ -346,7 +346,7 @@ class RunnerJob(Runner):
             body=instance_spec).execute(num_retries=self.arvrunner.num_retries)
         logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
 
-        if kwargs.get("wait") is False:
+        if runtimeContext.wait is False:
             self.uuid = self.arvrunner.pipeline["uuid"]
             return
 
diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py
index fea6adfacc323539d7c2cd595f66d441859893b8..f4caf7f4bd4eae824c2a78e3e7f542a3dd519f66 100644
@@ -10,50 +10,50 @@ from .pathmapper import ArvPathMapper
 class ArvadosCommandTool(CommandLineTool):
     """Wrap cwltool CommandLineTool to override selected methods."""
 
-    def __init__(self, arvrunner, toolpath_object, **kwargs):
-        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
+    def __init__(self, arvrunner, toolpath_object, loadingContext):
+        super(ArvadosCommandTool, self).__init__(toolpath_object, loadingContext)
         self.arvrunner = arvrunner
-        self.work_api = kwargs["work_api"]
+        self.work_api = loadingContext.work_api
 
-    def makeJobRunner(self, **kwargs):
+    def makeJobRunner(self, runtimeContext):
         if self.work_api == "containers":
             return ArvadosContainer(self.arvrunner)
         elif self.work_api == "jobs":
             return ArvadosJob(self.arvrunner)
 
-    def makePathMapper(self, reffiles, stagedir, **kwargs):
+    def makePathMapper(self, reffiles, stagedir, runtimeContext):
-        # type: (List[Any], unicode, **Any) -> PathMapper
+        # type: (List[Any], unicode, RuntimeContext) -> PathMapper
         if self.work_api == "containers":
-            return ArvPathMapper(self.arvrunner, reffiles+kwargs.get("extra_reffiles", []), kwargs["basedir"],
+            return ArvPathMapper(self.arvrunner, reffiles+getattr(runtimeContext, "extra_reffiles", []), runtimeContext.basedir,
                                  "/keep/%s",
-                                 "/keep/%s/%s",
-                                 **kwargs)
+                                 "/keep/%s/%s")
         elif self.work_api == "jobs":
-            return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+            return ArvPathMapper(self.arvrunner, reffiles, runtimeContext.basedir,
                                  "$(task.keep)/%s",
-                                 "$(task.keep)/%s/%s",
-                                 **kwargs)
+                                 "$(task.keep)/%s/%s")
 
-    def job(self, joborder, output_callback, **kwargs):
+    def job(self, joborder, output_callback, runtimeContext):
 
         # Workaround for #13365
-        builderargs = kwargs.copy()
-        builderargs["toplevel"] = True
-        builderargs["tmp_outdir_prefix"] = ""
-        builder = self._init_job(joborder, **builderargs)
+        builderargs = runtimeContext.copy()
+        builderargs.toplevel = True
+        builderargs.tmp_outdir_prefix = ""
+        builder = self._init_job(joborder, builderargs)
         joborder = builder.job
 
+        runtimeContext = runtimeContext.copy()
+
         if self.work_api == "containers":
             dockerReq, is_req = self.get_requirement("DockerRequirement")
             if dockerReq and dockerReq.get("dockerOutputDirectory"):
-                kwargs["outdir"] = dockerReq.get("dockerOutputDirectory")
-                kwargs["docker_outdir"] = dockerReq.get("dockerOutputDirectory")
+                runtimeContext.outdir = dockerReq.get("dockerOutputDirectory")
+                runtimeContext.docker_outdir = dockerReq.get("dockerOutputDirectory")
             else:
-                kwargs["outdir"] = "/var/spool/cwl"
-                kwargs["docker_outdir"] = "/var/spool/cwl"
+                runtimeContext.outdir = "/var/spool/cwl"
+                runtimeContext.docker_outdir = "/var/spool/cwl"
         elif self.work_api == "jobs":
-            kwargs["outdir"] = "$(task.outdir)"
-            kwargs["docker_outdir"] = "$(task.outdir)"
-            kwargs["tmpdir"] = "$(task.tmpdir)"
-            kwargs["docker_tmpdir"] = "$(task.tmpdir)"
-        return super(ArvadosCommandTool, self).job(joborder, output_callback, **kwargs)
+            runtimeContext.outdir = "$(task.outdir)"
+            runtimeContext.docker_outdir = "$(task.outdir)"
+            runtimeContext.tmpdir = "$(task.tmpdir)"
+            runtimeContext.docker_tmpdir = "$(task.tmpdir)"
+        return super(ArvadosCommandTool, self).job(joborder, output_callback, runtimeContext)
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index f675fb10e80811e92e90b209f073c806c9777afb..94b8568db7e94dd22059a4a605b8638d950131b7 100644
@@ -15,6 +15,7 @@ from cwltool.process import shortname
 from cwltool.workflow import Workflow, WorkflowException
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.builder import Builder
+from cwltool.context import LoadingContext
 
 import ruamel.yaml as yaml
 
@@ -109,17 +110,16 @@ def get_overall_res_req(res_reqs):
 class ArvadosWorkflow(Workflow):
     """Wrap cwltool Workflow to override selected methods."""
 
-    def __init__(self, arvrunner, toolpath_object, **kwargs):
-        super(ArvadosWorkflow, self).__init__(toolpath_object, **kwargs)
+    def __init__(self, arvrunner, toolpath_object, loadingContext):
+        super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
         self.arvrunner = arvrunner
-        self.work_api = kwargs["work_api"]
         self.wf_pdh = None
         self.dynamic_resource_req = []
         self.static_resource_req = []
         self.wf_reffiles = []
+        self.loadingContext = loadingContext
 
-    def job(self, joborder, output_callback, **kwargs):
-        kwargs["work_api"] = self.work_api
+    def job(self, joborder, output_callback, runtimeContext):
         req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
         if req:
             with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
@@ -176,7 +176,7 @@ class ArvadosWorkflow(Workflow):
                         self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
 
                     upload_dependencies(self.arvrunner,
-                                        kwargs.get("name", ""),
+                                        runtimeContext.name,
                                         document_loader,
                                         packed,
                                         uri,
@@ -213,15 +213,16 @@ class ArvadosWorkflow(Workflow):
                 reffiles = []
                 visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)
 
-                mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, kwargs["basedir"],
-                                 "/keep/%s",
-                                 "/keep/%s/%s",
-                                 **kwargs)
+                mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
+                                       "/keep/%s",
+                                       "/keep/%s/%s")
 
                 # For containers API, we need to make sure any extra
                 # referenced files (ie referenced by the workflow but
                 # not in the inputs) are included in the mounts.
-                kwargs["extra_reffiles"] = copy.deepcopy(self.wf_reffiles)
+                if self.wf_reffiles:
+                    runtimeContext = runtimeContext.copy()
+                    runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)
 
                 def keepmount(obj):
                     remove_redundant_fields(obj)
@@ -275,9 +276,6 @@ class ArvadosWorkflow(Workflow):
                 "arguments": ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl#main", "cwl.input.yml"],
                 "id": "#"
             })
-            kwargs["loader"] = self.doc_loader
-            kwargs["avsc_names"] = self.doc_schema
-            kwargs["metadata"]  = self.metadata
-            return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder_resolved, output_callback, **kwargs)
+            return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
         else:
-            return super(ArvadosWorkflow, self).job(joborder, output_callback, **kwargs)
+            return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)
diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py
index 5024e95f77df785abf668c68364dadc4d49fb2a4..94e9c8fc84abcd2511935d63b7c413e77da38948 100644
@@ -27,6 +27,7 @@ from cwltool.process import shortname
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, normalizeFilesDirs
 from cwltool.load_tool import load_tool
 from cwltool.errors import WorkflowException
+from cwltool.context import RuntimeContext
 
 from .fsaccess import CollectionFetcher, CollectionFsAccess
 
@@ -115,7 +116,7 @@ def run():
             logging.getLogger('arvados').setLevel(logging.DEBUG)
             logging.getLogger("cwltool").setLevel(logging.DEBUG)
 
-        args = argparse.Namespace()
+        args = RuntimeContext()
         args.project_uuid = arvados.current_job()["owner_uuid"]
         args.enable_reuse = enable_reuse
         args.on_error = on_error
@@ -134,7 +135,7 @@ def run():
         args.disable_js_validation = False
         args.tmp_outdir_prefix = "tmp"
 
-        runner.arv_executor(t, job_order_object, **vars(args))
+        runner.arv_executor(t, job_order_object, args)
     except Exception as e:
         if isinstance(e, WorkflowException):
             logging.info("Workflow error %s", e)
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 27e48f1f4408e33630985f2060ba738af720111f..05a358e0d57a44f26cf6a4ec26c9d1bd35200163 100644
@@ -42,7 +42,7 @@ class ArvPathMapper(PathMapper):
     pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')
 
     def __init__(self, arvrunner, referenced_files, input_basedir,
-                 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
+                 collection_pattern, file_pattern, name=None, single_collection=False):
         self.arvrunner = arvrunner
         self.input_basedir = input_basedir
         self.collection_pattern = collection_pattern
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index f907d33951c45a5d707bb15dd18c9154ae1b5bad..87acf15d657c89dec9221bf3a6b2fb1d3f612e32 100644
@@ -16,7 +16,7 @@ from schema_salad.sourceline import SourceLine, cmap
 
 from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
+from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
 from cwltool.load_tool import fetch_document
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.utils import aslist
@@ -201,7 +201,7 @@ def upload_docker(arvrunner, tool):
     """Uploads Docker images used in CommandLineTool objects."""
 
     if isinstance(tool, CommandLineTool):
-        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
         if docker_req:
             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                 # TODO: can be supported by containers API, but not jobs API.
@@ -362,7 +362,7 @@ class Runner(object):
         if enable_reuse:
             # If reuse is permitted by command line arguments but
             # disabled by the workflow itself, disable it.
-            reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
+            reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
             if reuse_req:
                 enable_reuse = reuse_req["enableReuse"]
         self.enable_reuse = enable_reuse
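
The ReuseRequirement pattern above also appears in arvcontainer.py and
arvjob.py: reuse must be permitted by both the command line and the
workflow itself. A standalone sketch of that rule (the tool argument is
any object with a get_requirement() method):

    REUSE_REQ = "http://arvados.org/cwl#ReuseRequirement"

    def effective_reuse(cli_enable_reuse, tool):
        # The workflow can only narrow reuse, never re-enable it once
        # the command line has turned it off.
        enable_reuse = cli_enable_reuse
        if enable_reuse:
            reuse_req, _ = tool.get_requirement(REUSE_REQ)
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        return enable_reuse
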
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 4c31d3b4450eac66ed5f839de2d34842913067f0..c288603753373cc60165ef101f966a4ac15f0736 100644
@@ -33,7 +33,7 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20180524215209',
+          'cwltool==1.0.20180614214548',
           'schema-salad==2.7.20180501211602',
           'typing >= 3.5.3',
           'ruamel.yaml >=0.13.11, <0.15',