def add_arv_hints():
cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
- res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
- use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
+ res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.0.yml')
+ use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
+ res.close()
+ res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.1.yml')
+ use_custom_schema("v1.1.0-dev1", "http://arvados.org/cwl", res.read())
res.close()
cwltool.process.supportedProcessRequirements.extend([
"http://arvados.org/cwl#RunInSingleContainer",
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+$base: "http://arvados.org/cwl#"
+$namespaces:
+ cwl: "https://w3id.org/cwl/cwl#"
+ cwltool: "http://commonwl.org/cwltool#"
+$graph:
+- $import: https://w3id.org/cwl/CommonWorkflowLanguage.yml
+
+- name: cwltool:Secrets
+ type: record
+ inVocab: false
+ extends: cwl:ProcessRequirement
+ fields:
+ class:
+ type: string
+ doc: "Always 'Secrets'"
+ jsonldPredicate:
+ "_id": "@type"
+ "_type": "@vocab"
+ secrets:
+ type: string[]
+ doc: |
+ List one or more input parameters that are sensitive (such as passwords)
+ which will be deliberately obscured from logging.
+ jsonldPredicate:
+ "_type": "@id"
+ refScope: 0
+
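+# Illustrative usage sketch (comment only, not part of the schema): the input
+# parameter name "db_password" is made up, and this assumes the document
+# declares $namespaces with cwltool: "http://commonwl.org/cwltool#":
+#
+#   hints:
+#     cwltool:Secrets:
+#       secrets: [db_password]
+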
+- name: RunInSingleContainer
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Indicates that a subworkflow should run in a single container
+ and not be scheduled as separate steps.
+ fields:
+ - name: class
+ type: string
+ doc: "Always 'arv:RunInSingleContainer'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+
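+# Illustrative usage sketch, set on a workflow step that runs a subworkflow
+# (assumes the document declares $namespaces with arv: "http://arvados.org/cwl#";
+# the step and file names are made up):
+#
+#   steps:
+#     subworkflow_step:
+#       run: subworkflow.cwl
+#       hints:
+#         arv:RunInSingleContainer: {}
+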
+- name: OutputDirType
+ type: enum
+ symbols:
+ - local_output_dir
+ - keep_output_dir
+ doc:
+ - |
+ local_output_dir: Use regular file system local to the compute node.
+ There must be sufficient local scratch space to store the entire output;
+ specify this with `outdirMin` of `ResourceRequirement`. Files are
+ batch uploaded to Keep when the process completes. Most compatible, but
+ the upload step can be time consuming for very large files.
+ - |
+ keep_output_dir: Use writable Keep mount. Files are streamed to Keep as
+ they are written. Does not consume local scratch space, but does consume
+ RAM for output buffers (up to 192 MiB per file simultaneously open for
+ writing). Best suited to processes which produce sequential output of
+ large files (non-sequential writes may produce fragmented file
+ manifests). Supports regular files and directories; does not support
+ special files such as symlinks, hard links, named pipes, named sockets,
+ or device nodes.
+
+
+- name: RuntimeConstraints
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Set Arvados-specific runtime hints.
+ fields:
+ - name: class
+ type: string
+ doc: "Always 'arv:RuntimeConstraints'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ - name: keep_cache
+ type: int?
+ doc: |
+ Size of file data buffer for Keep mount in MiB. Default is 256
+ MiB. Increase this to reduce cache thrashing in situations such as
+ accessing multiple large (64+ MiB) files at the same time, or
+ performing random access on a large file.
+ - name: outputDirType
+ type: OutputDirType?
+ doc: |
+ Preferred backing store for output staging. If not specified, the
+ system may choose which one to use.
+
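+# Illustrative usage sketch (values are made up):
+#
+#   hints:
+#     arv:RuntimeConstraints:
+#       keep_cache: 1024
+#       outputDirType: keep_output_dir
+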
+- name: PartitionRequirement
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Select preferred compute partitions on which to run jobs.
+ fields:
+ - name: class
+ type: string
+ doc: "Always 'arv:PartitionRequirement'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ - name: partition
+ type:
+ - string
+ - string[]
+
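+# Illustrative usage sketch ("highmem" is a made-up partition name; a list
+# of partition names is also accepted):
+#
+#   hints:
+#     arv:PartitionRequirement:
+#       partition: highmem
+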
+- name: APIRequirement
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Indicates that the process wants access to the Arvados API. It will be
+ granted limited network access and have ARVADOS_API_HOST and
+ ARVADOS_API_TOKEN set in the environment.
+ fields:
+ - name: class
+ type: string
+ doc: "Always 'arv:APIRequirement'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+
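+# Illustrative usage sketch:
+#
+#   hints:
+#     arv:APIRequirement: {}
+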
+- name: IntermediateOutput
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Specify desired handling of intermediate output collections.
+ fields:
+ class:
+ type: string
+ doc: "Always 'arv:IntermediateOutput'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ outputTTL:
+ type: int
+ doc: |
+ If the value is greater than zero, intermediate output collections
+ will be considered temporary and will be automatically
+ trashed. Temporary collections are trashed `outputTTL` seconds
+ after creation. A value of zero means intermediate output should be
+ retained indefinitely (this is the default behavior).
+
+ Note: arvados-cwl-runner currently does not take workflow dependencies
+ into account when setting the TTL on an intermediate output
+ collection. If the TTL is too short, it is possible for a collection to
+ be trashed before downstream steps that consume it are started. The
+ recommended minimum value for TTL is the expected duration of the
+ entire workflow.
+
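+# Illustrative usage sketch (7 days, expressed in seconds):
+#
+#   hints:
+#     arv:IntermediateOutput:
+#       outputTTL: 604800
+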
+- name: WorkflowRunnerResources
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Specify memory or cores resource request for the CWL runner process itself.
+ fields:
+ class:
+ type: string
+ doc: "Always 'arv:WorkflowRunnerResources'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ ramMin:
+ type: int?
+ doc: Minimum RAM, in mebibytes (2**20)
+ jsonldPredicate: "https://w3id.org/cwl/cwl#ResourceRequirement/ramMin"
+ coresMin:
+ type: int?
+ doc: Minimum cores allocated to cwl-runner
+ jsonldPredicate: "https://w3id.org/cwl/cwl#ResourceRequirement/coresMin"
+ keep_cache:
+ type: int?
+ doc: |
+ Size of collection metadata cache for the workflow runner, in
+ MiB. Default 256 MiB. Will be added on to the RAM request
+ when determining node size to request.
+ jsonldPredicate: "http://arvados.org/cwl#RuntimeConstraints/keep_cache"
+
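+# Illustrative usage sketch (values are made up):
+#
+#   hints:
+#     arv:WorkflowRunnerResources:
+#       ramMin: 2048
+#       coresMin: 2
+#       keep_cache: 512
+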
+- name: ClusterTarget
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Specify where a workflow step should run.
+ fields:
+ class:
+ type: string
+ doc: "Always 'arv:ClusterTarget'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ cluster_id:
+ type: string?
+ doc: The cluster on which to run the container
+ project_uuid:
+ type: string?
+ doc: The project that will own the container requests and intermediate collections
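+
+# Illustrative usage sketch ("clsr2" and the project UUID are made up):
+#
+#   hints:
+#     arv:ClusterTarget:
+#       cluster_id: clsr2
+#       project_uuid: clsr2-j7d0g-xxxxxxxxxxxxxxx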
try:
if record["state"] == "Complete":
processStatus = "success"
+ # we don't have the real exit code so fake it.
+ record["exit_code"] = 0
else:
processStatus = "permanentFail"
+ record["exit_code"] = 1
outputs = {}
try:
outputs = done.done(self, record, dirs["tmpdir"],
dirs["outdir"], dirs["keep"])
except WorkflowException as e:
- # Only include a stack trace if in debug mode.
- # This is most likely a user workflow error and a stack trace may obfuscate more useful output.
+ # Only include a stack trace if in debug mode.
+ # This is most likely a user workflow error and a stack trace may obfuscate more useful output.
logger.error("%s unable to collect output from %s:\n%s",
self.arvrunner.label(self), record["output"], e, exc_info=(e if self.arvrunner.debug else False))
processStatus = "permanentFail"
self.loadingContext.fetcher_constructor = self.fetcher_constructor
self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
self.loadingContext.construct_tool_object = self.arv_make_tool
- self.loadingContext.do_update = False
# Add a custom logging handler to the root logger for runtime status reporting
# if running inside a container
if not runtimeContext.name:
runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
- # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
- # Also uploads docker images.
- merged_map = upload_workflow_deps(self, tool)
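+ # "submitting" is true when the document itself will be registered or
+ # submitted to Arvados (update/create workflow, or submit a runner),
+ # rather than running a single CommandLineTool directly and waiting.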
+ submitting = (runtimeContext.update_workflow or
+ runtimeContext.create_workflow or
+ (runtimeContext.submit and not
+ (tool.tool["class"] == "CommandLineTool" and
+ runtimeContext.wait and
+ not runtimeContext.always_submit_runner)))
- # Reload tool object which may have been updated by
- # upload_workflow_deps
- # Don't validate this time because it will just print redundant errors.
loadingContext = self.loadingContext.copy()
- loadingContext.loader = tool.doc_loader
- loadingContext.avsc_names = tool.doc_schema
- loadingContext.metadata = tool.metadata
loadingContext.do_validate = False
+ loadingContext.do_update = False
+ if submitting:
+ # Document may have been auto-updated. Reload the original
+ # document with updating disabled because we want to
+ # submit the original document, not the auto-updated one.
+ tool = load_tool(tool.tool["id"], loadingContext)
+
+ # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
+ # Also uploads docker images.
+ merged_map = upload_workflow_deps(self, tool)
- tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
- loadingContext)
+ # Recreate process object (ArvadosWorkflow or
+ # ArvadosCommandTool) because tool document may have been
+ # updated by upload_workflow_deps in ways that modify
+ # inheritance of hints or requirements.
+ tool = load_tool(tool.tool, loadingContext)
# Upload local file references in the job order.
job_order = upload_job_order(self, "%s input" % runtimeContext.name,
if runtimeContext.submit:
# Submit a runner job to run the workflow for us.
if self.work_api == "containers":
+ loadingContext.loader = tool.doc_loader
+ loadingContext.avsc_names = tool.doc_schema
if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
runtimeContext.runnerjob = tool.tool["id"]
else:
"state": "RunningOnClient"}).execute(num_retries=self.num_retries)
logger.info("Pipeline instance %s", self.pipeline["uuid"])
- if not isinstance(tool, Runner):
- loadingContext.do_update = True
- tool = load_tool(tool.doc_loader.idx[tool.tool["id"]],
- loadingContext)
-
if runtimeContext.cwl_runner_job is not None:
self.uuid = runtimeContext.cwl_runner_job.get('uuid')
download_url="https://github.com/curoverse/arvados.git",
license='Apache 2.0',
packages=find_packages(),
- package_data={'arvados_cwl': ['arv-cwl-schema.yml']},
+ package_data={'arvados_cwl': ['arv-cwl-schema-v1.0.yml', 'arv-cwl-schema-v1.1.yml']},
scripts=[
'bin/cwl-runner',
'bin/arvados-cwl-runner',
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20190423203253',
+ 'cwltool==1.0.20190425212529',
'schema-salad==4.1.20190305210046',
'typing >= 3.6.4',
'ruamel.yaml >=0.15.54, <=0.15.77',
self.assertFalse(api.collections().create.called)
self.assertFalse(runner.runtime_status_error.called)
- arvjob.collect_outputs.assert_called_with("keep:abc+123")
+ arvjob.collect_outputs.assert_called_with("keep:abc+123", 0)
arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
runner.add_intermediate_output.assert_called_with("zzzzz-4zz18-zzzzzzzzzzzzzz2")