X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f509fecc2eef88be0a7b88d491ae0c0ac13d686b..60e38e35f49de11b5752c2bfb7f9f10443d82c49:/sdk/cwl/arvados_cwl/executor.py diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py index edb9d5b523..381b885431 100644 --- a/sdk/cwl/arvados_cwl/executor.py +++ b/sdk/cwl/arvados_cwl/executor.py @@ -18,6 +18,7 @@ import json import re from functools import partial import time +import urllib from cwltool.errors import WorkflowException import cwltool.workflow @@ -99,7 +100,8 @@ class ArvCwlExecutor(object): arvargs=None, keep_client=None, num_retries=4, - thread_count=4): + thread_count=4, + stdout=sys.stdout): if arvargs is None: arvargs = argparse.Namespace() @@ -132,6 +134,7 @@ class ArvCwlExecutor(object): self.should_estimate_cache_size = True self.fs_access = None self.secret_store = None + self.stdout = stdout if keep_client is not None: self.keep_client = keep_client @@ -194,11 +197,11 @@ The 'jobs' API is no longer supported. handler = RuntimeStatusLoggingHandler(self.runtime_status_update) root_logger.addHandler(handler) - self.runtimeContext = ArvRuntimeContext(vars(arvargs)) - self.runtimeContext.make_fs_access = partial(CollectionFsAccess, + self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs)) + self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess, collection_cache=self.collection_cache) - validate_cluster_target(self, self.runtimeContext) + validate_cluster_target(self, self.toplevel_runtimeContext) def arv_make_tool(self, toolpath_object, loadingContext): @@ -258,46 +261,29 @@ The 'jobs' API is no longer supported. if current is None: return runtime_status = current.get('runtime_status', {}) - # In case of status being an error, only report the first one. - if kind == 'error': - if not runtime_status.get('error'): - runtime_status.update({ - 'error': message - }) - if detail is not None: - runtime_status.update({ - 'errorDetail': detail - }) - # Further errors are only mentioned as a count. - else: - # Get anything before an optional 'and N more' string. - try: - error_msg = re.match( - r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0] - more_failures = re.match( - r'.*\(and (\d+) more\)', runtime_status.get('error')) - except TypeError: - # Ignore tests stubbing errors - return - if more_failures: - failure_qty = int(more_failures.groups()[0]) - runtime_status.update({ - 'error': "%s (and %d more)" % (error_msg, failure_qty+1) - }) - else: - runtime_status.update({ - 'error': "%s (and 1 more)" % error_msg - }) - elif kind in ['warning', 'activity']: - # Record the last warning/activity status without regard of - # previous occurences. + if kind in ('error', 'warning'): + updatemessage = runtime_status.get(kind, "") + if not updatemessage: + updatemessage = message + + # Subsequent messages tacked on in detail + updatedetail = runtime_status.get(kind+'Detail', "") + maxlines = 40 + if updatedetail.count("\n") < maxlines: + if updatedetail: + updatedetail += "\n" + updatedetail += message + "\n" + + if detail: + updatedetail += detail + "\n" + + if updatedetail.count("\n") >= maxlines: + updatedetail += "\nSome messages may have been omitted. Check the full log." + runtime_status.update({ - kind: message + kind: updatemessage, + kind+'Detail': updatedetail, }) - if detail is not None: - runtime_status.update({ - kind+"Detail": detail - }) else: # Ignore any other status kind return @@ -418,7 +404,7 @@ The 'jobs' API is no longer supported. with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)): self.check_features(v, parentfield=parentfield) - def make_output_collection(self, name, storage_classes, tagsString, outputObj): + def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj): outputObj = copy.deepcopy(outputObj) files = [] @@ -448,7 +434,7 @@ The 'jobs' API is no longer supported. srccollection = sp[0][5:] try: reader = self.collection_cache.get(srccollection) - srcpath = "/".join(sp[1:]) if len(sp) > 1 else "." + srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".") final.copy(srcpath, v.target, source_collection=reader, overwrite=False) except arvados.errors.ArgumentError as e: logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e) @@ -470,7 +456,9 @@ The 'jobs' API is no longer supported. res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False)) f.write(res) - final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True) + + final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, + ensure_unique_name=True, properties=output_properties) logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(), final.api_response()["name"], @@ -500,6 +488,7 @@ The 'jobs' API is no longer supported. self.api.containers().update(uuid=current['uuid'], body={ 'output': self.final_output_collection.portable_data_hash(), + 'output_properties': self.final_output_collection.get_properties(), }).execute(num_retries=self.num_retries) self.api.collections().update(uuid=self.final_output_collection.manifest_locator(), body={ @@ -531,7 +520,6 @@ The 'jobs' API is no longer supported. updated_tool.visit(self.check_features) - self.project_uuid = runtimeContext.project_uuid self.pipeline = None self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir) self.secret_store = runtimeContext.secret_store @@ -549,7 +537,9 @@ The 'jobs' API is no longer supported. if runtimeContext.submit_request_uuid and self.work_api != "containers": raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api)) - default_storage_classes = ",".join([k for k,v in self.api.config()["StorageClasses"].items() if v.get("Default") is True]) + runtimeContext = runtimeContext.copy() + + default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True]) if runtimeContext.storage_classes == "default": runtimeContext.storage_classes = default_storage_classes if runtimeContext.intermediate_storage_classes == "default": @@ -558,9 +548,25 @@ The 'jobs' API is no longer supported. if not runtimeContext.name: runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"]) + if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow): + # When creating or updating workflow record, by default + # always copy dependencies and ensure Docker images are up + # to date. + runtimeContext.copy_deps = True + runtimeContext.match_local_docker = True + + if runtimeContext.update_workflow and self.project_uuid is None: + # If we are updating a workflow, make sure anything that + # gets uploaded goes into the same parent project, unless + # an alternate --project-uuid was provided. + existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute() + runtimeContext.project_uuid = existing_wf["owner_uuid"] + + self.project_uuid = runtimeContext.project_uuid + # Upload local file references in the job order. job_order = upload_job_order(self, "%s input" % runtimeContext.name, - updated_tool, job_order) + updated_tool, job_order, runtimeContext) # the last clause means: if it is a command line tool, and we # are going to wait for the result, and always_submit_runner @@ -575,8 +581,8 @@ The 'jobs' API is no longer supported. loadingContext = self.loadingContext.copy() loadingContext.do_validate = False - loadingContext.do_update = False if submitting: + loadingContext.do_update = False # Document may have been auto-updated. Reload the original # document with updating disabled because we want to # submit the document with its original CWL version, not @@ -587,7 +593,7 @@ The 'jobs' API is no longer supported. # Upload direct dependencies of workflow steps, get back mapping of files to keep references. # Also uploads docker images. - merged_map = upload_workflow_deps(self, tool) + merged_map = upload_workflow_deps(self, tool, runtimeContext) # Recreate process object (ArvadosWorkflow or # ArvadosCommandTool) because tool document may have been @@ -598,25 +604,25 @@ The 'jobs' API is no longer supported. loadingContext.metadata = tool.metadata tool = load_tool(tool.tool, loadingContext) - existing_uuid = runtimeContext.update_workflow - if existing_uuid or runtimeContext.create_workflow: + if runtimeContext.update_workflow or runtimeContext.create_workflow: # Create a pipeline template or workflow record and exit. if self.work_api == "containers": - return (upload_workflow(self, tool, job_order, - self.project_uuid, - uuid=existing_uuid, - submit_runner_ram=runtimeContext.submit_runner_ram, - name=runtimeContext.name, - merged_map=merged_map, - submit_runner_image=runtimeContext.submit_runner_image), - "success") + uuid = upload_workflow(self, tool, job_order, + runtimeContext.project_uuid, + runtimeContext, + uuid=runtimeContext.update_workflow, + submit_runner_ram=runtimeContext.submit_runner_ram, + name=runtimeContext.name, + merged_map=merged_map, + submit_runner_image=runtimeContext.submit_runner_image) + self.stdout.write(uuid + "\n") + return (None, "success") self.apply_reqs(job_order, tool) self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse self.eval_timeout = runtimeContext.eval_timeout - runtimeContext = runtimeContext.copy() runtimeContext.use_container = True runtimeContext.tmpdir_prefix = "tmp" runtimeContext.work_api = self.work_api @@ -679,7 +685,8 @@ The 'jobs' API is no longer supported. if runtimeContext.submit and not runtimeContext.wait: runnerjob = next(jobiter) runnerjob.run(runtimeContext) - return (runnerjob.uuid, "success") + self.stdout.write(runnerjob.uuid+"\n") + return (None, "success") current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger) if current_container: @@ -784,7 +791,15 @@ The 'jobs' API is no longer supported. else: storage_classes = runtimeContext.storage_classes.strip().split(",") - self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output) + output_properties = {} + output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties") + if output_properties_req: + for pr in output_properties_req["processProperties"]: + output_properties[pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"]) + + self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, + self.output_tags, output_properties, + self.final_output) self.set_crunch_output() if runtimeContext.compute_checksum: