X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/aa18c23993738e7615bfdc1c6e9d610f19618b4a..d6a05be901e501bd10c00702fc8540dc1efba68a:/sdk/cwl/arvados_cwl/executor.py

diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index cbe65c3f94..f95ac17e1f 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -137,6 +137,7 @@ class ArvCwlExecutor(object):
         self.fs_access = None
         self.secret_store = None
         self.stdout = stdout
+        self.fast_submit = False
 
         if keep_client is not None:
             self.keep_client = keep_client
@@ -520,7 +521,8 @@ The 'jobs' API is no longer supported.
             for req in job_reqs:
                 tool.requirements.append(req)
 
-    def get_git_info(self, tool):
+    @staticmethod
+    def get_git_info(tool):
         in_a_git_repo = False
         cwd = None
         filepath = None
@@ -541,10 +543,10 @@ The 'jobs' API is no longer supported.
             git_commit = subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
             git_date = subprocess.run(["git", "log", "--format=%cD", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
             git_committer = subprocess.run(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
-            git_branch = subprocess.run(["git", "branch", "--show-current"], cwd=cwd, capture_output=True, text=True).stdout
+            git_branch = subprocess.run(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
             git_origin = subprocess.run(["git", "remote", "get-url", "origin"], cwd=cwd, capture_output=True, text=True).stdout
             git_status = subprocess.run(["git", "status", "--untracked-files=no", "--porcelain"], cwd=cwd, capture_output=True, text=True).stdout
-            git_describe = subprocess.run(["git", "describe", "--always"], cwd=cwd, capture_output=True, text=True).stdout
+            git_describe = subprocess.run(["git", "describe", "--always", "--tags"], cwd=cwd, capture_output=True, text=True).stdout
             git_toplevel = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=cwd, capture_output=True, text=True).stdout
             git_path = filepath[len(git_toplevel):]
 
@@ -572,12 +574,18 @@ The 'jobs' API is no longer supported.
 
         return gitproperties
 
+    def set_container_request_properties(self, container, properties):
+        resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]], select=["uuid", "properties"]).execute(num_retries=self.num_retries)
+        for cr in resp["items"]:
+            cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
+            self.api.container_requests().update(uuid=cr["uuid"], body={"container_request": {"properties": cr["properties"]}}).execute(num_retries=self.num_retries)
+
     def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
         self.debug = runtimeContext.debug
 
         git_info = self.get_git_info(updated_tool)
         if git_info:
-            logger.info("Git provenance of %s", updated_tool.tool["id"])
+            logger.info("Git provenance")
             for g in git_info:
                 if git_info[g]:
                     logger.info(" %s: %s", g.split("#", 1)[1], git_info[g])
@@ -587,7 +595,8 @@ The 'jobs' API is no longer supported.
         controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
         logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)
 
-        updated_tool.visit(self.check_features)
+        if not self.fast_submit:
+            updated_tool.visit(self.check_features)
 
         self.pipeline = None
         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
@@ -655,7 +664,7 @@ The 'jobs' API is no longer supported.
         loadingContext = self.loadingContext.copy()
         loadingContext.do_validate = False
         loadingContext.disable_js_validation = True
-        if submitting:
+        if submitting and not self.fast_submit:
             loadingContext.do_update = False
             # Document may have been auto-updated. Reload the original
             # document with updating disabled because we want to
@@ -668,9 +677,12 @@ The 'jobs' API is no longer supported.
 
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
-        logger.info("Uploading workflow dependencies")
-        with Perf(metrics, "upload_workflow_deps"):
-            merged_map = upload_workflow_deps(self, tool, runtimeContext)
+        if not self.fast_submit:
+            logger.info("Uploading workflow dependencies")
+            with Perf(metrics, "upload_workflow_deps"):
+                merged_map = upload_workflow_deps(self, tool, runtimeContext)
+        else:
+            merged_map = {}
 
         # Recreate process object (ArvadosWorkflow or
         # ArvadosCommandTool) because tool document may have been
@@ -776,6 +788,7 @@ The 'jobs' API is no longer supported.
         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
         if current_container:
             logger.info("Running inside container %s", current_container.get("uuid"))
+            self.set_container_request_properties(current_container, git_info)
 
         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
         self.polling_thread = threading.Thread(target=self.poll_states)