X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/fbffa8c4d0fa8bd19fe77b82c16395b80f0bb0ce..b5d0e0ad775eb822b9ab4bed5b57c2e9072f4c0b:/sdk/cwl/arvados_cwl/executor.py diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py index aa19633d8c..ed57c6dae0 100644 --- a/sdk/cwl/arvados_cwl/executor.py +++ b/sdk/cwl/arvados_cwl/executor.py @@ -18,6 +18,7 @@ import json import re from functools import partial import time +import urllib from cwltool.errors import WorkflowException import cwltool.workflow @@ -260,46 +261,29 @@ The 'jobs' API is no longer supported. if current is None: return runtime_status = current.get('runtime_status', {}) - # In case of status being an error, only report the first one. - if kind == 'error': - if not runtime_status.get('error'): - runtime_status.update({ - 'error': message - }) - if detail is not None: - runtime_status.update({ - 'errorDetail': detail - }) - # Further errors are only mentioned as a count. - else: - # Get anything before an optional 'and N more' string. - try: - error_msg = re.match( - r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0] - more_failures = re.match( - r'.*\(and (\d+) more\)', runtime_status.get('error')) - except TypeError: - # Ignore tests stubbing errors - return - if more_failures: - failure_qty = int(more_failures.groups()[0]) - runtime_status.update({ - 'error': "%s (and %d more)" % (error_msg, failure_qty+1) - }) - else: - runtime_status.update({ - 'error': "%s (and 1 more)" % error_msg - }) - elif kind in ['warning', 'activity']: - # Record the last warning/activity status without regard of - # previous occurences. + if kind in ('error', 'warning'): + updatemessage = runtime_status.get(kind, "") + if not updatemessage: + updatemessage = message + + # Subsequent messages tacked on in detail + updatedetail = runtime_status.get(kind+'Detail', "") + maxlines = 40 + if updatedetail.count("\n") < maxlines: + if updatedetail: + updatedetail += "\n" + updatedetail += message + "\n" + + if detail: + updatedetail += detail + "\n" + + if updatedetail.count("\n") >= maxlines: + updatedetail += "\nSome messages may have been omitted. Check the full log." + runtime_status.update({ - kind: message + kind: updatemessage, + kind+'Detail': updatedetail, }) - if detail is not None: - runtime_status.update({ - kind+"Detail": detail - }) else: # Ignore any other status kind return @@ -450,7 +434,7 @@ The 'jobs' API is no longer supported. srccollection = sp[0][5:] try: reader = self.collection_cache.get(srccollection) - srcpath = "/".join(sp[1:]) if len(sp) > 1 else "." + srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".") final.copy(srcpath, v.target, source_collection=reader, overwrite=False) except arvados.errors.ArgumentError as e: logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e) @@ -560,6 +544,16 @@ The 'jobs' API is no longer supported. if not runtimeContext.name: runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"]) + if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow): + runtimeContext.copy_deps = True + + if runtimeContext.update_workflow and self.project_uuid is None: + # If we are updating a workflow, make sure anything that + # gets uploaded goes into the same parent project, unless + # an alternate --project-uuid was provided. + existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute() + self.project_uuid = existing_wf["owner_uuid"] + # Upload local file references in the job order. job_order = upload_job_order(self, "%s input" % runtimeContext.name, updated_tool, job_order) @@ -577,8 +571,8 @@ The 'jobs' API is no longer supported. loadingContext = self.loadingContext.copy() loadingContext.do_validate = False - loadingContext.do_update = False if submitting: + loadingContext.do_update = False # Document may have been auto-updated. Reload the original # document with updating disabled because we want to # submit the document with its original CWL version, not @@ -600,13 +594,12 @@ The 'jobs' API is no longer supported. loadingContext.metadata = tool.metadata tool = load_tool(tool.tool, loadingContext) - existing_uuid = runtimeContext.update_workflow - if existing_uuid or runtimeContext.create_workflow: + if runtimeContext.update_workflow or runtimeContext.create_workflow: # Create a pipeline template or workflow record and exit. if self.work_api == "containers": uuid = upload_workflow(self, tool, job_order, self.project_uuid, - uuid=existing_uuid, + uuid=runtimeContext.update_workflow, submit_runner_ram=runtimeContext.submit_runner_ram, name=runtimeContext.name, merged_map=merged_map,