X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ff91b02b4c768ca9f583454b0909fa030d89debe..93380e6aec7e11607019cdce88419b6f708327d7:/sdk/cwl/arvados_cwl/executor.py diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py index 680ca0b7b2..1759e4ac28 100644 --- a/sdk/cwl/arvados_cwl/executor.py +++ b/sdk/cwl/arvados_cwl/executor.py @@ -197,11 +197,11 @@ The 'jobs' API is no longer supported. handler = RuntimeStatusLoggingHandler(self.runtime_status_update) root_logger.addHandler(handler) - self.runtimeContext = ArvRuntimeContext(vars(arvargs)) - self.runtimeContext.make_fs_access = partial(CollectionFsAccess, + self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs)) + self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess, collection_cache=self.collection_cache) - validate_cluster_target(self, self.runtimeContext) + validate_cluster_target(self, self.toplevel_runtimeContext) def arv_make_tool(self, toolpath_object, loadingContext): @@ -517,7 +517,6 @@ The 'jobs' API is no longer supported. updated_tool.visit(self.check_features) - self.project_uuid = runtimeContext.project_uuid self.pipeline = None self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir) self.secret_store = runtimeContext.secret_store @@ -535,6 +534,8 @@ The 'jobs' API is no longer supported. if runtimeContext.submit_request_uuid and self.work_api != "containers": raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api)) + runtimeContext = runtimeContext.copy() + default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True]) if runtimeContext.storage_classes == "default": runtimeContext.storage_classes = default_storage_classes @@ -544,9 +545,25 @@ The 'jobs' API is no longer supported. if not runtimeContext.name: runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"]) + if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow): + # When creating or updating workflow record, by default + # always copy dependencies and ensure Docker images are up + # to date. + runtimeContext.copy_deps = True + runtimeContext.match_local_docker = True + + if runtimeContext.update_workflow and self.project_uuid is None: + # If we are updating a workflow, make sure anything that + # gets uploaded goes into the same parent project, unless + # an alternate --project-uuid was provided. + existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute() + runtimeContext.project_uuid = existing_wf["owner_uuid"] + + self.project_uuid = runtimeContext.project_uuid + # Upload local file references in the job order. job_order = upload_job_order(self, "%s input" % runtimeContext.name, - updated_tool, job_order) + updated_tool, job_order, runtimeContext) # the last clause means: if it is a command line tool, and we # are going to wait for the result, and always_submit_runner @@ -573,7 +590,7 @@ The 'jobs' API is no longer supported. # Upload direct dependencies of workflow steps, get back mapping of files to keep references. # Also uploads docker images. - merged_map = upload_workflow_deps(self, tool) + merged_map = upload_workflow_deps(self, tool, runtimeContext) # Recreate process object (ArvadosWorkflow or # ArvadosCommandTool) because tool document may have been @@ -584,17 +601,17 @@ The 'jobs' API is no longer supported. loadingContext.metadata = tool.metadata tool = load_tool(tool.tool, loadingContext) - existing_uuid = runtimeContext.update_workflow - if existing_uuid or runtimeContext.create_workflow: + if runtimeContext.update_workflow or runtimeContext.create_workflow: # Create a pipeline template or workflow record and exit. if self.work_api == "containers": uuid = upload_workflow(self, tool, job_order, - self.project_uuid, - uuid=existing_uuid, - submit_runner_ram=runtimeContext.submit_runner_ram, - name=runtimeContext.name, - merged_map=merged_map, - submit_runner_image=runtimeContext.submit_runner_image) + runtimeContext.project_uuid, + runtimeContext, + uuid=runtimeContext.update_workflow, + submit_runner_ram=runtimeContext.submit_runner_ram, + name=runtimeContext.name, + merged_map=merged_map, + submit_runner_image=runtimeContext.submit_runner_image) self.stdout.write(uuid + "\n") return (None, "success") @@ -603,7 +620,6 @@ The 'jobs' API is no longer supported. self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse self.eval_timeout = runtimeContext.eval_timeout - runtimeContext = runtimeContext.copy() runtimeContext.use_container = True runtimeContext.tmpdir_prefix = "tmp" runtimeContext.work_api = self.work_api