*/script/rails
sdk/cwl/tests/input/blorp.txt
sdk/cwl/tests/tool/blub.txt
+sdk/cwl/tests/19109-upload-secondary/*
sdk/cwl/tests/federation/data/*
sdk/cwl/tests/fake-keep-mount/fake_collection_dir/.arvados#collection
sdk/go/manifest/testdata/*_manifest
The compute image "build script":{{site.baseurl}}/install/crunch2-cloud/install-compute-node.html now installs Singularity 3.9.9 instead of 3.7.4. The newer version includes a bugfix that should resolve "intermittent loopback device errors":https://dev.arvados.org/issues/18489 when running containers.
+h3. Changes to @arvados-cwl-runner --create-workflow@ and @--update-workflow@
+
+When using @arvados-cwl-runner --create-workflow@ or @--update-workflow@, all collection and Docker image dependencies are now copied into the target project by default. Running a workflow retains the old behavior (dependencies are used wherever they are found). This behavior can be controlled explicitly with @--copy-deps@ and @--no-copy-deps@.
+
h2(#v2_4_0). v2.4.0 (2022-04-08)
"previous: Upgrading to 2.3.1":#v2_3_1
The @--match-submitter-images@ option will check the id of the image in the local Docker instance and compare it to the id of the image already in Arvados with the same name and tag. If they differ, it will choose the image matching the local image id, uploading it if necessary. This is helpful for development: if you locally rebuild the image with the 'latest' tag, @--match-submitter-images@ ensures that the newer version is used.
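The matching step described above can be sketched as a small filter over the image listing. This is an illustrative paraphrase of the behavior, not the actual SDK code; the function name @match_local_image@ is mine, and the @(collection_uuid, metadata)@ pair shape follows the @arv-keepdocker@ listing format used elsewhere in this change:

```python
def match_local_image(images, local_image_id):
    """Return the subset of `images` whose Docker hash matches the local image.

    `images` is a list of (collection_uuid, metadata) pairs from an
    arv-keepdocker listing. An empty result means no stored image matches,
    so the local image must be (re-)uploaded.
    """
    for entry in images:
        if entry[1]["dockerhash"] == local_image_id:
            # Keep only the matching entry, mirroring `images = [i]; break`.
            return [entry]
    return []  # no match: force re-upload
```

With this shape, @--match-submitter-images@ simply narrows the candidate list to the entry matching the locally built image before deciding whether an upload is needed.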
+h3(#dependencies). Dependencies
+
+Dependencies include collections and Docker images referenced by the workflow. Dependencies are automatically uploaded by @arvados-cwl-runner@ if they are not present or need to be updated. When running a workflow, dependencies that already exist somewhere on the Arvados instance (from a previous upload) will not be uploaded or copied, regardless of the project they are located in. Sometimes this creates problems when sharing a workflow run with others. In this case, use @--copy-deps@ to indicate that you want all dependencies to be copied into the destination project (specified by @--project-uuid@).
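The copy step behind @--copy-deps@ reduces to set bookkeeping over portable data hashes: collect the keep references used by the workflow, subtract those already owned by the destination project, and copy the remainder. A minimal sketch (the function name @collections_to_copy@ is illustrative, not part of the SDK):

```python
def collections_to_copy(keeprefs, already_present):
    """Compute which collections still need copying into the target project.

    `keeprefs` is a set of portable data hashes referenced by the workflow;
    `already_present` is the listing of collections already owned by the
    destination project (each item carries its portable_data_hash).
    """
    present = set(item["portable_data_hash"] for item in already_present)
    return keeprefs - present
```

Each remaining hash is then looked up and re-created as a new collection record owned by the destination project.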
+
h3. Command line options
See "arvados-cwl-runner options":{{site.baseurl}}/user/cwl/cwl-run-options.html
h2(#registering). Registering a workflow to use in Workbench
-Use @--create-workflow@ to register a CWL workflow with Arvados. This enables you to share workflows with other Arvados users, and run them by clicking the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a process...</span> button on the Workbench Dashboard and on the command line by UUID.
+Use @--create-workflow@ to register a CWL workflow with Arvados. Use @--project-uuid@ to upload the workflow to a specific project, along with its dependencies. You can share the workflow with other Arvados users by sharing that project. You can run the workflow by clicking the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a process...</span> button on the Workbench Dashboard, and on the command line by UUID.
<notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-workflow bwa-mem.cwl</span>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --project-uuid zzzzz-j7d0g-p32bi47ogkjke11 --create-workflow bwa-mem.cwl</span>
arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Upload local files: "bwa-mem.cwl"
2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Uploaded to zzzzz-4zz18-7e0hedrmkuyoei3
You can provide a partial input file to set default values for the workflow input parameters. You can also use the @--name@ option to set the name of the workflow:
<notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --name "My workflow with defaults" --create-workflow bwa-mem.cwl bwa-mem-template.yml</span>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --name "My workflow with defaults" --project-uuid zzzzz-j7d0g-p32bi47ogkjke11 --create-workflow bwa-mem.cwl bwa-mem-template.yml</span>
arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Upload local files: "bwa-mem.cwl"
2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Uploaded to zzzzz-4zz18-0f91qkovk4ml18o
</code></pre>
</notextile>
+Use @--update-workflow <uuid>@ to update an existing workflow.
+
+<notextile>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --update-workflow zzzzz-p5p6p-zuniv58hn8d0qd8 bwa-mem.cwl</span>
+arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
+2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Upload local files: "bwa-mem.cwl"
+2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Uploaded to zzzzz-4zz18-7e0hedrmkuyoei3
+2016-07-01 12:21:01 arvados.cwl-runner[15796] INFO: Created template zzzzz-p5p6p-zuniv58hn8d0qd8
+zzzzz-p5p6p-zuniv58hn8d0qd8
+</code></pre>
+</notextile>
+
+When using @--create-workflow@ or @--update-workflow@, the @--copy-deps@ and @--match-submitter-images@ options are enabled by default.
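The option defaults described above can be resolved with a small helper; this is a hedged sketch of the decision (names are illustrative, the real logic lives in the runtime-context setup), where an unset @--copy-deps@ is represented as @None@:

```python
def resolve_defaults(copy_deps, match_submitter_images,
                     create_workflow, update_workflow):
    """Apply the registration-time defaults for --copy-deps and
    --match-submitter-images. copy_deps is None when the user passed
    neither --copy-deps nor --no-copy-deps."""
    if copy_deps is None and (create_workflow or update_workflow):
        # Registering a workflow: copy dependencies and keep Docker
        # images in sync with the submitter by default.
        copy_deps = True
        match_submitter_images = True
    return copy_deps, match_submitter_images
```

An explicit @--no-copy-deps@ (i.e. @copy_deps=False@) suppresses both defaults, matching the opt-out behavior described above.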
+
h3. Running registered workflows at the command line
You can run a registered workflow at the command line by its UUID:
exgroup.add_argument("--enable-preemptible", dest="enable_preemptible", default=None, action="store_true", help="Use preemptible instances. Control individual steps with arv:UsePreemptible hint.")
exgroup.add_argument("--disable-preemptible", dest="enable_preemptible", default=None, action="store_false", help="Don't use preemptible instances.")
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--copy-deps", dest="copy_deps", default=None, action="store_true", help="Copy dependencies into the destination project.")
+ exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false", help="Leave dependencies where they are.")
+
parser.add_argument(
"--skip-schemas",
action="store_true",
logger_handler=arvados.log_handler,
custom_schema_callback=add_arv_hints,
loadingContext=executor.loadingContext,
- runtimeContext=executor.runtimeContext,
+ runtimeContext=executor.toplevel_runtimeContext,
input_required=not (arvargs.create_workflow or arvargs.update_workflow))
runtimeContext.project_uuid,
runtimeContext.force_docker_pull,
runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker)
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
network_req, _ = self.get_requirement("NetworkAccess")
if network_req:
"cwd": "/var/spool/cwl",
"priority": self.priority,
"state": "Committed",
- "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
+ "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
"mounts": {
"/var/lib/cwl/cwl.input.json": {
"kind": "json",
"portable_data_hash": "%s" % workflowcollection
}
else:
- packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
+ packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext)
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
- if self.on_error:
+ if runtimeContext.on_error:
- command.append("--on-error=" + self.on_error)
+ command.append("--on-error=" + runtimeContext.on_error)
- if self.intermediate_output_ttl:
- command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)
+ if runtimeContext.intermediate_output_ttl:
+ command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
- if self.arvrunner.trash_intermediate:
+ if runtimeContext.trash_intermediate:
command.append("--trash-intermediate")
- if self.arvrunner.project_uuid:
- command.append("--project-uuid="+self.arvrunner.project_uuid)
+ if runtimeContext.project_uuid:
+ command.append("--project-uuid="+runtimeContext.project_uuid)
if self.enable_dev:
command.append("--enable-dev")
def run(self, runtimeContext):
runtimeContext.keepprefix = "keep:"
job_spec = self.arvados_job_spec(runtimeContext)
- if self.arvrunner.project_uuid:
- job_spec["owner_uuid"] = self.arvrunner.project_uuid
+ if runtimeContext.project_uuid:
+ job_spec["owner_uuid"] = runtimeContext.project_uuid
extra_submit_params = {}
if runtimeContext.submit_runner_cluster:
def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid,
- force_pull, tmp_outdir_prefix, match_local_docker):
+ force_pull, tmp_outdir_prefix, match_local_docker, copy_deps):
"""Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
if "http://arvados.org/cwl#dockerCollectionPDH" in dockerRequirement:
image_name = sp[0]
image_tag = sp[1] if len(sp) > 1 else "latest"
- images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+ out_of_project_images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
image_name=image_name,
- image_tag=image_tag)
+ image_tag=image_tag,
+ project_uuid=None)
- if images and match_local_docker:
+ if copy_deps:
+ # Only images that are available in the destination project
+ images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+ image_name=image_name,
+ image_tag=image_tag,
+ project_uuid=project_uuid)
+ else:
+ images = out_of_project_images
+
+ if match_local_docker:
local_image_id = determine_image_id(dockerRequirement["dockerImageId"])
if local_image_id:
# find it in the list
# force re-upload.
images = []
+ found = False
+ for i in out_of_project_images:
+ if i[1]["dockerhash"] == local_image_id:
+ found = True
+ out_of_project_images = [i]
+ break
+ if not found:
+ # force re-upload.
+ out_of_project_images = []
+
if not images:
- # Fetch Docker image if necessary.
- try:
- result = cwltool.docker.DockerCommandLineJob.get_image(dockerRequirement, pull_image,
- force_pull, tmp_outdir_prefix)
- if not result:
- raise WorkflowException("Docker image '%s' not available" % dockerRequirement["dockerImageId"])
- except OSError as e:
- raise WorkflowException("While trying to get Docker image '%s', failed to execute 'docker': %s" % (dockerRequirement["dockerImageId"], e))
+ if not out_of_project_images:
+ # Fetch Docker image if necessary.
+ try:
+ result = cwltool.docker.DockerCommandLineJob.get_image(dockerRequirement, pull_image,
+ force_pull, tmp_outdir_prefix)
+ if not result:
+ raise WorkflowException("Docker image '%s' not available" % dockerRequirement["dockerImageId"])
+ except OSError as e:
+ raise WorkflowException("While trying to get Docker image '%s', failed to execute 'docker': %s" % (dockerRequirement["dockerImageId"], e))
# Upload image to Arvados
args = []
images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
image_name=image_name,
- image_tag=image_tag)
+ image_tag=image_tag,
+ project_uuid=project_uuid)
if not images:
raise WorkflowException("Could not find Docker image %s:%s" % (image_name, image_tag))
max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
sum_res_pars = ("outdirMin", "outdirMax")
-def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
+def upload_workflow(arvRunner, tool, job_order, project_uuid,
+ runtimeContext, uuid=None,
submit_runner_ram=0, name=None, merged_map=None,
submit_runner_image=None):
- packed = packed_workflow(arvRunner, tool, merged_map)
+ packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext)
adjustDirObjs(job_order, trim_listing)
adjustFileObjs(job_order, trim_anonymous_location)
name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
upload_dependencies(arvRunner, name, tool.doc_loader,
- packed, tool.tool["id"], False)
+ packed, tool.tool["id"], False,
+ runtimeContext)
wf_runner_resources = None
wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
hints.append(wf_runner_resources)
- wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner, submit_runner_image or "arvados/jobs:"+__version__)
+ wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
+ submit_runner_image or "arvados/jobs:"+__version__,
+ runtimeContext)
if submit_runner_ram:
wf_runner_resources["ramMin"] = submit_runner_ram
self.doc_loader,
joborder,
joborder.get("id", "#"),
- False)
+ False,
+ runtimeContext)
if self.wf_pdh is None:
packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)
self.doc_loader,
packed,
self.tool["id"],
- False)
+ False,
+ runtimeContext)
# Discover files/directories referenced by the
# workflow (mainly "default" values)
if self.wf_pdh is None:
adjustFileObjs(packed, keepmount)
adjustDirObjs(packed, keepmount)
- self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
+ self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)
self.loadingContext = self.loadingContext.copy()
self.loadingContext.metadata = self.loadingContext.metadata.copy()
self.collection_cache_size = 256
self.match_local_docker = False
self.enable_preemptible = None
+ self.copy_deps = None
super(ArvRuntimeContext, self).__init__(kwargs)
handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
root_logger.addHandler(handler)
- self.runtimeContext = ArvRuntimeContext(vars(arvargs))
- self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
+ self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
+ self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
collection_cache=self.collection_cache)
- validate_cluster_target(self, self.runtimeContext)
+ validate_cluster_target(self, self.toplevel_runtimeContext)
def arv_make_tool(self, toolpath_object, loadingContext):
updated_tool.visit(self.check_features)
- self.project_uuid = runtimeContext.project_uuid
self.pipeline = None
self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
self.secret_store = runtimeContext.secret_store
if runtimeContext.submit_request_uuid and self.work_api != "containers":
raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
+ runtimeContext = runtimeContext.copy()
+
default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
if runtimeContext.storage_classes == "default":
runtimeContext.storage_classes = default_storage_classes
if not runtimeContext.name:
runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
+ if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
+ # When creating or updating workflow record, by default
+ # always copy dependencies and ensure Docker images are up
+ # to date.
+ runtimeContext.copy_deps = True
+ runtimeContext.match_local_docker = True
+
+ if runtimeContext.update_workflow and self.project_uuid is None:
+ # If we are updating a workflow, make sure anything that
+ # gets uploaded goes into the same parent project, unless
+ # an alternate --project-uuid was provided.
+ existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
+ runtimeContext.project_uuid = existing_wf["owner_uuid"]
+
+ self.project_uuid = runtimeContext.project_uuid
+
# Upload local file references in the job order.
job_order = upload_job_order(self, "%s input" % runtimeContext.name,
- updated_tool, job_order)
+ updated_tool, job_order, runtimeContext)
# the last clause means: if it is a command line tool, and we
# are going to wait for the result, and always_submit_runner
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
- merged_map = upload_workflow_deps(self, tool)
+ merged_map = upload_workflow_deps(self, tool, runtimeContext)
# Recreate process object (ArvadosWorkflow or
# ArvadosCommandTool) because tool document may have been
loadingContext.metadata = tool.metadata
tool = load_tool(tool.tool, loadingContext)
- existing_uuid = runtimeContext.update_workflow
- if existing_uuid or runtimeContext.create_workflow:
+ if runtimeContext.update_workflow or runtimeContext.create_workflow:
# Create a pipeline template or workflow record and exit.
if self.work_api == "containers":
uuid = upload_workflow(self, tool, job_order,
- self.project_uuid,
- uuid=existing_uuid,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- merged_map=merged_map,
- submit_runner_image=runtimeContext.submit_runner_image)
+ runtimeContext.project_uuid,
+ runtimeContext,
+ uuid=runtimeContext.update_workflow,
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
+ merged_map=merged_map,
+ submit_runner_image=runtimeContext.submit_runner_image)
self.stdout.write(uuid + "\n")
return (None, "success")
self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
self.eval_timeout = runtimeContext.eval_timeout
- runtimeContext = runtimeContext.copy()
runtimeContext.use_container = True
runtimeContext.tmpdir_prefix = "tmp"
runtimeContext.work_api = self.work_api
import schema_salad.validate as validate
import arvados.collection
+import arvados.util
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq
set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
return
+ if inputschema == "File":
+ inputschema = {"type": "File"}
+
if isinstance(inputschema, basestring):
sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
if sd:
set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
elif (inputschema["type"] == "File" and
- secondaryspec and
isinstance(primary, Mapping) and
- primary.get("class") == "File" and
- "secondaryFiles" not in primary):
+ primary.get("class") == "File"):
+
+ if "secondaryFiles" in primary or not secondaryspec:
+ # Nothing to do.
+ return
+
#
# Found a file, check for secondaryFiles
#
primary["secondaryFiles"] = secondaryspec
for i, sf in enumerate(aslist(secondaryspec)):
if builder.cwlVersion == "v1.0":
- pattern = builder.do_eval(sf, context=primary)
+ pattern = sf
else:
- pattern = builder.do_eval(sf["pattern"], context=primary)
+ pattern = sf["pattern"]
if pattern is None:
continue
if isinstance(pattern, list):
"Expression must return list, object, string or null")
if pattern is not None:
- sfpath = substitute(primary["location"], pattern)
+ if "${" in pattern or "$(" in pattern:
+ sfname = builder.do_eval(pattern, context=primary)
+ else:
+ sfname = substitute(primary["basename"], pattern)
+
+ if sfname is None:
+ continue
+
+ p_location = primary["location"]
+ if "/" in p_location:
+ sfpath = (
+ p_location[0 : p_location.rindex("/") + 1]
+ + sfname
+ )
required = builder.do_eval(required, context=primary)
set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
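When the pattern is not a CWL expression, the change above substitutes it against the primary file's basename. The substitution follows cwltool's substitute helper; a rough paraphrase (my sketch, not the actual import), where each leading "^" strips one extension before appending:

```python
def substitute(value, replace):
    """Apply a CWL secondaryFiles suffix pattern to a basename.

    Each leading '^' in the pattern removes one extension from `value`
    before the rest of the pattern is appended; otherwise the pattern is
    appended as-is.
    """
    if replace.startswith("^"):
        try:
            return substitute(value[:value.rindex(".")], replace[1:])
        except ValueError:
            # No extension left to strip; append the remainder as-is.
            return value + replace.lstrip("^")
    return value + replace
```

So a pattern of @.tbi@ yields @file1.txt.tbi@, while @^.tbi@ yields @file1.tbi@.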
def upload_dependencies(arvrunner, name, document_loader,
- workflowobj, uri, loadref_run,
+ workflowobj, uri, loadref_run, runtimeContext,
include_primary=True, discovered_secondaryfiles=None):
"""Upload the dependencies of the workflowobj document to Keep.
single_collection=True,
optional_deps=optional_deps)
+ keeprefs = set()
+ def addkeepref(k):
+ if k.startswith("keep:"):
+ keeprefs.add(collection_pdh_pattern.match(k).group(1))
+
def setloc(p):
loc = p.get("location")
if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
p["location"] = mapper.mapper(p["location"]).resolved
+ addkeepref(p["location"])
return
if not loc:
gp = collection_uuid_pattern.match(loc)
if not gp:
+ # Not a uuid pattern (must be a pdh pattern)
+ addkeepref(p["location"])
return
+
uuid = gp.groups()[0]
if uuid not in uuid_map:
raise SourceLine(p, "location", validate.ValidationException).makeError(
for d in discovered:
discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
+ if runtimeContext.copy_deps:
+ # Find referenced collections and copy them into the
+ # destination project, for easy sharing.
+ already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
+ filters=[["portable_data_hash", "in", list(keeprefs)],
+ ["owner_uuid", "=", runtimeContext.project_uuid]],
+ select=["uuid", "portable_data_hash", "created_at"]))
+
+ keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
+ for kr in keeprefs:
+ col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
+ order="created_at desc",
+ select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
+ limit=1).execute()
+ if len(col["items"]) == 0:
+ logger.warning("Cannot find collection with portable data hash %s", kr)
+ continue
+ col = col["items"][0]
+ try:
+ arvrunner.api.collections().create(body={"collection": {
+ "owner_uuid": runtimeContext.project_uuid,
+ "name": col["name"],
+ "description": col["description"],
+ "properties": col["properties"],
+ "portable_data_hash": col["portable_data_hash"],
+ "manifest_text": col["manifest_text"],
+ "storage_classes_desired": col["storage_classes_desired"],
+ "trash_at": col["trash_at"]
+ }}, ensure_unique_name=True).execute()
+ except Exception as e:
+ logger.warning("Unable to copy collection to destination: %s", e)
+
if "$schemas" in workflowobj:
sch = CommentedSeq()
for s in workflowobj["$schemas"]:
return mapper
-def upload_docker(arvrunner, tool):
+def upload_docker(arvrunner, tool, runtimeContext):
"""Uploads Docker images used in CommandLineTool objects."""
if isinstance(tool, CommandLineTool):
(docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
if docker_req:
if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
- # TODO: can be supported by containers API, but not jobs API.
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+
+ arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
else:
arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
- True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
- upload_docker(arvrunner, s.embedded_tool)
+ upload_docker(arvrunner, s.embedded_tool, runtimeContext)
-def packed_workflow(arvrunner, tool, merged_map):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
"""Create a packed workflow.
A "packed" workflow is one where all the components have been combined into a single document."""
v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
if v.get("class") == "DockerRequirement":
v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
- arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
for l in v:
visit(v[l], cur_id)
if isinstance(v, list):
packed["http://schema.org/version"] = githash
-def upload_job_order(arvrunner, name, tool, job_order):
+def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
"""Upload local files referenced in the input object and return updated input
object with 'location' updated to the proper keep references.
"""
tool.doc_loader,
job_order,
job_order.get("id", "#"),
- False)
+ False,
+ runtimeContext)
if "id" in job_order:
del job_order["id"]
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
-def upload_workflow_deps(arvrunner, tool):
+def upload_workflow_deps(arvrunner, tool, runtimeContext):
# Ensure that Docker images needed by this workflow are available
- upload_docker(arvrunner, tool)
+ upload_docker(arvrunner, tool, runtimeContext)
document_loader = tool.doc_loader
deptool,
deptool["id"],
False,
+ runtimeContext,
include_primary=False,
discovered_secondaryfiles=discovered_secondaryfiles)
document_loader.idx[deptool["id"]] = deptool
return merged_map
-def arvados_jobs_image(arvrunner, img):
+def arvados_jobs_image(arvrunner, img, runtimeContext):
"""Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
try:
- return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
+ True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
except Exception as e:
raise Exception("Docker image %s is not available\n%s" % (img, e) )
-def upload_workflow_collection(arvrunner, name, packed):
+def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
collection = arvados.collection.Collection(api_client=arvrunner.api,
keep_client=arvrunner.keep_client,
num_retries=arvrunner.num_retries)
filters = [["portable_data_hash", "=", collection.portable_data_hash()],
["name", "like", name+"%"]]
- if arvrunner.project_uuid:
- filters.append(["owner_uuid", "=", arvrunner.project_uuid])
+ if runtimeContext.project_uuid:
+ filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
if exists["items"]:
logger.info("Using collection %s", exists["items"][0]["uuid"])
else:
collection.save_new(name=name,
- owner_uuid=arvrunner.project_uuid,
+ owner_uuid=runtimeContext.project_uuid,
ensure_unique_name=True,
num_retries=arvrunner.num_retries)
logger.info("Uploaded to %s", collection.manifest_locator())
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.2
+class: CommandLineTool
+baseCommand: echo
+inputs:
+ message:
+ type: File
+ inputBinding:
+ position: 1
+ default:
+ class: File
+ location: keep:d7514270f356df848477718d58308cc4+94/b
+
+outputs: []
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.2
+class: Workflow
+
+requirements:
+ InlineJavascriptRequirement: {}
+
+inputs:
+ file1:
+ type: File?
+ secondaryFiles:
+ - pattern: .tbi
+ required: true
+ file2:
+ type: File
+ secondaryFiles:
+ - pattern: |
+ ${
+ return self.basename + '.tbi';
+ }
+ required: true
+outputs:
+ out:
+ type: File
+ outputSource: cat/out
+ out2:
+ type: File
+ outputSource: cat2/out
+steps:
+ cat:
+ in:
+ inp: file1
+ run: cat2.cwl
+ out: [out]
+ cat2:
+ in:
+ inp: file2
+ run: cat2.cwl
+ out: [out]
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+file1:
+ class: File
+ location: 19109-upload-secondary/file1.txt
+file2:
+ class: File
+ location: 19109-upload-secondary/file2.txt
--- /dev/null
+strawberry
--- /dev/null
+blueberry
\ No newline at end of file
--- /dev/null
+mango
\ No newline at end of file
arv-put --portable-data-hash samples/sample1_S01_R1_001.fastq.gz
fi
+# Test for #18888
+# This is a standalone test because the bug was observed with this
+# command line and was thought to be due to command line handling.
arvados-cwl-runner 18888-download_def.cwl --scripts scripts/
+# Test for #19070
+# The most effective way to test this seemed to be to write an
+# integration test to check for the expected behavior.
+python test_copy_deps.py
+
+# Run integration tests
exec cwltest --test arvados-tests.yml --tool arvados-cwl-runner $@ -- --disable-reuse --compute-checksum --api=containers
output: {}
tool: 18994-basename/wf_ren.cwl
doc: "Test issue 18994 - correctly stage file with modified basename"
+
+- job: 19109-upload-secondary.yml
+ output: {
+ "out": {
+ "basename": "file1.catted",
+ "class": "File",
+ "location": "file1.catted",
+ "size": 20,
+ "checksum": "sha1$c4cead17cebdd829f38c48e18a28f1da72339ef7"
+ },
+ "out2": {
+ "basename": "file2.catted",
+ "checksum": "sha1$6f71c5d1512519ede45bedfdd624e05fd8037b0d",
+ "class": "File",
+ "location": "file2.catted",
+ "size": 12
+ }
+ }
+ tool: 19109-upload-secondary.cwl
+ doc: "Test issue 19109 - correctly discover & upload secondary files"
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.2
+class: CommandLineTool
+inputs:
+ - id: inp
+ type: File
+ secondaryFiles:
+ - pattern: .tbi
+ required: true
+stdout: $(inputs.inp.nameroot).catted
+outputs:
+ out:
+ type: stdout
+arguments: [cat, '$(inputs.inp.path)', '$(inputs.inp.secondaryFiles[0].path)']
+#!/bin/sh
+
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
-#!/bin/bash
-
echo bubble
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import arvados
+import subprocess
+
+api = arvados.api()
+
+def check_contents(group, wf_uuid):
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 3:
+ raise Exception("Expected 3 items in "+group["uuid"]+" but found "+str(len(contents["items"])))
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#workflow" and c["uuid"] == wf_uuid:
+ found = True
+ if not found:
+ raise Exception("Couldn't find workflow in "+group["uuid"])
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["portable_data_hash"] == "d7514270f356df848477718d58308cc4+94":
+ found = True
+ if not found:
+ raise Exception("Couldn't find collection dependency")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["name"].startswith("Docker image arvados jobs"):
+ found = True
+ if not found:
+ raise Exception("Couldn't find jobs image dependency")
+
+
+def test_create():
+ group = api.groups().create(body={"group": {"name": "test-19070-project-1", "group_class": "project"}}, ensure_unique_name=True).execute()
+ try:
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 0:
+ raise Exception("Expected 0 items")
+
+ # Create workflow, by default should also copy dependencies
+ cmd = ["arvados-cwl-runner", "--create-workflow", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+ check_contents(group, wf_uuid)
+ finally:
+ api.groups().delete(uuid=group["uuid"]).execute()
+
+
+def test_update():
+ group = api.groups().create(body={"group": {"name": "test-19070-project-2", "group_class": "project"}}, ensure_unique_name=True).execute()
+ try:
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 0:
+ raise Exception("Expected 0 items")
+
+ # Create workflow, but with --no-copy-deps it shouldn't copy any dependencies
+ cmd = ["arvados-cwl-runner", "--no-copy-deps", "--create-workflow", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 1:
+ raise Exception("Expected 1 item but found "+str(len(contents["items"])))
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#workflow" and c["uuid"] == wf_uuid:
+ found = True
+ if not found:
+ raise Exception("Couldn't find workflow")
+
+ # Updating by default will copy missing items
+ cmd = ["arvados-cwl-runner", "--update-workflow", wf_uuid, "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+ check_contents(group, wf_uuid)
+
+ finally:
+ api.groups().delete(uuid=group["uuid"]).execute()
+
+
+def test_execute():
+ group = api.groups().create(body={"group": {"name": "test-19070-project-3", "group_class": "project"}}, ensure_unique_name=True).execute()
+ try:
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 0:
+ raise Exception("Expected 0 items")
+
+ # Execute workflow; by default this shouldn't copy any dependencies.
+ cmd = ["arvados-cwl-runner", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ # container request
+ # final output collection
+ # container log
+ # step output collection
+ # container request log
+ if len(contents["items"]) != 5:
+ raise Exception("Expected 5 items but found "+str(len(contents["items"])))
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["portable_data_hash"] == "d7514270f356df848477718d58308cc4+94":
+ found = True
+ if found:
+ raise Exception("Didn't expect to find collection dependency")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["name"].startswith("Docker image arvados jobs"):
+ found = True
+ if found:
+ raise Exception("Didn't expect to find jobs image dependency")
+
+ # Execute workflow with --copy-deps
+ cmd = ["arvados-cwl-runner", "--project-uuid", group["uuid"], "--copy-deps", "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["portable_data_hash"] == "d7514270f356df848477718d58308cc4+94":
+ found = True
+ if not found:
+ raise Exception("Couldn't find collection dependency")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["name"].startswith("Docker image arvados jobs"):
+ found = True
+ if not found:
+ raise Exception("Couldn't find jobs image dependency")
+
+ finally:
+ api.groups().delete(uuid=group["uuid"]).execute()
+
+if __name__ == '__main__':
+ test_create()
+ test_update()
+ test_execute()
def stubs(func):
@functools.wraps(func)
+ @mock.patch("arvados_cwl.arvdocker.determine_image_id")
@mock.patch("uuid.uuid4")
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
@mock.patch("arvados.collection.KeepClient")
@mock.patch("arvados.keep.KeepClient")
@mock.patch("arvados.events.subscribe")
- def wrapped(self, events, keep_client1, keep_client2, keepdocker, uuid4, *args, **kwargs):
+ def wrapped(self, events, keep_client1, keep_client2, keepdocker,
+ uuid4, determine_image_id, *args, **kwargs):
class Stubs(object):
pass
stubs = Stubs()
"df80736f-f14d-4b10-b2e3-03aa27f034b2", "df80736f-f14d-4b10-b2e3-03aa27f034b3",
"df80736f-f14d-4b10-b2e3-03aa27f034b4", "df80736f-f14d-4b10-b2e3-03aa27f034b5"]
+ determine_image_id.return_value = None
+
def putstub(p, **kwargs):
return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p))
keep_client1().put.side_effect = putstub
"arvados/jobs:123": [("zzzzz-4zz18-zzzzzzzzzzzzzd5", {})],
"arvados/jobs:latest": [("zzzzz-4zz18-zzzzzzzzzzzzzd6", {})],
}
- def kd(a, b, image_name=None, image_tag=None):
+ def kd(a, b, image_name=None, image_tag=None, project_uuid=None):
return stubs.docker_images.get("%s:%s" % (image_name, image_tag), [])
stubs.keepdocker.side_effect = kd
"name": "arvados/jobs:"+arvados_cwl.__version__,
"owner_uuid": "",
"properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
+ {"items": [{"created_at": "",
+ "head_uuid": "",
+ "link_class": "docker_image_hash",
+ "name": "123456",
+ "owner_uuid": "",
+ "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
+ {"items": [{"created_at": "",
+ "head_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
+ "link_class": "docker_image_repo+tag",
+ "name": "arvados/jobs:"+arvados_cwl.__version__,
+ "owner_uuid": "",
+ "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
{"items": [{"created_at": "",
"head_uuid": "",
"link_class": "docker_image_hash",
"owner_uuid": "",
"manifest_text": "",
"properties": ""
- }], "items_available": 1, "offset": 0},)
+ }], "items_available": 1, "offset": 0},
+ {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
+ "owner_uuid": "",
+ "manifest_text": "",
+ "properties": ""
+ }], "items_available": 1, "offset": 0})
arvrunner.api.collections().create().execute.return_value = {"uuid": ""}
arvrunner.api.collections().get().execute.return_value = {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
"portable_data_hash": "9999999999999999999999999999999b+99"}
+
self.assertEqual("9999999999999999999999999999999b+99",
- arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
+ arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__, arvrunner.runtimeContext))
@stubs
@stubs
def test_update(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+ stubs.api.workflows().get().execute.return_value = {"owner_uuid": project_uuid}
+
exited = arvados_cwl.main(
["--update-workflow", self.existing_workflow_uuid,
"--debug",
"name": "submit_wf.cwl",
"description": "",
"definition": self.expect_workflow,
+ "owner_uuid": project_uuid
}
}
stubs.api.workflows().update.assert_called_with(
@stubs
def test_update_name(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+ stubs.api.workflows().get().execute.return_value = {"owner_uuid": project_uuid}
+
exited = arvados_cwl.main(
["--update-workflow", self.existing_workflow_uuid,
"--debug", "--name", "testing 123",
"name": "testing 123",
"description": "",
"definition": self.expect_workflow,
+ "owner_uuid": project_uuid
}
}
stubs.api.workflows().update.assert_called_with(
def popen_docker(cmd, *args, **kwargs):
manage_stdin = ('stdin' not in kwargs)
kwargs.setdefault('stdin', subprocess.PIPE)
- kwargs.setdefault('stdout', sys.stderr)
+ kwargs.setdefault('stdout', subprocess.PIPE)
+ kwargs.setdefault('stderr', subprocess.PIPE)
try:
docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
except OSError: # No docker in $PATH, try docker.io
'tag': tag,
}
-def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None):
+def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None, project_uuid=None):
"""List all Docker images known to the api_client with image_name and
image_tag. If no image_name is given, defaults to listing all
Docker images.
search_filters = []
repo_links = None
hash_links = None
+
+ project_filter = []
+ if project_uuid is not None:
+ project_filter = [["owner_uuid", "=", project_uuid]]
+
if image_name:
# Find images with the name the user specified.
search_links = _get_docker_links(
api_client, num_retries,
filters=[['link_class', '=', 'docker_image_repo+tag'],
['name', '=',
- '{}:{}'.format(image_name, image_tag or 'latest')]])
+ '{}:{}'.format(image_name, image_tag or 'latest')]]+project_filter)
if search_links:
repo_links = search_links
else:
search_links = _get_docker_links(
api_client, num_retries,
filters=[['link_class', '=', 'docker_image_hash'],
- ['name', 'ilike', image_name + '%']])
+ ['name', 'ilike', image_name + '%']]+project_filter)
hash_links = search_links
# Only list information about images that were found in the search.
search_filters.append(['head_uuid', 'in',
if hash_links is None:
hash_links = _get_docker_links(
api_client, num_retries,
- filters=search_filters + [['link_class', '=', 'docker_image_hash']])
+ filters=search_filters + [['link_class', '=', 'docker_image_hash']]+project_filter)
hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}
# Each collection may have more than one name (though again, one name
repo_links = _get_docker_links(
api_client, num_retries,
filters=search_filters + [['link_class', '=',
- 'docker_image_repo+tag']])
+ 'docker_image_repo+tag']]+project_filter)
seen_image_names = collections.defaultdict(set)
images = []
for link in repo_links:
# Remove any image listings that refer to unknown collections.
existing_coll_uuids = {coll['uuid'] for coll in arvados.util.list_all(
api_client.collections().list, num_retries,
- filters=[['uuid', 'in', [im['collection'] for im in images]]],
+ filters=[['uuid', 'in', [im['collection'] for im in images]]]+project_filter,
select=['uuid'])}
return [(image['collection'], image) for image in images
if image['collection'] in existing_coll_uuids]
if args.pull and not find_image_hashes(args.image):
pull_image(args.image, args.tag)
+ images_in_arv = list_images_in_arv(api, args.retries, args.image, args.tag)
+
+ image_hash = None
try:
image_hash = find_one_image_hash(args.image, args.tag)
+ if not docker_image_compatible(api, image_hash):
+ if args.force_image_format:
+ logger.warning("forcing incompatible image")
+ else:
+ logger.error("refusing to store " \
+ "incompatible format (use --force-image-format to override)")
+ sys.exit(1)
except DockerError as error:
- logger.error(str(error))
- sys.exit(1)
-
- if not docker_image_compatible(api, image_hash):
- if args.force_image_format:
- logger.warning("forcing incompatible image")
+ if images_in_arv:
+ # We don't have Docker / we don't have the image locally,
+ # use image that's already uploaded to Arvados
+ image_hash = images_in_arv[0][1]['dockerhash']
else:
- logger.error("refusing to store " \
- "incompatible format (use --force-image-format to override)")
+ logger.error(str(error))
sys.exit(1)
image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None
self.run_arv_keepdocker(['--version'], sys.stderr)
self.assertVersionOutput(out, err)
+ @mock.patch('arvados.commands.keepdocker.list_images_in_arv',
+ return_value=[])
@mock.patch('arvados.commands.keepdocker.find_image_hashes',
return_value=['abc123'])
@mock.patch('arvados.commands.keepdocker.find_one_image_hash',
return_value='abc123')
- def test_image_format_compatibility(self, _1, _2):
+ def test_image_format_compatibility(self, _1, _2, _3):
old_id = hashlib.sha256(b'old').hexdigest()
new_id = 'sha256:'+hashlib.sha256(b'new').hexdigest()
for supported, img_id, expect_ok in [
self.run_arv_keepdocker(['[::1]/repo/img'], sys.stderr)
find_image_mock.assert_called_with('[::1]/repo/img', 'latest')
+ @mock.patch('arvados.commands.keepdocker.list_images_in_arv',
+ return_value=[])
@mock.patch('arvados.commands.keepdocker.find_image_hashes',
return_value=['abc123'])
@mock.patch('arvados.commands.keepdocker.find_one_image_hash',
return_value='abc123')
- def test_collection_property_update(self, _1, _2):
+ def test_collection_property_update(self, _1, _2, _3):
image_id = 'sha256:'+hashlib.sha256(b'image').hexdigest()
fakeDD = arvados.api('v1')._rootDesc
fakeDD['dockerImageFormats'] = ['v2']