Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>
cd "$WORKSPACE"
py=python
+pipcmd=pip
if [[ -n "$PYCMD" ]] ; then
- py="$PYCMD" ;
+ py="$PYCMD"
+ if [[ $py = python3 ]] ; then
+ pipcmd=pip3
+ fi
fi
(cd sdk/python && python setup.py sdist)
cwl_runner_version=$(cd sdk/python && nohash_version_from_git 1.0)
fi
-docker build --build-arg sdk=$sdk --build-arg runner=$runner --build-arg salad=$salad --build-arg cwltool=$cwltool --build-arg pythoncmd=$py -f "$WORKSPACE/sdk/dev-jobs.dockerfile" -t arvados/jobs:$cwl_runner_version "$WORKSPACE/sdk"
+docker build --build-arg sdk=$sdk --build-arg runner=$runner --build-arg salad=$salad --build-arg cwltool=$cwltool --build-arg pythoncmd=$py --build-arg pipcmd=$pipcmd -f "$WORKSPACE/sdk/dev-jobs.dockerfile" -t arvados/jobs:$cwl_runner_version "$WORKSPACE/sdk"
echo arv-keepdocker arvados/jobs $cwl_runner_version
arv-keepdocker arvados/jobs $cwl_runner_version
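# Usage sketch (assuming this fragment is build/build-dev-docker-jobs-image.sh,
# the script referenced later in this change; WORKSPACE must point at an
# arvados checkout):
#   WORKSPACE=~/arvados PYCMD=python3 build/build-dev-docker-jobs-image.sh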
TODO: extract this information based on git commit messages and generate changelogs / release notes automatically.
{% endcomment %}
-h3. v1.4.0 (2019-05-31)
+h3. v1.4.0 (2019-06-05)
h4. Populating the new file_count and file_size_total columns on the collections table
Log (undef, "collate");
my ($child_out, $child_in);
- my $pid = open2($child_out, $child_in, 'python', '-c', q{
+ # This depends on the python-arvados-python-client package, which needs to be installed
+ # on the machine running crunch-dispatch (typically, the API server).
+ my $pid = open2($child_out, $child_in, '/usr/share/python2.7/dist/python-arvados-python-client/bin/python', '-c', q{
import arvados
import sys
print (arvados.api("v1").collections().
def add_arv_hints():
cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
- res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
- use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
- res.close()
+ res10 = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.0.yml')
+ res11 = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.1.yml')
+ customschema10 = res10.read()
+ customschema11 = res11.read()
+ use_custom_schema("v1.0", "http://arvados.org/cwl", customschema10)
+ use_custom_schema("v1.1.0-dev1", "http://arvados.org/cwl", customschema11)
+ use_custom_schema("v1.1", "http://arvados.org/cwl", customschema11)
+ res10.close()
+ res11.close()
cwltool.process.supportedProcessRequirements.extend([
"http://arvados.org/cwl#RunInSingleContainer",
"http://arvados.org/cwl#OutputDirType",
else:
arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
+ if stdout is sys.stdout:
+ # cwltool.main has code to work around encoding issues with
+ # sys.stdout and unix pipes (they default to ASCII encoding;
+ # we want UTF-8), so when stdout is sys.stdout, set it to None
+ # to take advantage of that. Don't override it for all cases
+ # since we still want to be able to capture stdout for the
+ # unit tests.
+ stdout = None
+
return cwltool.main.main(args=arvargs,
stdout=stdout,
stderr=stderr,
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+$base: "http://arvados.org/cwl#"
+$namespaces:
+ cwl: "https://w3id.org/cwl/cwl#"
+ cwltool: "http://commonwl.org/cwltool#"
+$graph:
+- $import: https://w3id.org/cwl/CommonWorkflowLanguage.yml
+
+- name: cwltool:Secrets
+ type: record
+ inVocab: false
+ extends: cwl:ProcessRequirement
+ fields:
+ class:
+ type: string
+ doc: "Always 'Secrets'"
+ jsonldPredicate:
+ "_id": "@type"
+ "_type": "@vocab"
+ secrets:
+ type: string[]
+ doc: |
+ List one or more input parameters that are sensitive (such as passwords);
+ their values will be deliberately obscured from logging.
+ jsonldPredicate:
+ "_type": "@id"
+ refScope: 0
+
+- name: RunInSingleContainer
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Indicates that a subworkflow should run in a single container
+ and not be scheduled as separate steps.
+ fields:
+ - name: class
+ type: string
+ doc: "Always 'arv:RunInSingleContainer'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+
+- name: OutputDirType
+ type: enum
+ symbols:
+ - local_output_dir
+ - keep_output_dir
+ doc:
+ - |
+ local_output_dir: Use the regular file system local to the compute node.
+ There must be sufficient local scratch space to store the entire output;
+ specify this with `outdirMin` of `ResourceRequirement`. Files are
+ batch uploaded to Keep when the process completes. Most compatible, but
+ the upload step can be time consuming for very large files.
+ - |
+ keep_output_dir: Use a writable Keep mount. Files are streamed to Keep as
+ they are written. Does not consume local scratch space, but does consume
+ RAM for output buffers (up to 192 MiB per file simultaneously open for
+ writing). Best suited to processes which produce sequential output of
+ large files (non-sequential writes may produce fragmented file
+ manifests). Supports regular files and directories, but does not support
+ special files such as symlinks, hard links, named pipes, named sockets,
+ or device nodes.
+
+
+- name: RuntimeConstraints
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Set Arvados-specific runtime hints.
+ fields:
+ - name: class
+ type: string
+ doc: "Always 'arv:RuntimeConstraints'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ - name: keep_cache
+ type: int?
+ doc: |
+ Size of the file data buffer for the Keep mount, in MiB. Default is 256
+ MiB. Increase this to reduce cache thrashing in situations such as
+ accessing multiple large (64+ MiB) files at the same time, or
+ performing random access on a large file.
+ - name: outputDirType
+ type: OutputDirType?
+ doc: |
+ Preferred backing store for output staging. If not specified, the
+ system may choose which one to use.
+
+- name: PartitionRequirement
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Select preferred compute partitions on which to run jobs.
+ fields:
+ - name: class
+ type: string
+ doc: "Always 'arv:PartitionRequirement'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ - name: partition
+ type:
+ - string
+ - string[]
+
+- name: APIRequirement
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Indicates that the process wants access to the Arvados API. The process
+ will be granted limited network access and will have ARVADOS_API_HOST and
+ ARVADOS_API_TOKEN set in the environment.
+ fields:
+ - name: class
+ type: string
+ doc: "Always 'arv:APIRequirement'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+
+- name: IntermediateOutput
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Specify desired handling of intermediate output collections.
+ fields:
+ class:
+ type: string
+ doc: "Always 'arv:IntermediateOutput'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ outputTTL:
+ type: int
+ doc: |
+ If the value is greater than zero, intermediate output collections
+ are considered temporary and will be automatically
+ trashed. Temporary collections are trashed `outputTTL` seconds
+ after creation. A value of zero means intermediate output should be
+ retained indefinitely (this is the default behavior).
+
+ Note: arvados-cwl-runner currently does not take workflow dependencies
+ into account when setting the TTL on an intermediate output
+ collection. If the TTL is too short, it is possible for a collection to
+ be trashed before downstream steps that consume it are started. The
+ recommended minimum value for TTL is the expected duration of the
+ entire workflow.
+
+- name: WorkflowRunnerResources
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Specify memory and cores resource requests for the CWL runner process itself.
+ fields:
+ class:
+ type: string
+ doc: "Always 'arv:WorkflowRunnerResources'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ ramMin:
+ type: int?
+ doc: Minimum RAM, in mebibytes (2**20)
+ jsonldPredicate: "https://w3id.org/cwl/cwl#ResourceRequirement/ramMin"
+ coresMin:
+ type: int?
+ doc: Minimum cores allocated to cwl-runner
+ jsonldPredicate: "https://w3id.org/cwl/cwl#ResourceRequirement/coresMin"
+ keep_cache:
+ type: int?
+ doc: |
+ Size of the collection metadata cache for the workflow runner, in
+ MiB. Default 256 MiB. This is added to the RAM request
+ when determining the node size to request.
+ jsonldPredicate: "http://arvados.org/cwl#RuntimeConstraints/keep_cache"
+
+- name: ClusterTarget
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Specify where a workflow step should run.
+ fields:
+ class:
+ type: string
+ doc: "Always 'arv:ClusterTarget'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ cluster_id:
+ type: string?
+ doc: The cluster on which to run the container
+ project_uuid:
+ type: string?
+ doc: The project that will own the container requests and intermediate collections
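+
+# Usage sketch (illustrative, not part of the schema): a workflow can
+# attach these extensions as hints after declaring the arv namespace,
+# along the lines of:
+#
+#   $namespaces:
+#     arv: "http://arvados.org/cwl#"
+#   hints:
+#     arv:RuntimeConstraints:
+#       keep_cache: 512
+#     arv:IntermediateOutput:
+#       outputTTL: 7200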
runtimeContext.pull_image,
runtimeContext.project_uuid)
+ network_req, _ = self.get_requirement("NetworkAccess")
+ if network_req:
+ runtime_constraints["API"] = network_req["networkAccess"]
+
api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
if api_req:
runtime_constraints["API"] = True
if self.output_ttl < 0:
raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
- if self.timelimit is not None:
+ if self.timelimit is not None and self.timelimit > 0:
scheduling_parameters["max_run_time"] = self.timelimit
extra_submit_params = {}
enable_reuse = runtimeContext.enable_reuse
if enable_reuse:
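+ # Honor both the CWL v1.1 "WorkReuse" requirement and the older
+ # Arvados-specific http://arvados.org/cwl#ReuseRequirement spelling.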
+ reuse_req, _ = self.get_requirement("WorkReuse")
+ if reuse_req:
+ enable_reuse = reuse_req["enableReuse"]
reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
if reuse_req:
enable_reuse = reuse_req["enableReuse"]
if self.arvrunner.project_uuid:
command.append("--project-uuid="+self.arvrunner.project_uuid)
+ if self.enable_dev:
+ command.append("--enable-dev")
+
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
try:
if record["state"] == "Complete":
processStatus = "success"
+ # We don't have the real exit code, so fake it.
+ record["exit_code"] = 0
else:
processStatus = "permanentFail"
+ record["exit_code"] = 1
outputs = {}
try:
outputs = done.done(self, record, dirs["tmpdir"],
dirs["outdir"], dirs["keep"])
except WorkflowException as e:
- # Only include a stack trace if in debug mode.
- # This is most likely a user workflow error and a stack trace may obfuscate more useful output.
+ # Only include a stack trace if in debug mode.
+ # This is most likely a user workflow error and a stack trace may obfuscate more useful output.
logger.error("%s unable to collect output from %s:\n%s",
self.arvrunner.label(self), record["output"], e, exc_info=(e if self.arvrunner.debug else False))
processStatus = "permanentFail"
# SPDX-License-Identifier: Apache-2.0
from cwltool.command_line_tool import CommandLineTool, ExpressionTool
-from cwltool.builder import Builder
from .arvjob import ArvadosJob
from .arvcontainer import ArvadosContainer
from .pathmapper import ArvPathMapper
+from .runner import make_builder
from functools import partial
from schema_salad.sourceline import SourceLine
from cwltool.errors import WorkflowException
return runtimeContext
-def make_builder(joborder, hints, requirements, runtimeContext):
- return Builder(
- job=joborder,
- files=[], # type: List[Dict[Text, Text]]
- bindings=[], # type: List[Dict[Text, Any]]
- schemaDefs={}, # type: Dict[Text, Dict[Text, Any]]
- names=None, # type: Names
- requirements=requirements, # type: List[Dict[Text, Any]]
- hints=hints, # type: List[Dict[Text, Any]]
- resources={}, # type: Dict[str, int]
- mutation_manager=None, # type: Optional[MutationManager]
- formatgraph=None, # type: Optional[Graph]
- make_fs_access=None, # type: Type[StdFsAccess]
- fs_access=None, # type: StdFsAccess
- job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
- timeout=runtimeContext.eval_timeout, # type: float
- debug=runtimeContext.debug, # type: bool
- js_console=runtimeContext.js_console, # type: bool
- force_docker_pull=runtimeContext.force_docker_pull, # type: bool
- loadListing="", # type: Text
- outdir="", # type: Text
- tmpdir="", # type: Text
- stagedir="", # type: Text
- )
class ArvadosCommandTool(CommandLineTool):
"""Wrap cwltool CommandLineTool to override selected methods."""
import ruamel.yaml as yaml
from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
- trim_anonymous_location, remove_redundant_fields, discover_secondary_files)
+ trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
+ make_builder)
from .pathmapper import ArvPathMapper, trim_listing
-from .arvtool import ArvadosCommandTool, set_cluster_target, make_builder
+from .arvtool import ArvadosCommandTool, set_cluster_target
+
from .perf import Perf
logger = logging.getLogger('arvados.cwl-runner')
raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
- discover_secondary_files(self.tool["inputs"], joborder)
+ discover_secondary_files(self.arvrunner.fs_access, builder,
+ self.tool["inputs"], joborder)
with Perf(metrics, "subworkflow upload_deps"):
upload_dependencies(self.arvrunner,
adjustDirObjs(packed, keepmount)
self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
+ self.loadingContext = self.loadingContext.copy()
+ self.loadingContext.metadata = self.loadingContext.metadata.copy()
+ self.loadingContext.metadata["http://commonwl.org/cwltool#original_cwlVersion"] = "v1.0"
+
wf_runner = cmap({
"class": "CommandLineTool",
"baseCommand": "cwltool",
def done_outputs(self, record, tmpdir, outdir, keepdir):
self.builder.outdir = outdir
self.builder.pathmapper.keepdir = keepdir
- return self.collect_outputs("keep:" + record["output"])
+ return self.collect_outputs("keep:" + record["output"], record["exit_code"])
crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[a-z0-9]{15} \d+ \d+ stderr crunchstat:")
timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)")
from builtins import next
from builtins import object
from builtins import str
-from future.utils import viewvalues
+from future.utils import viewvalues, viewitems
import argparse
import logging
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
from cwltool.command_line_tool import compute_checksums
+from cwltool.load_tool import load_tool
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
self.poll_interval = 12
self.loadingContext = None
self.should_estimate_cache_size = True
+ self.fs_access = None
+ self.secret_store = None
if keep_client is not None:
self.keep_client = keep_client
num_retries=self.num_retries)
self.work_api = None
- expected_api = ["jobs", "containers"]
+ expected_api = ["containers", "jobs"]
for api in expected_api:
try:
methods = self.api._rootDesc.get('resources')[api]['methods']
while keys:
page = keys[:pageSize]
- keys = keys[pageSize:]
try:
proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
except Exception:
"new_attributes": p
}
})
+ keys = keys[pageSize:]
+
finish_poll = time.time()
remain_wait = self.poll_interval - (finish_poll - begin_poll)
except:
except (KeyboardInterrupt, SystemExit):
break
- def check_features(self, obj):
+ def check_features(self, obj, parentfield=""):
if isinstance(obj, dict):
if obj.get("writable") and self.work_api != "containers":
raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
"Option 'dockerOutputDirectory' must be an absolute path.")
if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
- for v in viewvalues(obj):
- self.check_features(v)
+ if obj.get("class") == "InplaceUpdateRequirement":
+ if obj["inplaceUpdate"] and parentfield == "requirements":
+ raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
+ for k,v in viewitems(obj):
+ self.check_features(v, parentfield=k)
elif isinstance(obj, list):
for i,v in enumerate(obj):
with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
- self.check_features(v)
+ self.check_features(v, parentfield=parentfield)
def make_output_collection(self, name, storage_classes, tagsString, outputObj):
outputObj = copy.deepcopy(outputObj)
'progress':1.0
}).execute(num_retries=self.num_retries)
+ def apply_reqs(self, job_order_object, tool):
+ if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
+ if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
+ raise WorkflowException(
+ "`cwl:requirements` in the input object is not part of CWL "
+ "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
+ "can set the cwlVersion to v1.1 or greater and re-run with "
+ "--enable-dev.")
+ job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
+ for req in job_reqs:
+ tool.requirements.append(req)
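+ # Illustrative (hypothetical) input object: with cwlVersion v1.1 or
+ # greater and --enable-dev, a job order may carry e.g.
+ # {"https://w3id.org/cwl/cwl#requirements":
+ #  [{"class": "NetworkAccess", "networkAccess": true}]}
+ # and each listed requirement is appended to the tool's requirements.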
+
def arv_executor(self, tool, job_order, runtimeContext, logger=None):
self.debug = runtimeContext.debug
if not runtimeContext.name:
runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+ # Upload local file references in the job order.
+ job_order = upload_job_order(self, "%s input" % runtimeContext.name,
+ tool, job_order)
+
+ submitting = (runtimeContext.update_workflow or
+ runtimeContext.create_workflow or
+ (runtimeContext.submit and not
+ (tool.tool["class"] == "CommandLineTool" and
+ runtimeContext.wait and
+ not runtimeContext.always_submit_runner)))
+
+ loadingContext = self.loadingContext.copy()
+ loadingContext.do_validate = False
+ loadingContext.do_update = False
+ if submitting:
+ # Document may have been auto-updated. Reload the original
+ # document with updating disabled because we want to
+ # submit the original document, not the auto-updated one.
+ tool = load_tool(tool.tool["id"], loadingContext)
+
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
merged_map = upload_workflow_deps(self, tool)
- # Reload tool object which may have been updated by
- # upload_workflow_deps
- # Don't validate this time because it will just print redundant errors.
- loadingContext = self.loadingContext.copy()
+ # Recreate process object (ArvadosWorkflow or
+ # ArvadosCommandTool) because tool document may have been
+ # updated by upload_workflow_deps in ways that modify
+ # inheritance of hints or requirements.
loadingContext.loader = tool.doc_loader
loadingContext.avsc_names = tool.doc_schema
loadingContext.metadata = tool.metadata
- loadingContext.do_validate = False
-
- tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
- loadingContext)
-
- # Upload local file references in the job order.
- job_order = upload_job_order(self, "%s input" % runtimeContext.name,
- tool, job_order)
+ tool = load_tool(tool.tool, loadingContext)
existing_uuid = runtimeContext.update_workflow
if existing_uuid or runtimeContext.create_workflow:
merged_map=merged_map),
"success")
+ self.apply_reqs(job_order, tool)
+
self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
self.eval_timeout = runtimeContext.eval_timeout
from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
+from past.builtins import basestring
import os
import sys
from functools import partial
import logging
import json
+import copy
from collections import namedtuple
from io import StringIO
+from typing import Mapping, Sequence
if os.name == "posix" and sys.version_info[0] < 3:
import subprocess32 as subprocess
from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
+from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
+ shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
+from cwltool.update import INTERNAL_VERSION
+from cwltool.builder import Builder
import schema_salad.validate as validate
import arvados.collection
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done
+from .context import ArvRuntimeContext
logger = logging.getLogger('arvados.cwl-runner')
for i in viewvalues(d):
find_defaults(i, op)
-def setSecondary(t, fileobj, discovered):
- if isinstance(fileobj, dict) and fileobj.get("class") == "File":
- if "secondaryFiles" not in fileobj:
- fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
- if discovered is not None:
- discovered[fileobj["location"]] = fileobj["secondaryFiles"]
- elif isinstance(fileobj, list):
- for e in fileobj:
- setSecondary(t, e, discovered)
-
-def discover_secondary_files(inputs, job_order, discovered=None):
- for t in inputs:
- if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
- setSecondary(t, job_order[shortname(t["id"])], discovered)
+def make_builder(joborder, hints, requirements, runtimeContext):
+ return Builder(
+ job=joborder,
+ files=[], # type: List[Dict[Text, Text]]
+ bindings=[], # type: List[Dict[Text, Any]]
+ schemaDefs={}, # type: Dict[Text, Dict[Text, Any]]
+ names=None, # type: Names
+ requirements=requirements, # type: List[Dict[Text, Any]]
+ hints=hints, # type: List[Dict[Text, Any]]
+ resources={}, # type: Dict[str, int]
+ mutation_manager=None, # type: Optional[MutationManager]
+ formatgraph=None, # type: Optional[Graph]
+ make_fs_access=None, # type: Type[StdFsAccess]
+ fs_access=None, # type: StdFsAccess
+ job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
+ timeout=runtimeContext.eval_timeout, # type: float
+ debug=runtimeContext.debug, # type: bool
+ js_console=runtimeContext.js_console, # type: bool
+ force_docker_pull=runtimeContext.force_docker_pull, # type: bool
+ loadListing="", # type: Text
+ outdir="", # type: Text
+ tmpdir="", # type: Text
+ stagedir="", # type: Text
+ )
+
+def search_schemadef(name, reqs):
+ for r in reqs:
+ if r["class"] == "SchemaDefRequirement":
+ for sd in r["types"]:
+ if sd["name"] == name:
+ return sd
+ return None
+
+primitive_types_set = frozenset(("null", "boolean", "int", "long",
+ "float", "double", "string", "record",
+ "array", "enum"))
+
+def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
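+ # Walk the input schema alongside the primary value: resolve unions and
+ # named schema defs, recurse into record fields and array items, and for
+ # File values evaluate the secondaryFiles patterns (via the builder) and
+ # check that each discovered file exists via fsaccess.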
+ if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
+ # union type, collect all possible secondaryFiles
+ for i in inputschema:
+ set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
+ return
+
+ if isinstance(inputschema, basestring):
+ sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
+ if sd:
+ inputschema = sd
+ else:
+ return
+
+ if "secondaryFiles" in inputschema:
+ # set secondaryFiles, may be inherited by compound types.
+ secondaryspec = inputschema["secondaryFiles"]
+
+ if (isinstance(inputschema["type"], (Mapping, Sequence)) and
+ not isinstance(inputschema["type"], basestring)):
+ # compound type (union, array, record)
+ set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
+
+ elif (inputschema["type"] == "record" and
+ isinstance(primary, Mapping)):
+ #
+ # record type, find secondary files associated with fields.
+ #
+ for f in inputschema["fields"]:
+ p = primary.get(shortname(f["name"]))
+ if p:
+ set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
+
+ elif (inputschema["type"] == "array" and
+ isinstance(primary, Sequence)):
+ #
+ # array type, find secondary files of elements
+ #
+ for p in primary:
+ set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
+
+ elif (inputschema["type"] == "File" and
+ secondaryspec and
+ isinstance(primary, Mapping) and
+ primary.get("class") == "File" and
+ "secondaryFiles" not in primary):
+ #
+ # Found a file, check for secondaryFiles
+ #
+ primary["secondaryFiles"] = []
+ for i, sf in enumerate(aslist(secondaryspec)):
+ pattern = builder.do_eval(sf["pattern"], context=primary)
+ if pattern is None:
+ continue
+ sfpath = substitute(primary["location"], pattern)
+ required = builder.do_eval(sf["required"], context=primary)
+
+ if fsaccess.exists(sfpath):
+ primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
+ elif required:
+ raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+ "Required secondary file '%s' does not exist" % sfpath)
+
+ primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
+ if discovered is not None:
+ discovered[primary["location"]] = primary["secondaryFiles"]
+ elif inputschema["type"] not in primitive_types_set:
+ set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
+
+def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
+ for inputschema in inputs:
+ primary = job_order.get(shortname(inputschema["id"]))
+ if isinstance(primary, (Mapping, Sequence)):
+ set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
loadref_fields = set(("$import",))
scanobj = workflowobj
- if "id" in workflowobj:
+ if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
# Need raw file content (before preprocessing) to ensure
# that external references in $include and $mixin are captured.
scanobj = loadref("", workflowobj["id"])
discovered = {}
def discover_default_secondary_files(obj):
- discover_secondary_files(obj["inputs"],
- {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
+ builder_job_order = {}
+ for t in obj["inputs"]:
+ builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
+ # Need to create a builder object to evaluate expressions.
+ builder = make_builder(builder_job_order,
+ obj.get("hints", []),
+ obj.get("requirements", []),
+ ArvRuntimeContext())
+ discover_secondary_files(arvrunner.fs_access,
+ builder,
+ obj["inputs"],
+ builder_job_order,
discovered)
visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
object with 'location' updated to the proper keep references.
"""
- discover_secondary_files(tool.tool["inputs"], job_order)
+ # Make a copy of the job order and set defaults.
+ builder_job_order = copy.copy(job_order)
+
+ # fill_in_defaults throws an error if any required parameters
+ # are missing; we don't want that here, so make them
+ # all optional.
+ inputs_copy = copy.deepcopy(tool.tool["inputs"])
+ for i in inputs_copy:
+ if "null" not in i["type"]:
+ i["type"] = ["null"] + aslist(i["type"])
+
+ fill_in_defaults(inputs_copy,
+ builder_job_order,
+ arvrunner.fs_access)
+ # Need to create a builder object to evaluate expressions.
+ builder = make_builder(builder_job_order,
+ tool.hints,
+ tool.requirements,
+ ArvRuntimeContext())
+ # Now update job_order with secondaryFiles
+ discover_secondary_files(arvrunner.fs_access,
+ builder,
+ tool.tool["inputs"],
+ job_order)
jobmapper = upload_dependencies(arvrunner,
name,
collection_cache_size=256,
collection_cache_is_default=True):
+ loadingContext = loadingContext.copy()
+ loadingContext.metadata = loadingContext.metadata.copy()
+ loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION
+
super(Runner, self).__init__(tool.tool, loadingContext)
self.arvrunner = runner
self.intermediate_output_ttl = intermediate_output_ttl
self.priority = priority
self.secret_store = secret_store
+ self.enable_dev = loadingContext.enable_dev
self.submit_runner_cores = 1
self.submit_runner_ram = 1024 # default 1 GiB
download_url="https://github.com/curoverse/arvados.git",
license='Apache 2.0',
packages=find_packages(),
- package_data={'arvados_cwl': ['arv-cwl-schema.yml']},
+ package_data={'arvados_cwl': ['arv-cwl-schema-v1.0.yml', 'arv-cwl-schema-v1.1.yml']},
scripts=[
'bin/cwl-runner',
'bin/arvados-cwl-runner',
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20181217162649',
- 'schema-salad==3.0.20181129082112',
+ 'cwltool==1.0.20190603140227',
+ 'schema-salad==4.2.20190417121603',
'typing >= 3.6.4',
'ruamel.yaml >=0.15.54, <=0.15.77',
'arvados-python-client>=1.3.0.20190205182514',
shift ; shift
;;
-h|--help)
- echo "$0 [--no-reset-container] [--leave-running] [--config dev|localdemo] [--tag docker_tag] [--build] [--pythoncmd python(2|3)] [--suite (integration|conformance)]"
+ echo "$0 [--no-reset-container] [--leave-running] [--config dev|localdemo] [--tag docker_tag] [--build] [--pythoncmd python(2|3)] [--suite (integration|conformance-v1.0|conformance-v1.1)]"
exit
;;
*)
export ARVBOX_CONTAINER=cwltest
fi
+if test "$suite" = "conformance" ; then
+ suite=conformance-v1.0
+fi
+
if test $reset_container = 1 ; then
arvbox stop
docker rm $ARVBOX_CONTAINER
mkdir -p /tmp/cwltest
cd /tmp/cwltest
-if ! test -d common-workflow-language ; then
- git clone https://github.com/common-workflow-language/common-workflow-language.git
+
+if [[ "$suite" = "conformance-v1.0" ]] ; then
+ if ! test -d common-workflow-language ; then
+ git clone https://github.com/common-workflow-language/common-workflow-language.git
+ fi
+ cd common-workflow-language
+elif [[ "$suite" = "conformance-v1.1" ]] ; then
+ if ! test -d cwl-v1.1 ; then
+ git clone https://github.com/common-workflow-language/cwl-v1.1.git
+ fi
+ cd cwl-v1.1
+fi
+
+if [[ "$suite" != "integration" ]] ; then
+ git pull
fi
-cd common-workflow-language
-git pull
+
export ARVADOS_API_HOST=localhost:8000
export ARVADOS_API_HOST_INSECURE=1
export ARVADOS_API_TOKEN=\$(cat /var/lib/arvados/superuser_token)
-
if test -n "$build" ; then
- /usr/src/arvados/build/build-dev-docker-jobs-image.sh
+ /usr/src/arvados/build/build-dev-docker-jobs-image.sh
elif test "$tag" = "latest" ; then
arv-keepdocker --pull arvados/jobs $tag
else
chmod +x /tmp/cwltest/arv-cwl-containers
env
-if [[ "$suite" = "conformance" ]] ; then
- exec ./run_test.sh RUNNER=/tmp/cwltest/arv-cwl-${runapi} EXTRA=--compute-checksum $@
-elif [[ "$suite" = "integration" ]] ; then
+if [[ "$suite" = "integration" ]] ; then
cd /usr/src/arvados/sdk/cwl/tests
exec ./arvados-tests.sh $@
+else
+ exec ./run_test.sh RUNNER=/tmp/cwltest/arv-cwl-${runapi} EXTRA=--compute-checksum $@
fi
EOF
from schema_salad.ref_resolver import Loader
from schema_salad.sourceline import cmap
-from .matcher import JsonDiffMatcher
+from .matcher import JsonDiffMatcher, StripYAMLComments
from .mock_discovery import get_rootDesc
if not os.getenv('ARVADOS_DEBUG'):
class TestContainer(unittest.TestCase):
def helper(self, runner, enable_reuse=True):
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
"basedir": "",
"make_fs_access": make_fs_access,
"loader": Loader({}),
- "metadata": {"cwlVersion": "v1.0"}})
+ "metadata": {"cwlVersion": "v1.1", "http://commonwl.org/cwltool#original_cwlVersion": "v1.0"}})
runtimeContext = arvados_cwl.context.ArvRuntimeContext(
{"work_api": "containers",
"basedir": "",
runner.api.collections().get().execute.return_value = {
"portable_data_hash": "99999999999999999999999999999993+99"}
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
tool = cmap({
"inputs": [],
self.assertFalse(api.collections().create.called)
self.assertFalse(runner.runtime_status_error.called)
- arvjob.collect_outputs.assert_called_with("keep:abc+123")
+ arvjob.collect_outputs.assert_called_with("keep:abc+123", 0)
arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
runner.add_intermediate_output.assert_called_with("zzzzz-4zz18-zzzzzzzzzzzzzz2")
"portable_data_hash": "99999999999999999999999999999994+99",
"manifest_text": ". 99999999999999999999999999999994+99 0:0:file1 0:0:file2"}
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
tool = cmap({
"inputs": [
runner.api.collections().get().execute.return_value = {
"portable_data_hash": "99999999999999999999999999999993+99"}
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
tool = cmap({"arguments": ["md5sum", "example.conf"],
"class": "CommandLineTool",
"class": "CommandLineTool",
"hints": [
{
- "class": "http://commonwl.org/cwltool#TimeLimit",
+ "class": "ToolTimeLimit",
"timelimit": 42
}
]
_, kwargs = runner.api.container_requests().create.call_args
self.assertEqual(42, kwargs['body']['scheduling_parameters'].get('max_run_time'))
+
+
+class TestWorkflow(unittest.TestCase):
+ def helper(self, runner, enable_reuse=True):
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
+
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+
+ document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=runner.api, fs_access=make_fs_access(""))
+ document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
+ document_loader.fetch_text = document_loader.fetcher.fetch_text
+ document_loader.check_exists = document_loader.fetcher.check_exists
+
+ loadingContext = arvados_cwl.context.ArvLoadingContext(
+ {"avsc_names": avsc_names,
+ "basedir": "",
+ "make_fs_access": make_fs_access,
+ "loader": document_loader,
+ "metadata": {"cwlVersion": "v1.1", "http://commonwl.org/cwltool#original_cwlVersion": "v1.0"},
+ "construct_tool_object": runner.arv_make_tool})
+ runtimeContext = arvados_cwl.context.ArvRuntimeContext(
+ {"work_api": "containers",
+ "basedir": "",
+ "name": "test_run_wf_"+str(enable_reuse),
+ "make_fs_access": make_fs_access,
+ "tmpdir": "/tmp",
+ "enable_reuse": enable_reuse,
+ "priority": 500})
+
+ return loadingContext, runtimeContext
+
+ # The test passes no builder.resources
+ # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
+ @mock.patch("arvados.collection.CollectionReader")
+ @mock.patch("arvados.collection.Collection")
+ @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
+ def test_run(self, list_images_in_arv, mockcollection, mockcollectionreader):
+ arv_docker_clear_cache()
+ arvados_cwl.add_arv_hints()
+
+ api = mock.MagicMock()
+ api._rootDesc = get_rootDesc()
+
+ runner = arvados_cwl.executor.ArvCwlExecutor(api)
+ self.assertEqual(runner.work_api, 'containers')
+
+ list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
+ runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
+ runner.api.collections().list().execute.return_value = {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+ "portable_data_hash": "99999999999999999999999999999993+99"}]}
+
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ runner.ignore_docker_for_reuse = False
+ runner.num_retries = 0
+ runner.secret_store = cwltool.secrets.SecretStore()
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runner.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
+
+ tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
+ metadata["cwlVersion"] = tool["cwlVersion"]
+
+ mockc = mock.MagicMock()
+ mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mockc, *args, **kwargs)
+ mockcollectionreader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "token.txt")
+
+ arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
+ arvtool.formatgraph = None
+ it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+
+ next(it).run(runtimeContext)
+ next(it).run(runtimeContext)
+
+ with open("tests/wf/scatter2_subwf.cwl") as f:
+ subwf = StripYAMLComments(f.read()).rstrip()
+
+ runner.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher({
+ "command": [
+ "cwltool",
+ "--no-container",
+ "--move-outputs",
+ "--preserve-entire-environment",
+ "workflow.cwl#main",
+ "cwl.input.yml"
+ ],
+ "container_image": "99999999999999999999999999999993+99",
+ "cwd": "/var/spool/cwl",
+ "environment": {
+ "HOME": "/var/spool/cwl",
+ "TMPDIR": "/tmp"
+ },
+ "mounts": {
+ "/keep/99999999999999999999999999999999+118": {
+ "kind": "collection",
+ "portable_data_hash": "99999999999999999999999999999999+118"
+ },
+ "/tmp": {
+ "capacity": 1073741824,
+ "kind": "tmp"
+ },
+ "/var/spool/cwl": {
+ "capacity": 1073741824,
+ "kind": "tmp"
+ },
+ "/var/spool/cwl/cwl.input.yml": {
+ "kind": "collection",
+ "path": "cwl.input.yml",
+ "portable_data_hash": "99999999999999999999999999999996+99"
+ },
+ "/var/spool/cwl/workflow.cwl": {
+ "kind": "collection",
+ "path": "workflow.cwl",
+ "portable_data_hash": "99999999999999999999999999999996+99"
+ },
+ "stdout": {
+ "kind": "file",
+ "path": "/var/spool/cwl/cwl.output.json"
+ }
+ },
+ "name": "scatterstep",
+ "output_name": "Output for step scatterstep",
+ "output_path": "/var/spool/cwl",
+ "output_ttl": 0,
+ "priority": 500,
+ "properties": {},
+ "runtime_constraints": {
+ "ram": 1073741824,
+ "vcpus": 1
+ },
+ "scheduling_parameters": {},
+ "secret_mounts": {},
+ "state": "Committed",
+ "use_existing": True
+ }))
+ mockc.open().__enter__().write.assert_has_calls([mock.call(subwf)])
+ mockc.open().__enter__().write.assert_has_calls([mock.call(
+'''{
+ "fileblub": {
+ "basename": "token.txt",
+ "class": "File",
+ "location": "/keep/99999999999999999999999999999999+118/token.txt",
+ "size": 0
+ },
+ "sleeptime": 5
+}''')])
+
+ # The test passes no builder.resources
+ # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
+ @mock.patch("arvados.collection.CollectionReader")
+ @mock.patch("arvados.collection.Collection")
+ @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
+ def test_overall_resource_singlecontainer(self, list_images_in_arv, mockcollection, mockcollectionreader):
+ arv_docker_clear_cache()
+ arvados_cwl.add_arv_hints()
+
+ api = mock.MagicMock()
+ api._rootDesc = get_rootDesc()
+
+ runner = arvados_cwl.executor.ArvCwlExecutor(api)
+ self.assertEqual(runner.work_api, 'containers')
+
+ list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
+ runner.api.collections().get().execute.return_value = {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+ "portable_data_hash": "99999999999999999999999999999993+99"}
+ runner.api.collections().list().execute.return_value = {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+ "portable_data_hash": "99999999999999999999999999999993+99"}]}
+
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ runner.ignore_docker_for_reuse = False
+ runner.num_retries = 0
+ runner.secret_store = cwltool.secrets.SecretStore()
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runner.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
+ loadingContext.do_update = True
+ tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
+
+ mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mock.MagicMock(), *args, **kwargs)
+
+ arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
+ arvtool.formatgraph = None
+ it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+
+ next(it).run(runtimeContext)
+ next(it).run(runtimeContext)
+
+ with open("tests/wf/echo-subwf.cwl") as f:
+ subwf = StripYAMLComments(f.read())
+
+ runner.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher({
+ 'output_ttl': 0,
+ 'environment': {'HOME': '/var/spool/cwl', 'TMPDIR': '/tmp'},
+ 'scheduling_parameters': {},
+ 'name': u'echo-subwf',
+ 'secret_mounts': {},
+ 'runtime_constraints': {'API': True, 'vcpus': 3, 'ram': 1073741824},
+ 'properties': {},
+ 'priority': 500,
+ 'mounts': {
+ '/var/spool/cwl/cwl.input.yml': {
+ 'portable_data_hash': '99999999999999999999999999999996+99',
+ 'kind': 'collection',
+ 'path': 'cwl.input.yml'
+ },
+ '/var/spool/cwl/workflow.cwl': {
+ 'portable_data_hash': '99999999999999999999999999999996+99',
+ 'kind': 'collection',
+ 'path': 'workflow.cwl'
+ },
+ 'stdout': {
+ 'path': '/var/spool/cwl/cwl.output.json',
+ 'kind': 'file'
+ },
+ '/tmp': {
+ 'kind': 'tmp',
+ 'capacity': 1073741824
+ }, '/var/spool/cwl': {
+ 'kind': 'tmp',
+ 'capacity': 3221225472
+ }
+ },
+ 'state': 'Committed',
+ 'output_path': '/var/spool/cwl',
+ 'container_image': '99999999999999999999999999999993+99',
+ 'command': [
+ u'cwltool',
+ u'--no-container',
+ u'--move-outputs',
+ u'--preserve-entire-environment',
+ u'workflow.cwl#main',
+ u'cwl.input.yml'
+ ],
+ 'use_existing': True,
+ 'output_name': u'Output for step echo-subwf',
+ 'cwd': '/var/spool/cwl'
+ }))
+
+ def test_default_work_api(self):
+ arvados_cwl.add_arv_hints()
+
+ api = mock.MagicMock()
+ api._rootDesc = copy.deepcopy(get_rootDesc())
+ del api._rootDesc.get('resources')['jobs']['methods']['create']
+ runner = arvados_cwl.executor.ArvCwlExecutor(api)
+ self.assertEqual(runner.work_api, 'containers')
import unittest
import copy
import io
+import argparse
import arvados
import arvados_cwl
from .mock_discovery import get_rootDesc
from .matcher import JsonDiffMatcher, StripYAMLComments
from .test_container import CollectionMock
+from arvados_cwl.arvdocker import arv_docker_clear_cache
if not os.getenv('ARVADOS_DEBUG'):
logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
class TestJob(unittest.TestCase):
def helper(self, runner, enable_reuse=True):
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
"basedir": "",
"make_fs_access": make_fs_access,
"loader": Loader({}),
- "metadata": {"cwlVersion": "v1.0"},
+ "metadata": {"cwlVersion": "v1.1", "http://commonwl.org/cwltool#original_cwlVersion": "v1.0"},
"makeTool": runner.arv_make_tool})
runtimeContext = arvados_cwl.context.ArvRuntimeContext(
{"work_api": "jobs",
@mock.patch('arvados.commands.keepdocker.list_images_in_arv')
def test_run(self, list_images_in_arv):
for enable_reuse in (True, False):
+ arv_docker_clear_cache()
runner = mock.MagicMock()
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
arvados_cwl.add_arv_hints()
list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
- runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
+ runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
tool = {
"inputs": [],
class TestWorkflow(unittest.TestCase):
def helper(self, runner, enable_reuse=True):
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
"basedir": "",
"make_fs_access": make_fs_access,
"loader": document_loader,
- "metadata": {"cwlVersion": "v1.0"},
+ "metadata": {"cwlVersion": "v1.1", "http://commonwl.org/cwltool#original_cwlVersion": "v1.0"},
"construct_tool_object": runner.arv_make_tool})
runtimeContext = arvados_cwl.context.ArvRuntimeContext(
{"work_api": "jobs",
@mock.patch("arvados.collection.Collection")
@mock.patch('arvados.commands.keepdocker.list_images_in_arv')
def test_run(self, list_images_in_arv, mockcollection, mockcollectionreader):
+ arv_docker_clear_cache()
arvados_cwl.add_arv_hints()
api = mock.MagicMock()
api._rootDesc = get_rootDesc()
- runner = arvados_cwl.executor.ArvCwlExecutor(api)
+ runner = arvados_cwl.executor.ArvCwlExecutor(api, argparse.Namespace(work_api="jobs",
+ output_name=None,
+ output_tags=None,
+ thread_count=1,
+ collection_cache_size=None))
self.assertEqual(runner.work_api, 'jobs')
list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
- runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
- runner.api.collections().list().execute.return_vaulue = {"items": [{"portable_data_hash": "99999999999999999999999999999993+99"}]}
+ runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
+ runner.api.collections().list().execute.return_value = {"items": [{
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+ "portable_data_hash": "99999999999999999999999999999993+99"}]}
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
runner.num_retries = 0
loadingContext, runtimeContext = self.helper(runner)
-
+ runner.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
metadata["cwlVersion"] = tool["cwlVersion"]
next(it).run(runtimeContext)
with open("tests/wf/scatter2_subwf.cwl") as f:
- subwf = StripYAMLComments(f.read())
+ subwf = StripYAMLComments(f.read().rstrip())
runner.api.jobs().create.assert_called_with(
body=JsonDiffMatcher({
@mock.patch("arvados.collection.Collection")
@mock.patch('arvados.commands.keepdocker.list_images_in_arv')
def test_overall_resource_singlecontainer(self, list_images_in_arv, mockcollection, mockcollectionreader):
+ arv_docker_clear_cache()
arvados_cwl.add_arv_hints()
api = mock.MagicMock()
api._rootDesc = get_rootDesc()
- runner = arvados_cwl.executor.ArvCwlExecutor(api)
+ runner = arvados_cwl.executor.ArvCwlExecutor(api, argparse.Namespace(work_api="jobs",
+ output_name=None,
+ output_tags=None,
+ thread_count=1,
+ collection_cache_size=None))
self.assertEqual(runner.work_api, 'jobs')
list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
- runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
- runner.api.collections().list().execute.return_vaulue = {"items": [{"portable_data_hash": "99999999999999999999999999999993+99"}]}
+ runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
+ runner.api.collections().list().execute.return_value = {"items": [{
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+ "portable_data_hash": "99999999999999999999999999999993+99"}]}
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
runner.num_retries = 0
loadingContext, runtimeContext = self.helper(runner)
-
+ loadingContext.do_update = True
+ runner.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
- metadata["cwlVersion"] = tool["cwlVersion"]
mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mock.MagicMock(), *args, **kwargs)
arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
arvtool.formatgraph = None
it = arvtool.job({}, mock.MagicMock(), runtimeContext)
-
+
next(it).run(runtimeContext)
next(it).run(runtimeContext)
stubs.api = mock.MagicMock()
stubs.api._rootDesc = get_rootDesc()
+ stubs.api._rootDesc["uuidPrefix"] = "zzzzz"
stubs.api.users().current().execute.return_value = {
"uuid": stubs.fake_user_uuid,
stubs.api.collections().create.assert_has_calls([
mock.call(body=JsonDiffMatcher({
'manifest_text':
- '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
+ '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'replication_desired': None,
- 'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
+ 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
}), ensure_unique_name=False),
mock.call(body=JsonDiffMatcher({
'manifest_text':
- '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
+ '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
'replication_desired': None,
- 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
+ 'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
}), ensure_unique_name=False),
mock.call(body=JsonDiffMatcher({
'manifest_text':
def test_submit_runner_ram(self, stubs, tm):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--debug", "--submit-runner-ram=2048",
+ "--api=jobs",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
exited = arvados_cwl.main(
["--submit", "--no-wait", "--debug", "--output-name", output_name,
+ "--api=jobs",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
def test_submit_pipeline_name(self, stubs, tm):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--debug", "--name=hello job 123",
+ "--api=jobs",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
exited = arvados_cwl.main(
["--submit", "--no-wait", "--debug", "--output-tags", output_tags,
+ "--api=jobs",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
exited = arvados_cwl.main(
["--submit", "--no-wait", "--debug",
"--project-uuid", project_uuid,
+ "--api=jobs",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
sys.stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
stubs.api.collections().create.assert_has_calls([
mock.call(body=JsonDiffMatcher({
'manifest_text':
- '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
+ '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'replication_desired': None,
- 'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
+ 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
}), ensure_unique_name=False),
mock.call(body=JsonDiffMatcher({
'manifest_text':
- '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
+ '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
'replication_desired': None,
- 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
- }), ensure_unique_name=False)])
+ 'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
+ }), ensure_unique_name=False),
+ ])
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
class TestCreateTemplate(unittest.TestCase):
- existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
+ existing_template_uuid = "zzzzz-p5p6p-validworkfloyml"
def _adjust_script_params(self, expect_component):
expect_component['script_parameters']['x'] = {
@stubs
def test_inputs_empty(self, stubs):
exited = arvados_cwl.main(
- ["--create-template",
+ ["--debug", "--api=jobs", "--create-template",
"tests/wf/inputs_test.cwl", "tests/order/empty_order.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
@stubs
def test_inputs(self, stubs):
exited = arvados_cwl.main(
- ["--create-template",
+ ["--api=jobs", "--create-template",
"tests/wf/inputs_test.cwl", "tests/order/inputs_test_order.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
"$graph": [
{
"class": "Workflow",
- "cwlVersion": "v1.0",
+ "cwlVersion": "v1.1",
"hints": [],
"id": "#main",
"inputs": [
"run": {
"baseCommand": "sleep",
"class": "CommandLineTool",
- "id": "#main/sleep1/subtool",
+ "id": "#main/sleep1/run/subtool",
"inputs": [
{
- "id": "#main/sleep1/subtool/sleeptime",
+ "id": "#main/sleep1/run/subtool/sleeptime",
"inputBinding": {
"position": 1
},
],
"outputs": [
{
- "id": "#main/sleep1/subtool/out",
+ "id": "#main/sleep1/run/subtool/out",
"outputBinding": {
"outputEval": "out"
},
]
}
],
- "cwlVersion": "v1.0"
-}
\ No newline at end of file
+ "cwlVersion": "v1.1"
+}
# (This dockerfile file must be located in the arvados/sdk/ directory because
# of the docker build root.)
-FROM debian:jessie
+FROM debian:9
MAINTAINER Ward Vandewege <ward@curoverse.com>
ENV DEBIAN_FRONTEND noninteractive
ARG pythoncmd=python
+ARG pipcmd=pip
RUN apt-get update -q && apt-get install -qy --no-install-recommends \
git ${pythoncmd}-pip ${pythoncmd}-virtualenv ${pythoncmd}-dev libcurl4-gnutls-dev \
libgnutls28-dev nodejs ${pythoncmd}-pyasn1-modules build-essential
-RUN if [ "$pythoncmd" = "python3" ]; then \
- pip3 install -U setuptools six requests ; \
- else \
- pip install -U setuptools six requests ; \
- fi
+RUN $pipcmd install -U setuptools six requests
ARG sdk
ARG runner
ADD cwl/cwltool_dist/$cwltool /tmp/
ADD cwl/dist/$runner /tmp/
-RUN cd /tmp/arvados-python-client-* && $pythoncmd setup.py install
-RUN if test -d /tmp/schema-salad-* ; then cd /tmp/schema-salad-* && $pythoncmd setup.py install ; fi
-RUN if test -d /tmp/cwltool-* ; then cd /tmp/cwltool-* && $pythoncmd setup.py install ; fi
-RUN cd /tmp/arvados-cwl-runner-* && $pythoncmd setup.py install
+RUN cd /tmp/arvados-python-client-* && $pipcmd install .
+RUN if test -d /tmp/schema-salad-* ; then cd /tmp/schema-salad-* && $pipcmd install . ; fi
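+# networkx is pinned to 2.2 below, presumably because later releases
+# dropped Python 2 support and this image may still be built with python2.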
+RUN if test -d /tmp/cwltool-* ; then cd /tmp/cwltool-* && $pipcmd install networkx==2.2 && $pipcmd install . ; fi
+RUN cd /tmp/arvados-cwl-runner-* && $pipcmd install .
# Install dependencies and set up system.
RUN /usr/sbin/adduser --disabled-password \
before_action :require_auth_scope, except: ERROR_ACTIONS
before_action :catch_redirect_hint
+ before_action :load_required_parameters
before_action(:find_object_by_uuid,
except: [:index, :create] + ERROR_ACTIONS)
- before_action :load_required_parameters
before_action :load_limit_offset_order_params, only: [:index, :contents]
before_action :load_where_param, only: [:index, :contents]
before_action :load_filters_param, only: [:index, :contents]
protected
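+ # Interpret the named request parameter as a boolean: return it when it
+ # arrives as a real boolean; otherwise log a warning and return false
+ # (missing parameters also yield false).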
+ def bool_param(pname)
+ if params.include?(pname)
+ if params[pname].is_a?(Boolean)
+ return params[pname]
+ else
+ logger.warn "Warning: received non-boolean parameter '#{pname}' on #{self.class.inspect}."
+ end
+ end
+ false
+ end
+
def send_error(*args)
if args.last.is_a? Hash
err = args.pop
def find_objects_for_index
@objects ||= model_class.readable_by(*@read_users, {
- :include_trash => (params[:include_trash] || 'untrash' == action_name),
- :include_old_versions => params[:include_old_versions]
+ :include_trash => (bool_param(:include_trash) || 'untrash' == action_name),
+ :include_old_versions => bool_param(:include_old_versions)
})
apply_where_limit_order_params
end
})
end
+ def self._show_requires_parameters
+ (super rescue {}).
+ merge({
+ include_trash: {
+ type: 'boolean', required: false, description: "Show collection even if its is_trashed attribute is true."
+ },
+ include_old_versions: {
+ type: 'boolean', required: false, description: "Include past collection versions."
+ },
+ })
+ end
+
def create
if resource_attrs[:uuid] and (loc = Keep::Locator.parse(resource_attrs[:uuid]))
resource_attrs[:portable_data_hash] = loc.to_s
accept_attribute_as_json :filters, Array
accept_attribute_as_json :scheduling_parameters, Hash
accept_attribute_as_json :secret_mounts, Hash
+
+ def self._index_requires_parameters
+ (super rescue {}).
+ merge({
+ include_trash: {
+ type: 'boolean', required: false, description: "Include container requests whose owner project is trashed."
+ },
+ })
+ end
+
+ def self._show_requires_parameters
+ (super rescue {}).
+ merge({
+ include_trash: {
+ type: 'boolean', required: false, description: "Show container request even if its owner project is trashed."
+ },
+ })
+ end
end
})
end
+ def self._show_requires_parameters
+ (super rescue {}).
+ merge({
+ include_trash: {
+ type: 'boolean', required: false, description: "Show group/project even if its is_trashed attribute is true."
+ },
+ })
+ end
+
def self._contents_requires_parameters
params = _index_requires_parameters.
merge({
end
end
- test 'get trashed collection with include_trash' do
- uuid = 'zzzzz-4zz18-mto52zx1s7sn3ih' # expired_collection
- authorize_with :active
- get :show, params: {
- id: uuid,
- include_trash: true,
- }
- assert_response 200
+ [true, false].each do |include_trash|
+ test "get trashed collection with include_trash=#{include_trash}" do
+ uuid = 'zzzzz-4zz18-mto52zx1s7sn3ih' # expired_collection
+ authorize_with :active
+ get :show, params: {
+ id: uuid,
+ include_trash: include_trash,
+ }
+ if include_trash
+ assert_response 200
+ else
+ assert_response 404
+ end
+ end
end
[:admin, :active].each do |user|
end
end
+ test "show include_trash=false #{project} #{untrash} as #{auth}" do
+ authorize_with auth
+ untrash.each do |pr|
+ Group.find_by_uuid(groups(pr).uuid).update! is_trashed: false
+ end
+ get :show, params: {
+ id: groups(project).uuid,
+ format: :json,
+ include_trash: false
+ }
+ if visible
+ assert_response :success
+ else
+ assert_response 404
+ end
+ end
+
test "show include_trash #{project} #{untrash} as #{auth}" do
authorize_with auth
untrash.each do |pr|
end
end
+ [
+ ["false", false],
+ ["0", false],
+ ["true", true],
+ ["1", true]
+ ].each do |param, truthiness|
+ test "include_trash=#{param.inspect} param JSON-encoded should be interpreted as include_trash=#{truthiness}" do
+ expired_col = collections(:expired_collection)
+ assert expired_col.is_trashed
+ # Try #index first
+ post "/arvados/v1/collections",
+ params: {
+ :_method => 'GET',
+ :include_trash => param,
+ :filters => [['uuid', '=', expired_col.uuid]].to_json
+ },
+ headers: auth(:active)
+ assert_response :success
+ assert_not_nil json_response['items']
+ assert_equal truthiness, json_response['items'].collect {|c| c['uuid']}.include?(expired_col.uuid)
+ # Try #show next
+ post "/arvados/v1/collections/#{expired_col.uuid}",
+ params: {
+ :_method => 'GET',
+ :include_trash => param,
+ },
+ headers: auth(:active)
+ if truthiness
+ assert_response :success
+ else
+ assert_response 404
+ end
+ end
+ end
+
+ [
+ ["false", false],
+ ["0", false],
+ ["true", true],
+ ["1", true]
+ ].each do |param, truthiness|
+ test "include_trash=#{param.inspect} param encoding via query string should be interpreted as include_trash=#{truthiness}" do
+ expired_col = collections(:expired_collection)
+ assert expired_col.is_trashed
+ # Try #index first
+ get("/arvados/v1/collections?include_trash=#{param}&filters=#{[['uuid','=',expired_col.uuid]].to_json}",
+ headers: auth(:active))
+ assert_response :success
+ assert_not_nil json_response['items']
+ assert_equal truthiness, json_response['items'].collect {|c| c['uuid']}.include?(expired_col.uuid)
+ # Try #show next
+ get("/arvados/v1/collections/#{expired_col.uuid}?include_trash=#{param}",
+ headers: auth(:active))
+ if truthiness
+ assert_response :success
+ else
+ assert_response 404
+ end
+ end
+ end
+
+ [
+ ["false", false],
+ ["0", false],
+ ["true", true],
+ ["1", true]
+ ].each do |param, truthiness|
+ test "include_trash=#{param.inspect} form-encoded param should be interpreted as include_trash=#{truthiness}" do
+ expired_col = collections(:expired_collection)
+ assert expired_col.is_trashed
+ params = [
+ ['_method', 'GET'],
+ ['include_trash', param],
+ ['filters', [['uuid','=',expired_col.uuid]].to_json],
+ ]
+ # Try #index first
+ post "/arvados/v1/collections",
+ params: URI.encode_www_form(params),
+ headers: {
+ "Content-type" => "application/x-www-form-urlencoded"
+ }.update(auth(:active))
+ assert_response :success
+ assert_not_nil json_response['items']
+ assert_equal truthiness, json_response['items'].collect {|c| c['uuid']}.include?(expired_col.uuid)
+ # Try #show next
+ post "/arvados/v1/collections/#{expired_col.uuid}",
+ params: URI.encode_www_form([['_method', 'GET'],['include_trash', param]]),
+ headers: {
+ "Content-type" => "application/x-www-form-urlencoded"
+ }.update(auth(:active))
+ if truthiness
+ assert_response :success
+ else
+ assert_response 404
+ end
+ end
+ end
+
test "search collection using full text search" do
# create collection to be searched for
signed_manifest = Collection.sign_manifest(". 85877ca2d7e05498dd3d109baf2df106+95+A3a4e26a366ee7e4ed3e476ccf05354761be2e4ae@545a9920 0:95:file_in_subdir1\n./subdir2/subdir3 2bbc341c702df4d8f42ec31f16c10120+64+A315d7e7bad2ce937e711fc454fae2d1194d14d64@545a9920 0:32:file1_in_subdir3.txt 32:32:file2_in_subdir3.txt\n./subdir2/subdir3/subdir4 2bbc341c702df4d8f42ec31f16c10120+64+A315d7e7bad2ce937e711fc454fae2d1194d14d64@545a9920 0:32:file3_in_subdir4.txt 32:32:file4_in_subdir4.txt\n", api_token(:active))