From: Peter Amstutz
Date: Wed, 11 May 2022 21:19:51 +0000 (-0400)
Subject: 19070: Prefer to use the passed-in runtimeContext
X-Git-Tag: 2.5.0~179^2~2
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/9ca1b9f63b0fdaac45d04393e413cbc0b7f16d88

19070: Prefer to use the passed-in runtimeContext

Code cleanup: change most places to use the passed-in runtimeContext
instead of the ArvRunner's top-level runtimeContext.

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz
---
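Illustration (not part of the applied patch): every hunk below follows the
same pattern — helpers stop reading the runner's global context and instead
take the caller's runtimeContext as an explicit parameter, so the
per-invocation copy made early in executor.py (runtimeContext =
runtimeContext.copy()) is honored everywhere. A minimal sketch of the
pattern, with hypothetical simplified bodies:

    # Before: hidden dependency on runner-global state.
    def upload_docker(arvrunner, tool):
        force_pull = arvrunner.runtimeContext.force_docker_pull

    # After: the caller's (possibly copied and locally adjusted)
    # context is threaded through explicitly.
    def upload_docker(arvrunner, tool, runtimeContext):
        force_pull = runtimeContext.force_docker_pull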
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 3faa510a0a..21b629f37a 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -219,7 +219,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--copy-deps", dest="copy_deps", default=None, action="store_true", help="Copy dependencies into the destination project.")
-    exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false", help="Leave depenencies where they are.")
+    exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false", help="Leave dependencies where they are.")
 
     parser.add_argument(
         "--skip-schemas",
@@ -367,5 +367,5 @@ def main(args=sys.argv[1:],
                              custom_schema_callback=add_arv_hints,
                              loadingContext=executor.loadingContext,
-                             runtimeContext=executor.runtimeContext,
+                             runtimeContext=executor.toplevel_runtimeContext,
                              input_required=not (arvargs.create_workflow or arvargs.update_workflow))
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 33b4c90c61..5082cc2f4b 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -466,7 +466,7 @@ class RunnerContainer(Runner):
             "cwd": "/var/spool/cwl",
             "priority": self.priority,
             "state": "Committed",
-            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
+            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
             "mounts": {
                 "/var/lib/cwl/cwl.input.json": {
                     "kind": "json",
@@ -501,7 +501,7 @@ class RunnerContainer(Runner):
                 "portable_data_hash": "%s" % workflowcollection
             }
         else:
-            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
+            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext)
             workflowpath = "/var/lib/cwl/workflow.json#main"
             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                 "kind": "json",
@@ -551,17 +551,17 @@ class RunnerContainer(Runner):
         if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
             command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
 
-        if self.on_error:
+        if runtimeContext.on_error:
             command.append("--on-error=" + self.on_error)
 
-        if self.intermediate_output_ttl:
-            command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)
+        if runtimeContext.intermediate_output_ttl:
+            command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
 
-        if self.arvrunner.trash_intermediate:
+        if runtimeContext.trash_intermediate:
             command.append("--trash-intermediate")
 
-        if self.arvrunner.project_uuid:
-            command.append("--project-uuid="+self.arvrunner.project_uuid)
+        if runtimeContext.project_uuid:
+            command.append("--project-uuid="+runtimeContext.project_uuid)
 
         if self.enable_dev:
             command.append("--enable-dev")
@@ -582,8 +582,8 @@ class RunnerContainer(Runner):
     def run(self, runtimeContext):
         runtimeContext.keepprefix = "keep:"
         job_spec = self.arvados_job_spec(runtimeContext)
-        if self.arvrunner.project_uuid:
-            job_spec["owner_uuid"] = self.arvrunner.project_uuid
+        if runtimeContext.project_uuid:
+            job_spec["owner_uuid"] = runtimeContext.project_uuid
 
         extra_submit_params = {}
         if runtimeContext.submit_runner_cluster:
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 4fe82a6fe1..8eb12913ad 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -37,11 +37,12 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics')
 max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
 sum_res_pars = ("outdirMin", "outdirMax")
 
-def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
+def upload_workflow(arvRunner, tool, job_order, project_uuid,
+                    runtimeContext, uuid=None,
                     submit_runner_ram=0, name=None, merged_map=None,
                     submit_runner_image=None):
 
-    packed = packed_workflow(arvRunner, tool, merged_map)
+    packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext)
 
     adjustDirObjs(job_order, trim_listing)
     adjustFileObjs(job_order, trim_anonymous_location)
@@ -57,7 +58,8 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
         name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
 
     upload_dependencies(arvRunner, name, tool.doc_loader,
-                        packed, tool.tool["id"], False)
+                        packed, tool.tool["id"], False,
+                        runtimeContext)
 
     wf_runner_resources = None
 
@@ -72,7 +74,9 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
         wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
         hints.append(wf_runner_resources)
 
-    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner, submit_runner_image or "arvados/jobs:"+__version__)
+    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
+                                                                  submit_runner_image or "arvados/jobs:"+__version__,
+                                                                  runtimeContext)
 
     if submit_runner_ram:
         wf_runner_resources["ramMin"] = submit_runner_ram
@@ -194,7 +198,8 @@ class ArvadosWorkflow(Workflow):
                                 self.doc_loader,
                                 joborder,
                                 joborder.get("id", "#"),
-                                False)
+                                False,
+                                runtimeContext)
 
             if self.wf_pdh is None:
                 packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)
@@ -237,7 +242,8 @@ class ArvadosWorkflow(Workflow):
                                     self.doc_loader,
                                     packed,
                                     self.tool["id"],
-                                    False)
+                                    False,
+                                    runtimeContext)
 
                 # Discover files/directories referenced by the
                 # workflow (mainly "default" values)
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 0daa15d5f0..70bc0c4572 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -197,11 +197,11 @@ The 'jobs' API is no longer supported.
         handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
         root_logger.addHandler(handler)
 
-        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
-        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
+        self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
+        self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                      collection_cache=self.collection_cache)
 
-        validate_cluster_target(self, self.runtimeContext)
+        validate_cluster_target(self, self.toplevel_runtimeContext)
 
 
     def arv_make_tool(self, toolpath_object, loadingContext):
@@ -535,6 +535,8 @@ The 'jobs' API is no longer supported.
         if runtimeContext.submit_request_uuid and self.work_api != "containers":
             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
 
+        runtimeContext = runtimeContext.copy()
+
         default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
         if runtimeContext.storage_classes == "default":
             runtimeContext.storage_classes = default_storage_classes
@@ -544,14 +546,14 @@ The 'jobs' API is no longer supported.
         if not runtimeContext.name:
             runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
 
-        if self.runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
+        if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
             # When creating or updating workflow record, by default
             # always copy dependencies and ensure Docker images are up
             # to date.
-            self.runtimeContext.copy_deps = True
-            self.runtimeContext.match_local_docker = True
+            runtimeContext.copy_deps = True
+            runtimeContext.match_local_docker = True
 
-        if self.runtimeContext.update_workflow and self.project_uuid is None:
+        if runtimeContext.update_workflow and self.project_uuid is None:
             # If we are updating a workflow, make sure anything that
             # gets uploaded goes into the same parent project, unless
             # an alternate --project-uuid was provided.
@@ -560,7 +562,7 @@ The 'jobs' API is no longer supported.
 
         # Upload local file references in the job order.
         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
-                                     updated_tool, job_order)
+                                     updated_tool, job_order, runtimeContext)
 
         # the last clause means: if it is a command line tool, and we
         # are going to wait for the result, and always_submit_runner
@@ -587,7 +589,7 @@ The 'jobs' API is no longer supported.
 
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
-        merged_map = upload_workflow_deps(self, tool)
+        merged_map = upload_workflow_deps(self, tool, runtimeContext)
 
         # Recreate process object (ArvadosWorkflow or
         # ArvadosCommandTool) because tool document may have been
@@ -602,12 +604,13 @@ The 'jobs' API is no longer supported.
         # Create a pipeline template or workflow record and exit.
         if self.work_api == "containers":
             uuid = upload_workflow(self, tool, job_order,
-                                   self.project_uuid,
-                                   uuid=runtimeContext.update_workflow,
-                                   submit_runner_ram=runtimeContext.submit_runner_ram,
-                                   name=runtimeContext.name,
-                                   merged_map=merged_map,
-                                   submit_runner_image=runtimeContext.submit_runner_image)
+                                   self.project_uuid,
+                                   runtimeContext,
+                                   uuid=runtimeContext.update_workflow,
+                                   submit_runner_ram=runtimeContext.submit_runner_ram,
+                                   name=runtimeContext.name,
+                                   merged_map=merged_map,
+                                   submit_runner_image=runtimeContext.submit_runner_image)
             self.stdout.write(uuid + "\n")
             return (None, "success")
 
@@ -616,7 +619,6 @@ The 'jobs' API is no longer supported.
         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
         self.eval_timeout = runtimeContext.eval_timeout
 
-        runtimeContext = runtimeContext.copy()
         runtimeContext.use_container = True
         runtimeContext.tmpdir_prefix = "tmp"
         runtimeContext.work_api = self.work_api
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 10323c2be7..f39c98d882 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -240,7 +240,7 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
 
 def upload_dependencies(arvrunner, name, document_loader,
-                        workflowobj, uri, loadref_run,
+                        workflowobj, uri, loadref_run, runtimeContext,
                         include_primary=True, discovered_secondaryfiles=None):
     """Upload the dependencies of the workflowobj document to Keep.
 
@@ -449,12 +449,12 @@ def upload_dependencies(arvrunner, name, document_loader,
         for d in discovered:
             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
 
-    if arvrunner.runtimeContext.copy_deps:
+    if runtimeContext.copy_deps:
         # Find referenced collections and copy them into the
         # destination project, for easy sharing.
         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                                      filters=[["portable_data_hash", "in", list(keeprefs)],
-                                              ["owner_uuid", "=", arvrunner.project_uuid]],
+                                              ["owner_uuid", "=", runtimeContext.project_uuid]],
                                      select=["uuid", "portable_data_hash", "created_at"]))
 
         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
@@ -469,7 +469,7 @@ def upload_dependencies(arvrunner, name, document_loader,
             col = col["items"][0]
             try:
                 arvrunner.api.collections().create(body={"collection": {
-                    "owner_uuid": arvrunner.project_uuid,
+                    "owner_uuid": runtimeContext.project_uuid,
                     "name": col["name"],
                     "description": col["description"],
                     "properties": col["properties"],
@@ -491,7 +491,7 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     return mapper
 
-def upload_docker(arvrunner, tool):
+def upload_docker(arvrunner, tool, runtimeContext):
    """Uploads Docker images used in CommandLineTool objects."""
 
     if isinstance(tool, CommandLineTool):
@@ -501,24 +501,26 @@ def upload_docker(arvrunner, tool):
             raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                 "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
 
-        arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
-                                                   arvrunner.runtimeContext.force_docker_pull,
-                                                   arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                   arvrunner.runtimeContext.match_local_docker,
-                                                   arvrunner.runtimeContext.copy_deps)
+        arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
+                                                   runtimeContext.project_uuid,
+                                                   runtimeContext.force_docker_pull,
+                                                   runtimeContext.tmp_outdir_prefix,
+                                                   runtimeContext.match_local_docker,
+                                                   runtimeContext.copy_deps)
     else:
         arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
-                                                   True, arvrunner.project_uuid,
-                                                   arvrunner.runtimeContext.force_docker_pull,
-                                                   arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                   arvrunner.runtimeContext.match_local_docker,
-                                                   arvrunner.runtimeContext.copy_deps)
+                                                   True,
+                                                   runtimeContext.project_uuid,
+                                                   runtimeContext.force_docker_pull,
+                                                   runtimeContext.tmp_outdir_prefix,
+                                                   runtimeContext.match_local_docker,
+                                                   runtimeContext.copy_deps)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
-            upload_docker(arvrunner, s.embedded_tool)
+            upload_docker(arvrunner, s.embedded_tool, runtimeContext)
 
 
-def packed_workflow(arvrunner, tool, merged_map):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
 
@@ -547,11 +549,11 @@ def packed_workflow(arvrunner, tool, merged_map):
                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
                 if v.get("class") == "DockerRequirement":
                     v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
-                                                                                                                 arvrunner.project_uuid,
-                                                                                                                 arvrunner.runtimeContext.force_docker_pull,
-                                                                                                                 arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                                                                                 arvrunner.runtimeContext.match_local_docker,
-                                                                                                                 arvrunner.runtimeContext.copy_deps)
+                                                                                                                 runtimeContext.project_uuid,
+                                                                                                                 runtimeContext.force_docker_pull,
+                                                                                                                 runtimeContext.tmp_outdir_prefix,
+                                                                                                                 runtimeContext.match_local_docker,
+                                                                                                                 runtimeContext.copy_deps)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
@@ -572,7 +574,7 @@ def tag_git_version(packed):
             packed["http://schema.org/version"] = githash
 
 
-def upload_job_order(arvrunner, name, tool, job_order):
+def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
     """Upload local files referenced in the input object and return updated input
     object with 'location' updated to the proper keep references.
     """
@@ -608,7 +610,8 @@ def upload_job_order(arvrunner, name, tool, job_order):
                                     tool.doc_loader,
                                     job_order,
                                     job_order.get("id", "#"),
-                                    False)
+                                    False,
+                                    runtimeContext)
 
     if "id" in job_order:
         del job_order["id"]
@@ -622,10 +625,10 @@ def upload_job_order(arvrunner, name, tool, job_order):
 
 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 
-def upload_workflow_deps(arvrunner, tool):
+def upload_workflow_deps(arvrunner, tool, runtimeContext):
     # Ensure that Docker images needed by this workflow are available
 
-    upload_docker(arvrunner, tool)
+    upload_docker(arvrunner, tool, runtimeContext)
 
     document_loader = tool.doc_loader
 
@@ -640,6 +643,7 @@ def upload_workflow_deps(arvrunner, tool):
                             deptool,
                             deptool["id"],
                             False,
+                            runtimeContext,
                             include_primary=False,
                             discovered_secondaryfiles=discovered_secondaryfiles)
         document_loader.idx[deptool["id"]] = deptool
@@ -652,15 +656,17 @@ def upload_workflow_deps(arvrunner, tool):
 
     return merged_map
 
-def arvados_jobs_image(arvrunner, img):
+def arvados_jobs_image(arvrunner, img, runtimeContext):
     """Determine if the right arvados/jobs image version is available.
     If not, try to pull and upload it."""
 
     try:
-        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
-                                                          arvrunner.runtimeContext.force_docker_pull,
-                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
-                                                          arvrunner.runtimeContext.match_local_docker,
-                                                          arvrunner.runtimeContext.copy_deps)
+        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
+                                                          True,
+                                                          runtimeContext.project_uuid,
+                                                          runtimeContext.force_docker_pull,
+                                                          runtimeContext.tmp_outdir_prefix,
+                                                          runtimeContext.match_local_docker,
+                                                          runtimeContext.copy_deps)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e))
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index ba0557a9c0..305d51e144 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -1117,7 +1117,7 @@ class TestSubmit(unittest.TestCase):
                                                   "portable_data_hash": "9999999999999999999999999999999b+99"}
 
         self.assertEqual("9999999999999999999999999999999b+99",
-                         arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
+                         arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__, arvrunner.runtimeContext))
 
     @stubs