*/script/rails
sdk/cwl/tests/input/blorp.txt
sdk/cwl/tests/tool/blub.txt
+sdk/cwl/tests/19109-upload-secondary/*
sdk/cwl/tests/federation/data/*
sdk/cwl/tests/fake-keep-mount/fake_collection_dir/.arvados#collection
sdk/go/manifest/testdata/*_manifest
return 0
}
-var healthCommand cmd.Handler = service.Command(arvados.ServiceNameHealth, func(ctx context.Context, cluster *arvados.Cluster, _ string, _ *prometheus.Registry) service.Handler {
- return &health.Aggregator{Cluster: cluster}
+var healthCommand cmd.Handler = service.Command(arvados.ServiceNameHealth, func(ctx context.Context, cluster *arvados.Cluster, _ string, reg *prometheus.Registry) service.Handler {
+ mClockSkew := prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "health",
+ Name: "clock_skew_seconds",
+ Help: "Clock skew observed in most recent health check",
+ })
+ reg.MustRegister(mClockSkew)
+ return &health.Aggregator{
+ Cluster: cluster,
+ MetricClockSkew: mClockSkew,
+ }
})
client_max_body_size 128m;
location / {
- proxy_pass http://controller;
- proxy_redirect off;
- proxy_connect_timeout 90s;
- proxy_read_timeout 300s;
+ proxy_pass http://controller;
+ proxy_redirect off;
+ proxy_connect_timeout 90s;
+ proxy_read_timeout 300s;
+ proxy_max_temp_file_size 0;
+ proxy_request_buffering off;
+ proxy_buffering off;
+ proxy_http_version 1.1;
proxy_set_header Host $http_host;
proxy_set_header Upgrade $http_upgrade;
"previous: Upgrading to 2.4.0":#v2_4_0
+h3. New proxy parameters for arvados-controller
+
+We now recommend disabling nginx proxy caching for arvados-controller, to avoid truncation of large responses.
+
+In your Nginx configuration file (@/etc/nginx/conf.d/arvados-api-and-controller.conf@), add the following lines to the @location /@ block with @http://controller@ (see "Update nginx configuration":{{site.baseurl}}/install/install-api-server.html#update-nginx for an example) and reload/restart Nginx (@sudo nginx -s reload@).
+
+<pre>
+ proxy_max_temp_file_size 0;
+ proxy_request_buffering off;
+ proxy_buffering off;
+ proxy_http_version 1.1;
+</pre>
+
+h3. Now recommending Singularity 3.9.9
+
+The compute image "build script":{{site.baseurl}}/install/crunch2-cloud/install-compute-node.html now installs Singularity 3.9.9 instead of 3.7.4. The newer version includes a bugfix that should resolve "intermittent loopback device errors":https://dev.arvados.org/issues/18489 when running containers.
+
+h3. Changes to @arvados-cwl-runner --create-workflow@ and @--update-workflow@
+
+When using @arvados-cwl-runner --create-workflow@ or @--update-workflow@, by default it will now make a copy of all collection and Docker image dependencies in the target project. Running workflows retains the old behavior (use the dependencies wherever they are found). This can be controlled explicitly with @--copy-deps@ and @--no-copy-deps@.
+
h2(#v2_4_0). v2.4.0 (2022-04-08)
"previous: Upgrading to 2.3.1":#v2_3_1
* Even when using the Singularity runtime, users' container images are expected to be saved in Docker format. Specifying a @.sif@ file as an image when submitting a container request is not yet supported.
* Arvados' Singularity implementation does not yet limit the amount of memory available in a container. Each container will have access to all memory on the host where it runs, unless memory use is restricted by Slurm/LSF.
* The Docker ENTRYPOINT instruction is ignored.
-* Arvados is tested with Singularity version 3.7.4. Other versions may not work.
+* Arvados is tested with Singularity version 3.9.9. Other versions may not work.
h2(#singularity). Set up Singularity
-Follow the "Singularity installation instructions":https://sylabs.io/guides/3.7/user-guide/quick_start.html. Make sure @singularity@ and @mksquashfs@ are working:
+Follow the "Singularity installation instructions":https://sylabs.io/guides/3.9/user-guide/quick_start.html. Make sure @singularity@ and @mksquashfs@ are working:
<notextile>
<pre><code>$ <span class="userinput">singularity version</span>
-3.7.4
+3.9.9
$ <span class="userinput">mksquashfs -version</span>
-mksquashfs version 4.3-git (2014/06/09)
+mksquashfs version 4.4 (2019/08/29)
[...]
</code></pre>
</notextile>
</notextile>
{% include 'singularity_mksquashfs_configuration' %}
-
-h2(#singularity_loop_device_errors). Singularity loop device errors
-
-With singularity v3.9.1 and earlier, containers may fail intermittently at startup with an error message similar to the following in the container log's @stderr.txt@ (line breaks added):
-
-<notextile>
-<pre><code>FATAL: container creation failed:
- mount /proc/self/fd/3->/usr/local/var/singularity/mnt/session/rootfs error:
- while mounting image /proc/self/fd/3:
- failed to find loop device:
- could not attach image file to loop device:
- failed to set loop flags on loop device:
- resource temporarily unavailable
-</code></pre>
-</notextile>
-
-This problem is addressed in singularity v3.9.2. For details, please see "Arvados issue #18489":https://dev.arvados.org/issues/18489 and "singularity PR #458":https://github.com/sylabs/singularity/pull/458.
client_max_body_size 128m;
location / {
- proxy_pass http://controller;
- proxy_redirect off;
- proxy_connect_timeout 90s;
- proxy_read_timeout 300s;
+ proxy_pass http://controller;
+ proxy_redirect off;
+ proxy_connect_timeout 90s;
+ proxy_read_timeout 300s;
+ proxy_max_temp_file_size 0;
+ proxy_request_buffering off;
+ proxy_buffering off;
+ proxy_http_version 1.1;
proxy_set_header Host $http_host;
proxy_set_header Upgrade $http_upgrade;
The @--match-submitter-images@ option will check the id of the image in the local Docker instance and compare it to the id of the image already in Arvados with the same name and tag. If they are different, it will choose the image matching the local image id, which will be uploaded if necessary. This is helpful for development: if you locally rebuild the image with the 'latest' tag, the @--match-submitter-images@ option will ensure that the newer version is used.
+h3(#dependencies). Dependencies
+
+Dependencies include collections and Docker images referenced by the workflow. Dependencies are automatically uploaded by @arvados-cwl-runner@ if they are not present or need to be updated. When running a workflow, dependencies that already exist somewhere on the Arvados instance (from a previous upload) will not be uploaded or copied, regardless of the project they are located in. Sometimes this creates problems when sharing a workflow run with others. In this case, use @--copy-deps@ to indicate that you want all dependencies to be copied into the destination project (specified by @--project-uuid@).
+
h3. Command line options
See "arvados-cwl-runner options":{{site.baseurl}}/user/cwl/cwl-run-options.html
h2(#registering). Registering a workflow to use in Workbench
-Use @--create-workflow@ to register a CWL workflow with Arvados. This enables you to share workflows with other Arvados users, and run them by clicking the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a process...</span> button on the Workbench Dashboard and on the command line by UUID.
+Use @--create-workflow@ to register a CWL workflow with Arvados. Use @--project-uuid@ to upload the workflow to a specific project, along with its dependencies. You can share the workflow with other Arvados users by sharing that project. You can run the workflow by clicking the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a process...</span> button on the Workbench Dashboard, and on the command line by UUID.
<notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-workflow bwa-mem.cwl</span>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --project-uuid zzzzz-j7d0g-p32bi47ogkjke11 --create-workflow bwa-mem.cwl</span>
arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Upload local files: "bwa-mem.cwl"
2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Uploaded to zzzzz-4zz18-7e0hedrmkuyoei3
You can provide a partial input file to set default values for the workflow input parameters. You can also use the @--name@ option to set the name of the workflow:
<notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --name "My workflow with defaults" --create-workflow bwa-mem.cwl bwa-mem-template.yml</span>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --name "My workflow with defaults" --project-uuid zzzzz-j7d0g-p32bi47ogkjke11 --create-workflow bwa-mem.cwl bwa-mem-template.yml</span>
arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Upload local files: "bwa-mem.cwl"
2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Uploaded to zzzzz-4zz18-0f91qkovk4ml18o
</code></pre>
</notextile>
+Use @--update-workflow <uuid>@ to update an existing workflow.
+
+<notextile>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --update-workflow zzzzz-p5p6p-zuniv58hn8d0qd8 bwa-mem.cwl</span>
+arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
+2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Upload local files: "bwa-mem.cwl"
+2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Uploaded to zzzzz-4zz18-7e0hedrmkuyoei3
+2016-07-01 12:21:01 arvados.cwl-runner[15796] INFO: Created template zzzzz-p5p6p-zuniv58hn8d0qd8
+zzzzz-p5p6p-zuniv58hn8d0qd8
+</code></pre>
+</notextile>
+
+When using @--create-workflow@ or @--update-workflow@, the @--copy-deps@ and @--match-submitter-images@ options are enabled by default.
+
h3. Running registered workflows at the command line
You can run a registered workflow at the command line by its UUID:
ManagementToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
SystemRootToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
Collections:
- BlobSigningKey: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa`, &logbuf).Load()
+ BlobSigningKey: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+ InstanceTypes:
+ abc:
+ IncludedScratch: 123456
+`, &logbuf).Load()
c.Assert(err, check.IsNil)
yaml, err := yaml.Marshal(cfg)
c.Assert(err, check.IsNil)
},
{
arvados.EndpointAPIClientAuthorizationList,
- func() interface{} { return &arvados.ListOptions{} },
+ func() interface{} { return &arvados.ListOptions{Limit: -1} },
func(ctx context.Context, opts interface{}) (interface{}, error) {
return rtr.backend.APIClientAuthorizationList(ctx, *opts.(*arvados.ListOptions))
},
shouldCall: "CollectionList",
withOptions: arvados.ListOptions{Limit: -1},
},
+ {
+ method: "GET",
+ path: "/arvados/v1/api_client_authorizations",
+ shouldCall: "APIClientAuthorizationList",
+ withOptions: arvados.ListOptions{Limit: -1},
+ },
{
method: "GET",
path: "/arvados/v1/collections?limit=123&offset=456&include_trash=true&include_old_versions=1",
const (
rubyversion = "2.7.5"
bundlerversion = "2.2.19"
- singularityversion = "3.7.4"
+ singularityversion = "3.9.9"
pjsversion = "1.9.8"
geckoversion = "0.24.0"
gradleversion = "5.3.1"
pkgs = append(pkgs,
"dpkg-dev",
"eatmydata", // install it for later steps, even if we're not using it now
- "rsync",
)
}
"r-cran-markdown",
"r-cran-roxygen2",
"r-cran-xml",
+ "rsync",
"sudo",
"uuid-dev",
"wget",
exgroup.add_argument("--enable-preemptible", dest="enable_preemptible", default=None, action="store_true", help="Use preemptible instances. Control individual steps with arv:UsePreemptible hint.")
exgroup.add_argument("--disable-preemptible", dest="enable_preemptible", default=None, action="store_false", help="Don't use preemptible instances.")
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--copy-deps", dest="copy_deps", default=None, action="store_true", help="Copy dependencies into the destination project.")
+ exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false", help="Leave dependencies where they are.")
+
parser.add_argument(
"--skip-schemas",
action="store_true",
logger_handler=arvados.log_handler,
custom_schema_callback=add_arv_hints,
loadingContext=executor.loadingContext,
- runtimeContext=executor.runtimeContext,
+ runtimeContext=executor.toplevel_runtimeContext,
input_required=not (arvargs.create_workflow or arvargs.update_workflow))
runtimeContext.project_uuid,
runtimeContext.force_docker_pull,
runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker)
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
network_req, _ = self.get_requirement("NetworkAccess")
if network_req:
"cwd": "/var/spool/cwl",
"priority": self.priority,
"state": "Committed",
- "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
+ "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
"mounts": {
"/var/lib/cwl/cwl.input.json": {
"kind": "json",
"portable_data_hash": "%s" % workflowcollection
}
else:
- packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
+ packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext)
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
- if self.on_error:
+ if runtimeContext.on_error:
command.append("--on-error=" + self.on_error)
- if self.intermediate_output_ttl:
- command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)
+ if runtimeContext.intermediate_output_ttl:
+ command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
- if self.arvrunner.trash_intermediate:
+ if runtimeContext.trash_intermediate:
command.append("--trash-intermediate")
- if self.arvrunner.project_uuid:
- command.append("--project-uuid="+self.arvrunner.project_uuid)
+ if runtimeContext.project_uuid:
+ command.append("--project-uuid="+runtimeContext.project_uuid)
if self.enable_dev:
command.append("--enable-dev")
def run(self, runtimeContext):
runtimeContext.keepprefix = "keep:"
job_spec = self.arvados_job_spec(runtimeContext)
- if self.arvrunner.project_uuid:
- job_spec["owner_uuid"] = self.arvrunner.project_uuid
+ if runtimeContext.project_uuid:
+ job_spec["owner_uuid"] = runtimeContext.project_uuid
extra_submit_params = {}
if runtimeContext.submit_runner_cluster:
def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid,
- force_pull, tmp_outdir_prefix, match_local_docker):
+ force_pull, tmp_outdir_prefix, match_local_docker, copy_deps):
"""Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
if "http://arvados.org/cwl#dockerCollectionPDH" in dockerRequirement:
image_name = sp[0]
image_tag = sp[1] if len(sp) > 1 else "latest"
- images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+ out_of_project_images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
image_name=image_name,
- image_tag=image_tag)
+ image_tag=image_tag,
+ project_uuid=None)
- if images and match_local_docker:
+ if copy_deps:
+ # Only images that are available in the destination project
+ images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+ image_name=image_name,
+ image_tag=image_tag,
+ project_uuid=project_uuid)
+ else:
+ images = out_of_project_images
+
+ if match_local_docker:
local_image_id = determine_image_id(dockerRequirement["dockerImageId"])
if local_image_id:
# find it in the list
# force re-upload.
images = []
+ for i in out_of_project_images:
+ if i[1]["dockerhash"] == local_image_id:
+ found = True
+ out_of_project_images = [i]
+ break
+ if not found:
+ # force re-upload.
+ out_of_project_images = []
+
if not images:
- # Fetch Docker image if necessary.
- try:
- result = cwltool.docker.DockerCommandLineJob.get_image(dockerRequirement, pull_image,
- force_pull, tmp_outdir_prefix)
- if not result:
- raise WorkflowException("Docker image '%s' not available" % dockerRequirement["dockerImageId"])
- except OSError as e:
- raise WorkflowException("While trying to get Docker image '%s', failed to execute 'docker': %s" % (dockerRequirement["dockerImageId"], e))
+ if not out_of_project_images:
+ # Fetch Docker image if necessary.
+ try:
+ result = cwltool.docker.DockerCommandLineJob.get_image(dockerRequirement, pull_image,
+ force_pull, tmp_outdir_prefix)
+ if not result:
+ raise WorkflowException("Docker image '%s' not available" % dockerRequirement["dockerImageId"])
+ except OSError as e:
+ raise WorkflowException("While trying to get Docker image '%s', failed to execute 'docker': %s" % (dockerRequirement["dockerImageId"], e))
# Upload image to Arvados
args = []
images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
image_name=image_name,
- image_tag=image_tag)
+ image_tag=image_tag,
+ project_uuid=project_uuid)
if not images:
raise WorkflowException("Could not find Docker image %s:%s" % (image_name, image_tag))
max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
sum_res_pars = ("outdirMin", "outdirMax")
-def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
+def upload_workflow(arvRunner, tool, job_order, project_uuid,
+ runtimeContext, uuid=None,
submit_runner_ram=0, name=None, merged_map=None,
submit_runner_image=None):
- packed = packed_workflow(arvRunner, tool, merged_map)
+ packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext)
adjustDirObjs(job_order, trim_listing)
adjustFileObjs(job_order, trim_anonymous_location)
name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
upload_dependencies(arvRunner, name, tool.doc_loader,
- packed, tool.tool["id"], False)
+ packed, tool.tool["id"], False,
+ runtimeContext)
wf_runner_resources = None
wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
hints.append(wf_runner_resources)
- wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner, submit_runner_image or "arvados/jobs:"+__version__)
+ wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
+ submit_runner_image or "arvados/jobs:"+__version__,
+ runtimeContext)
if submit_runner_ram:
wf_runner_resources["ramMin"] = submit_runner_ram
self.doc_loader,
joborder,
joborder.get("id", "#"),
- False)
+ False,
+ runtimeContext)
if self.wf_pdh is None:
packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)
self.doc_loader,
packed,
self.tool["id"],
- False)
+ False,
+ runtimeContext)
# Discover files/directories referenced by the
# workflow (mainly "default" values)
if self.wf_pdh is None:
adjustFileObjs(packed, keepmount)
adjustDirObjs(packed, keepmount)
- self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
+ self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)
self.loadingContext = self.loadingContext.copy()
self.loadingContext.metadata = self.loadingContext.metadata.copy()
self.collection_cache_size = 256
self.match_local_docker = False
self.enable_preemptible = None
+ self.copy_deps = None
super(ArvRuntimeContext, self).__init__(kwargs)
handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
root_logger.addHandler(handler)
- self.runtimeContext = ArvRuntimeContext(vars(arvargs))
- self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
+ self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
+ self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
collection_cache=self.collection_cache)
- validate_cluster_target(self, self.runtimeContext)
+ validate_cluster_target(self, self.toplevel_runtimeContext)
def arv_make_tool(self, toolpath_object, loadingContext):
updated_tool.visit(self.check_features)
- self.project_uuid = runtimeContext.project_uuid
self.pipeline = None
self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
self.secret_store = runtimeContext.secret_store
if runtimeContext.submit_request_uuid and self.work_api != "containers":
raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
+ runtimeContext = runtimeContext.copy()
+
default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
if runtimeContext.storage_classes == "default":
runtimeContext.storage_classes = default_storage_classes
if not runtimeContext.name:
runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
+ if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
+ # When creating or updating workflow record, by default
+ # always copy dependencies and ensure Docker images are up
+ # to date.
+ runtimeContext.copy_deps = True
+ runtimeContext.match_local_docker = True
+
+ if runtimeContext.update_workflow and self.project_uuid is None:
+ # If we are updating a workflow, make sure anything that
+ # gets uploaded goes into the same parent project, unless
+ # an alternate --project-uuid was provided.
+ existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
+ runtimeContext.project_uuid = existing_wf["owner_uuid"]
+
+ self.project_uuid = runtimeContext.project_uuid
+
# Upload local file references in the job order.
job_order = upload_job_order(self, "%s input" % runtimeContext.name,
- updated_tool, job_order)
+ updated_tool, job_order, runtimeContext)
# the last clause means: if it is a command line tool, and we
# are going to wait for the result, and always_submit_runner
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
- merged_map = upload_workflow_deps(self, tool)
+ merged_map = upload_workflow_deps(self, tool, runtimeContext)
# Recreate process object (ArvadosWorkflow or
# ArvadosCommandTool) because tool document may have been
loadingContext.metadata = tool.metadata
tool = load_tool(tool.tool, loadingContext)
- existing_uuid = runtimeContext.update_workflow
- if existing_uuid or runtimeContext.create_workflow:
+ if runtimeContext.update_workflow or runtimeContext.create_workflow:
# Create a pipeline template or workflow record and exit.
if self.work_api == "containers":
uuid = upload_workflow(self, tool, job_order,
- self.project_uuid,
- uuid=existing_uuid,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- merged_map=merged_map,
- submit_runner_image=runtimeContext.submit_runner_image)
+ runtimeContext.project_uuid,
+ runtimeContext,
+ uuid=runtimeContext.update_workflow,
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
+ merged_map=merged_map,
+ submit_runner_image=runtimeContext.submit_runner_image)
self.stdout.write(uuid + "\n")
return (None, "success")
self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
self.eval_timeout = runtimeContext.eval_timeout
- runtimeContext = runtimeContext.copy()
runtimeContext.use_container = True
runtimeContext.tmpdir_prefix = "tmp"
runtimeContext.work_api = self.work_api
import schema_salad.validate as validate
import arvados.collection
+import arvados.util
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq
set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
return
+ if inputschema == "File":
+ inputschema = {"type": "File"}
+
if isinstance(inputschema, basestring):
sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
if sd:
set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
elif (inputschema["type"] == "File" and
- secondaryspec and
isinstance(primary, Mapping) and
- primary.get("class") == "File" and
- "secondaryFiles" not in primary):
+ primary.get("class") == "File"):
+
+ if "secondaryFiles" in primary or not secondaryspec:
+ # Nothing to do.
+ return
+
#
# Found a file, check for secondaryFiles
#
primary["secondaryFiles"] = secondaryspec
for i, sf in enumerate(aslist(secondaryspec)):
if builder.cwlVersion == "v1.0":
- pattern = builder.do_eval(sf, context=primary)
+ pattern = sf
else:
- pattern = builder.do_eval(sf["pattern"], context=primary)
+ pattern = sf["pattern"]
if pattern is None:
continue
if isinstance(pattern, list):
"Expression must return list, object, string or null")
if pattern is not None:
- sfpath = substitute(primary["location"], pattern)
+ if "${" in pattern or "$(" in pattern:
+ sfname = builder.do_eval(pattern, context=primary)
+ else:
+ sfname = substitute(primary["basename"], pattern)
+
+ if sfname is None:
+ continue
+
+ p_location = primary["location"]
+ if "/" in p_location:
+ sfpath = (
+ p_location[0 : p_location.rindex("/") + 1]
+ + sfname
+ )
required = builder.do_eval(required, context=primary)
set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
def upload_dependencies(arvrunner, name, document_loader,
- workflowobj, uri, loadref_run,
+ workflowobj, uri, loadref_run, runtimeContext,
include_primary=True, discovered_secondaryfiles=None):
"""Upload the dependencies of the workflowobj document to Keep.
single_collection=True,
optional_deps=optional_deps)
+ keeprefs = set()
+ def addkeepref(k):
+ if k.startswith("keep:"):
+ keeprefs.add(collection_pdh_pattern.match(k).group(1))
+
def setloc(p):
loc = p.get("location")
if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
p["location"] = mapper.mapper(p["location"]).resolved
+ addkeepref(p["location"])
return
if not loc:
gp = collection_uuid_pattern.match(loc)
if not gp:
+ # Not a uuid pattern (must be a pdh pattern)
+ addkeepref(p["location"])
return
+
uuid = gp.groups()[0]
if uuid not in uuid_map:
raise SourceLine(p, "location", validate.ValidationException).makeError(
for d in discovered:
discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
+ if runtimeContext.copy_deps:
+ # Find referenced collections and copy them into the
+ # destination project, for easy sharing.
+ already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
+ filters=[["portable_data_hash", "in", list(keeprefs)],
+ ["owner_uuid", "=", runtimeContext.project_uuid]],
+ select=["uuid", "portable_data_hash", "created_at"]))
+
+ keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
+ for kr in keeprefs:
+ col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
+ order="created_at desc",
+ select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
+ limit=1).execute()
+ if len(col["items"]) == 0:
+ logger.warning("Cannot find collection with portable data hash %s", kr)
+ continue
+ col = col["items"][0]
+ try:
+ arvrunner.api.collections().create(body={"collection": {
+ "owner_uuid": runtimeContext.project_uuid,
+ "name": col["name"],
+ "description": col["description"],
+ "properties": col["properties"],
+ "portable_data_hash": col["portable_data_hash"],
+ "manifest_text": col["manifest_text"],
+ "storage_classes_desired": col["storage_classes_desired"],
+ "trash_at": col["trash_at"]
+ }}, ensure_unique_name=True).execute()
+ except Exception as e:
+                logger.warning("Unable to copy collection to destination: %s", e)
+
if "$schemas" in workflowobj:
sch = CommentedSeq()
for s in workflowobj["$schemas"]:
return mapper
-def upload_docker(arvrunner, tool):
+def upload_docker(arvrunner, tool, runtimeContext):
"""Uploads Docker images used in CommandLineTool objects."""
if isinstance(tool, CommandLineTool):
(docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
if docker_req:
if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
- # TODO: can be supported by containers API, but not jobs API.
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+
+ arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
else:
arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
- True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
- upload_docker(arvrunner, s.embedded_tool)
+ upload_docker(arvrunner, s.embedded_tool, runtimeContext)
-def packed_workflow(arvrunner, tool, merged_map):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
"""Create a packed workflow.
A "packed" workflow is one where all the components have been combined into a single document."""
v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
if v.get("class") == "DockerRequirement":
v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
- arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
for l in v:
visit(v[l], cur_id)
if isinstance(v, list):
packed["http://schema.org/version"] = githash
-def upload_job_order(arvrunner, name, tool, job_order):
+def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
"""Upload local files referenced in the input object and return updated input
object with 'location' updated to the proper keep references.
"""
tool.doc_loader,
job_order,
job_order.get("id", "#"),
- False)
+ False,
+ runtimeContext)
if "id" in job_order:
del job_order["id"]
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
-def upload_workflow_deps(arvrunner, tool):
+def upload_workflow_deps(arvrunner, tool, runtimeContext):
# Ensure that Docker images needed by this workflow are available
- upload_docker(arvrunner, tool)
+ upload_docker(arvrunner, tool, runtimeContext)
document_loader = tool.doc_loader
deptool,
deptool["id"],
False,
+ runtimeContext,
include_primary=False,
discovered_secondaryfiles=discovered_secondaryfiles)
document_loader.idx[deptool["id"]] = deptool
return merged_map
-def arvados_jobs_image(arvrunner, img):
+def arvados_jobs_image(arvrunner, img, runtimeContext):
"""Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
try:
- return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
+ True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
except Exception as e:
raise Exception("Docker image %s is not available\n%s" % (img, e) )
-def upload_workflow_collection(arvrunner, name, packed):
+def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
collection = arvados.collection.Collection(api_client=arvrunner.api,
keep_client=arvrunner.keep_client,
num_retries=arvrunner.num_retries)
filters = [["portable_data_hash", "=", collection.portable_data_hash()],
["name", "like", name+"%"]]
- if arvrunner.project_uuid:
- filters.append(["owner_uuid", "=", arvrunner.project_uuid])
+ if runtimeContext.project_uuid:
+ filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
if exists["items"]:
logger.info("Using collection %s", exists["items"][0]["uuid"])
else:
collection.save_new(name=name,
- owner_uuid=arvrunner.project_uuid,
+ owner_uuid=runtimeContext.project_uuid,
ensure_unique_name=True,
num_retries=arvrunner.num_retries)
logger.info("Uploaded to %s", collection.manifest_locator())
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.2
+class: CommandLineTool
+baseCommand: echo
+inputs:
+ message:
+ type: File
+ inputBinding:
+ position: 1
+ default:
+ class: File
+ location: keep:d7514270f356df848477718d58308cc4+94/b
+
+outputs: []
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.2
+class: Workflow
+
+requirements:
+ InlineJavascriptRequirement: {}
+
+inputs:
+ file1:
+ type: File?
+ secondaryFiles:
+ - pattern: .tbi
+ required: true
+ file2:
+ type: File
+ secondaryFiles:
+ - pattern: |
+ ${
+ return self.basename + '.tbi';
+ }
+ required: true
+outputs:
+ out:
+ type: File
+ outputSource: cat/out
+ out2:
+ type: File
+ outputSource: cat2/out
+steps:
+ cat:
+ in:
+ inp: file1
+ run: cat2.cwl
+ out: [out]
+ cat2:
+ in:
+ inp: file2
+ run: cat2.cwl
+ out: [out]
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+file1:
+ class: File
+ location: 19109-upload-secondary/file1.txt
+file2:
+ class: File
+ location: 19109-upload-secondary/file2.txt
--- /dev/null
+strawberry
--- /dev/null
+blueberry
\ No newline at end of file
--- /dev/null
+mango
\ No newline at end of file
arv-put --portable-data-hash samples/sample1_S01_R1_001.fastq.gz
fi
+# Test for #18888
+# This is a standalone test because the bug was observed with this
+# command line and was thought to be due to command line handling.
arvados-cwl-runner 18888-download_def.cwl --scripts scripts/
+# Test for #19070
+# The most effective way to test this seemed to be to write an
+# integration test to check for the expected behavior.
+python test_copy_deps.py
+
+# Run integration tests
exec cwltest --test arvados-tests.yml --tool arvados-cwl-runner $@ -- --disable-reuse --compute-checksum --api=containers
output: {}
tool: 18994-basename/wf_ren.cwl
doc: "Test issue 18994 - correctly stage file with modified basename"
+
+- job: 19109-upload-secondary.yml
+ output: {
+ "out": {
+ "basename": "file1.catted",
+ "class": "File",
+ "location": "file1.catted",
+ "size": 20,
+ "checksum": "sha1$c4cead17cebdd829f38c48e18a28f1da72339ef7"
+ },
+ "out2": {
+ "basename": "file2.catted",
+ "checksum": "sha1$6f71c5d1512519ede45bedfdd624e05fd8037b0d",
+ "class": "File",
+ "location": "file2.catted",
+ "size": 12
+ }
+ }
+ tool: 19109-upload-secondary.cwl
+ doc: "Test issue 19109 - correctly discover & upload secondary files"
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.2
+class: CommandLineTool
+inputs:
+ - id: inp
+ type: File
+ secondaryFiles:
+ - pattern: .tbi
+ required: true
+stdout: $(inputs.inp.nameroot).catted
+outputs:
+ out:
+ type: stdout
+arguments: [cat, '$(inputs.inp.path)', '$(inputs.inp.secondaryFiles[0].path)']
+#!/bin/sh
+
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
-#!/bin/bash
-
echo bubble
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import arvados
+import subprocess
+
+api = arvados.api()
+
+def check_contents(group, wf_uuid):
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 3:
+        raise Exception("Expected 3 items in "+group["uuid"]+" was "+str(len(contents["items"])))
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#workflow" and c["uuid"] == wf_uuid:
+ found = True
+ if not found:
+ raise Exception("Couldn't find workflow in "+group["uuid"])
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["portable_data_hash"] == "d7514270f356df848477718d58308cc4+94":
+ found = True
+ if not found:
+ raise Exception("Couldn't find collection dependency")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["name"].startswith("Docker image arvados jobs"):
+ found = True
+ if not found:
+ raise Exception("Couldn't find jobs image dependency")
+
+
+def test_create():
+ group = api.groups().create(body={"group": {"name": "test-19070-project-1", "group_class": "project"}}, ensure_unique_name=True).execute()
+ try:
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 0:
+ raise Exception("Expected 0 items")
+
+ # Create workflow, by default should also copy dependencies
+ cmd = ["arvados-cwl-runner", "--create-workflow", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+ check_contents(group, wf_uuid)
+ finally:
+ api.groups().delete(uuid=group["uuid"]).execute()
+
+
+def test_update():
+ group = api.groups().create(body={"group": {"name": "test-19070-project-2", "group_class": "project"}}, ensure_unique_name=True).execute()
+ try:
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 0:
+ raise Exception("Expected 0 items")
+
+ # Create workflow, but with --no-copy-deps it shouldn't copy anything
+ cmd = ["arvados-cwl-runner", "--no-copy-deps", "--create-workflow", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 1:
+            raise Exception("Expected 1 item")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#workflow" and c["uuid"] == wf_uuid:
+ found = True
+ if not found:
+ raise Exception("Couldn't find workflow")
+
+ # Updating by default will copy missing items
+ cmd = ["arvados-cwl-runner", "--update-workflow", wf_uuid, "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+ check_contents(group, wf_uuid)
+
+ finally:
+ api.groups().delete(uuid=group["uuid"]).execute()
+
+
+def test_execute():
+ group = api.groups().create(body={"group": {"name": "test-19070-project-3", "group_class": "project"}}, ensure_unique_name=True).execute()
+ try:
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 0:
+ raise Exception("Expected 0 items")
+
+ # Execute workflow, shouldn't copy anything.
+ cmd = ["arvados-cwl-runner", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ # container request
+ # final output collection
+ # container log
+ # step output collection
+ # container request log
+ if len(contents["items"]) != 5:
+ raise Exception("Expected 5 items")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["portable_data_hash"] == "d7514270f356df848477718d58308cc4+94":
+ found = True
+ if found:
+ raise Exception("Didn't expect to find collection dependency")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["name"].startswith("Docker image arvados jobs"):
+ found = True
+ if found:
+ raise Exception("Didn't expect to find jobs image dependency")
+
+ # Execute workflow with --copy-deps
+ cmd = ["arvados-cwl-runner", "--project-uuid", group["uuid"], "--copy-deps", "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["portable_data_hash"] == "d7514270f356df848477718d58308cc4+94":
+ found = True
+ if not found:
+ raise Exception("Couldn't find collection dependency")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["name"].startswith("Docker image arvados jobs"):
+ found = True
+ if not found:
+ raise Exception("Couldn't find jobs image dependency")
+
+ finally:
+ api.groups().delete(uuid=group["uuid"]).execute()
+
+if __name__ == '__main__':
+ test_create()
+ test_update()
+ test_execute()
def stubs(func):
@functools.wraps(func)
+ @mock.patch("arvados_cwl.arvdocker.determine_image_id")
@mock.patch("uuid.uuid4")
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
@mock.patch("arvados.collection.KeepClient")
@mock.patch("arvados.keep.KeepClient")
@mock.patch("arvados.events.subscribe")
- def wrapped(self, events, keep_client1, keep_client2, keepdocker, uuid4, *args, **kwargs):
+ def wrapped(self, events, keep_client1, keep_client2, keepdocker,
+ uuid4, determine_image_id, *args, **kwargs):
class Stubs(object):
pass
stubs = Stubs()
"df80736f-f14d-4b10-b2e3-03aa27f034b2", "df80736f-f14d-4b10-b2e3-03aa27f034b3",
"df80736f-f14d-4b10-b2e3-03aa27f034b4", "df80736f-f14d-4b10-b2e3-03aa27f034b5"]
+ determine_image_id.return_value = None
+
def putstub(p, **kwargs):
return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p))
keep_client1().put.side_effect = putstub
"arvados/jobs:123": [("zzzzz-4zz18-zzzzzzzzzzzzzd5", {})],
"arvados/jobs:latest": [("zzzzz-4zz18-zzzzzzzzzzzzzd6", {})],
}
- def kd(a, b, image_name=None, image_tag=None):
+ def kd(a, b, image_name=None, image_tag=None, project_uuid=None):
return stubs.docker_images.get("%s:%s" % (image_name, image_tag), [])
stubs.keepdocker.side_effect = kd
"name": "arvados/jobs:"+arvados_cwl.__version__,
"owner_uuid": "",
"properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
+ {"items": [{"created_at": "",
+ "head_uuid": "",
+ "link_class": "docker_image_hash",
+ "name": "123456",
+ "owner_uuid": "",
+ "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
+ {"items": [{"created_at": "",
+ "head_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
+ "link_class": "docker_image_repo+tag",
+ "name": "arvados/jobs:"+arvados_cwl.__version__,
+ "owner_uuid": "",
+ "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
{"items": [{"created_at": "",
"head_uuid": "",
"link_class": "docker_image_hash",
"owner_uuid": "",
"manifest_text": "",
"properties": ""
- }], "items_available": 1, "offset": 0},)
+ }], "items_available": 1, "offset": 0},
+ {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
+ "owner_uuid": "",
+ "manifest_text": "",
+ "properties": ""
+ }], "items_available": 1, "offset": 0})
arvrunner.api.collections().create().execute.return_value = {"uuid": ""}
arvrunner.api.collections().get().execute.return_value = {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
"portable_data_hash": "9999999999999999999999999999999b+99"}
+
self.assertEqual("9999999999999999999999999999999b+99",
- arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
+ arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__, arvrunner.runtimeContext))
@stubs
@stubs
def test_update(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+ stubs.api.workflows().get().execute.return_value = {"owner_uuid": project_uuid}
+
exited = arvados_cwl.main(
["--update-workflow", self.existing_workflow_uuid,
"--debug",
"name": "submit_wf.cwl",
"description": "",
"definition": self.expect_workflow,
+ "owner_uuid": project_uuid
}
}
stubs.api.workflows().update.assert_called_with(
@stubs
def test_update_name(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+ stubs.api.workflows().get().execute.return_value = {"owner_uuid": project_uuid}
+
exited = arvados_cwl.main(
["--update-workflow", self.existing_workflow_uuid,
"--debug", "--name", "testing 123",
"name": "testing 123",
"description": "",
"definition": self.expect_workflow,
+ "owner_uuid": project_uuid
}
}
stubs.api.workflows().update.assert_called_with(
}
type InstanceType struct {
- Name string
+ Name string `json:"-"`
ProviderType string
VCPUs int
RAM ByteSize
- Scratch ByteSize
+ Scratch ByteSize `json:"-"`
IncludedScratch ByteSize
AddedScratch ByteSize
Price float64
var errDuplicateInstanceTypeName = errors.New("duplicate instance type name")
-// UnmarshalJSON handles old config files that provide an array of
-// instance types instead of a hash.
+// UnmarshalJSON does special handling of InstanceTypes:
+// * populate computed fields (Name and Scratch)
+// * error out if InstanceTypes are populated as an array, which was
+// deprecated in Arvados 1.2.0
func (it *InstanceTypeMap) UnmarshalJSON(data []byte) error {
fixup := func(t InstanceType) (InstanceType, error) {
if t.ProviderType == "" {
t.ProviderType = t.Name
}
- if t.Scratch == 0 {
- t.Scratch = t.IncludedScratch + t.AddedScratch
- } else if t.AddedScratch == 0 {
- t.AddedScratch = t.Scratch - t.IncludedScratch
- } else if t.IncludedScratch == 0 {
- t.IncludedScratch = t.Scratch - t.AddedScratch
- }
-
- if t.Scratch != (t.IncludedScratch + t.AddedScratch) {
- return t, fmt.Errorf("InstanceType %q: Scratch != (IncludedScratch + AddedScratch)", t.Name)
- }
+ // If t.Scratch is set in the configuration file, it will be ignored and overwritten.
+ // It will also generate a "deprecated or unknown config entry" warning.
+ t.Scratch = t.IncludedScratch + t.AddedScratch
return t, nil
}
if len(data) > 0 && data[0] == '[' {
- var arr []InstanceType
- err := json.Unmarshal(data, &arr)
- if err != nil {
- return err
- }
- if len(arr) == 0 {
- *it = nil
- return nil
- }
- *it = make(map[string]InstanceType, len(arr))
- for _, t := range arr {
- if _, ok := (*it)[t.Name]; ok {
- return errDuplicateInstanceTypeName
- }
- t, err := fixup(t)
- if err != nil {
- return err
- }
- (*it)[t.Name] = t
- }
- return nil
+ return fmt.Errorf("InstanceTypes must be specified as a map, not an array, see https://doc.arvados.org/admin/config.html")
}
var hash map[string]InstanceType
err := json.Unmarshal(data, &hash)
type ConfigSuite struct{}
-func (s *ConfigSuite) TestInstanceTypesAsArray(c *check.C) {
+func (s *ConfigSuite) TestStringSetAsArray(c *check.C) {
var cluster Cluster
yaml.Unmarshal([]byte(`
API:
c.Check(ok, check.Equals, true)
}
-func (s *ConfigSuite) TestStringSetAsArray(c *check.C) {
- var cluster Cluster
- yaml.Unmarshal([]byte("InstanceTypes:\n- Name: foo\n"), &cluster)
- c.Check(len(cluster.InstanceTypes), check.Equals, 1)
- c.Check(cluster.InstanceTypes["foo"].Name, check.Equals, "foo")
-}
-
func (s *ConfigSuite) TestInstanceTypesAsHash(c *check.C) {
var cluster Cluster
yaml.Unmarshal([]byte("InstanceTypes:\n foo:\n ProviderType: bar\n"), &cluster)
func (s *ConfigSuite) TestInstanceTypeSize(c *check.C) {
var it InstanceType
- err := yaml.Unmarshal([]byte("Name: foo\nScratch: 4GB\nRAM: 4GiB\n"), &it)
+ err := yaml.Unmarshal([]byte("Name: foo\nIncludedScratch: 4GB\nRAM: 4GiB\n"), &it)
c.Check(err, check.IsNil)
- c.Check(int64(it.Scratch), check.Equals, int64(4000000000))
+ c.Check(int64(it.IncludedScratch), check.Equals, int64(4000000000))
c.Check(int64(it.RAM), check.Equals, int64(4294967296))
}
func (s *ConfigSuite) TestInstanceTypeFixup(c *check.C) {
for _, confdata := range []string{
// Current format: map of entries
- `{foo4: {IncludedScratch: 4GB}, foo8: {ProviderType: foo_8, Scratch: 8GB}}`,
- // Legacy format: array of entries with key in "Name" field
- `[{Name: foo4, IncludedScratch: 4GB}, {Name: foo8, ProviderType: foo_8, Scratch: 8GB}]`,
+ `{foo4: {IncludedScratch: 4GB}, foo8: {ProviderType: foo_8, AddedScratch: 8GB}}`,
} {
c.Log(confdata)
var itm InstanceTypeMap
"git.arvados.org/arvados.git/sdk/go/auth"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/ghodss/yaml"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
-const defaultTimeout = arvados.Duration(2 * time.Second)
+const (
+ defaultTimeout = arvados.Duration(2 * time.Second)
+ maxClockSkew = time.Minute
+)
// Aggregator implements service.Handler. It handles "GET /_health/all"
// by checking the health of all configured services on the cluster
// If non-nil, Log is called after handling each request.
Log func(*http.Request, error)
+
+ // If non-nil, report clock skew on each health-check.
+ MetricClockSkew prometheus.Gauge
}
func (agg *Aggregator) setup() {
// anywhere."
Services map[arvados.ServiceName]ServiceHealth `json:"services"`
+ // Difference between min/max timestamps in individual
+ // health-check responses.
+ ClockSkew arvados.Duration
+
Errors []string `json:"errors"`
}
HTTPStatusText string `json:",omitempty"`
Response map[string]interface{} `json:"response"`
ResponseTime json.Number `json:"responseTime"`
+ ClockTime time.Time `json:"clockTime"`
Metrics Metrics `json:"-"`
+ respTime time.Duration
}
type Metrics struct {
}
}
+ var maxResponseTime time.Duration
+ var clockMin, clockMax time.Time
+ for _, result := range resp.Checks {
+ if result.ClockTime.IsZero() {
+ continue
+ }
+ if clockMin.IsZero() || result.ClockTime.Before(clockMin) {
+ clockMin = result.ClockTime
+ }
+ if result.ClockTime.After(clockMax) {
+ clockMax = result.ClockTime
+ }
+ if result.respTime > maxResponseTime {
+ maxResponseTime = result.respTime
+ }
+ }
+ skew := clockMax.Sub(clockMin)
+ resp.ClockSkew = arvados.Duration(skew)
+ if skew > maxClockSkew+maxResponseTime {
+ msg := fmt.Sprintf("clock skew detected: maximum timestamp spread is %s (exceeds warning threshold of %s)", resp.ClockSkew, arvados.Duration(maxClockSkew))
+ resp.Errors = append(resp.Errors, msg)
+ resp.Health = "ERROR"
+ }
+ if agg.MetricClockSkew != nil {
+ agg.MetricClockSkew.Set(skew.Seconds())
+ }
+
var newest Metrics
for _, result := range resp.Checks {
if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) {
func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
t0 := time.Now()
defer func() {
- result.ResponseTime = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
+ result.respTime = time.Since(t0)
+ result.ResponseTime = json.Number(fmt.Sprintf("%.6f", result.respTime.Seconds()))
}()
result.Health = "ERROR"
}
}
result.Health = "OK"
+ result.ClockTime, _ = time.Parse(time.RFC1123, resp.Header.Get("Date"))
return
}
s.checkOK(c)
}
+func (s *AggregatorSuite) TestClockSkew(c *check.C) {
+ // srv1: report real wall clock time
+ handler1 := healthyHandler{}
+ srv1, listen1 := s.stubServer(&handler1)
+ defer srv1.Close()
+ // srv2: report near-future time
+ handler2 := healthyHandler{headerDate: time.Now().Add(3 * time.Second)}
+ srv2, listen2 := s.stubServer(&handler2)
+ defer srv2.Close()
+ // srv3: report far-future time
+ handler3 := healthyHandler{headerDate: time.Now().Add(3*time.Minute + 3*time.Second)}
+ srv3, listen3 := s.stubServer(&handler3)
+ defer srv3.Close()
+
+ s.setAllServiceURLs(listen1)
+
+ // near-future time => OK
+ s.resp = httptest.NewRecorder()
+ arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
+ "http://localhost"+listen2+"/")
+ s.handler.ServeHTTP(s.resp, s.req)
+ s.checkOK(c)
+
+ // far-future time => error
+ s.resp = httptest.NewRecorder()
+ arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
+ "http://localhost"+listen3+"/")
+ s.handler.ServeHTTP(s.resp, s.req)
+ resp := s.checkUnhealthy(c)
+ if c.Check(len(resp.Errors) > 0, check.Equals, true) {
+ c.Check(resp.Errors[0], check.Matches, `clock skew detected: maximum timestamp spread is 3m.* \(exceeds warning threshold of 1m\)`)
+ }
+}
+
func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
s.handler.timeout = arvados.Duration(100 * time.Millisecond)
srv, listen := s.stubServer(&slowHandler{})
type healthyHandler struct {
configHash string
configTime time.Time
+ headerDate time.Time
}
func (h *healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ if !h.headerDate.IsZero() {
+ resp.Header().Set("Date", h.headerDate.Format(time.RFC1123))
+ }
authOK := req.Header.Get("Authorization") == "Bearer "+arvadostest.ManagementToken
if req.URL.Path == "/_health/ping" {
if !authOK {
def popen_docker(cmd, *args, **kwargs):
manage_stdin = ('stdin' not in kwargs)
kwargs.setdefault('stdin', subprocess.PIPE)
- kwargs.setdefault('stdout', sys.stderr)
+ kwargs.setdefault('stdout', subprocess.PIPE)
+ kwargs.setdefault('stderr', subprocess.PIPE)
try:
docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
except OSError: # No docker in $PATH, try docker.io
'tag': tag,
}
-def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None):
+def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None, project_uuid=None):
"""List all Docker images known to the api_client with image_name and
image_tag. If no image_name is given, defaults to listing all
Docker images.
search_filters = []
repo_links = None
hash_links = None
+
+ project_filter = []
+ if project_uuid is not None:
+ project_filter = [["owner_uuid", "=", project_uuid]]
+
if image_name:
# Find images with the name the user specified.
search_links = _get_docker_links(
api_client, num_retries,
filters=[['link_class', '=', 'docker_image_repo+tag'],
['name', '=',
- '{}:{}'.format(image_name, image_tag or 'latest')]])
+ '{}:{}'.format(image_name, image_tag or 'latest')]]+project_filter)
if search_links:
repo_links = search_links
else:
search_links = _get_docker_links(
api_client, num_retries,
filters=[['link_class', '=', 'docker_image_hash'],
- ['name', 'ilike', image_name + '%']])
+ ['name', 'ilike', image_name + '%']]+project_filter)
hash_links = search_links
# Only list information about images that were found in the search.
search_filters.append(['head_uuid', 'in',
if hash_links is None:
hash_links = _get_docker_links(
api_client, num_retries,
- filters=search_filters + [['link_class', '=', 'docker_image_hash']])
+ filters=search_filters + [['link_class', '=', 'docker_image_hash']]+project_filter)
hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}
# Each collection may have more than one name (though again, one name
repo_links = _get_docker_links(
api_client, num_retries,
filters=search_filters + [['link_class', '=',
- 'docker_image_repo+tag']])
+ 'docker_image_repo+tag']]+project_filter)
seen_image_names = collections.defaultdict(set)
images = []
for link in repo_links:
# Remove any image listings that refer to unknown collections.
existing_coll_uuids = {coll['uuid'] for coll in arvados.util.list_all(
api_client.collections().list, num_retries,
- filters=[['uuid', 'in', [im['collection'] for im in images]]],
+ filters=[['uuid', 'in', [im['collection'] for im in images]]]+project_filter,
select=['uuid'])}
return [(image['collection'], image) for image in images
if image['collection'] in existing_coll_uuids]
if args.pull and not find_image_hashes(args.image):
pull_image(args.image, args.tag)
+ images_in_arv = list_images_in_arv(api, args.retries, args.image, args.tag)
+
+ image_hash = None
try:
image_hash = find_one_image_hash(args.image, args.tag)
+ if not docker_image_compatible(api, image_hash):
+ if args.force_image_format:
+ logger.warning("forcing incompatible image")
+ else:
+ logger.error("refusing to store " \
+ "incompatible format (use --force-image-format to override)")
+ sys.exit(1)
except DockerError as error:
- logger.error(str(error))
- sys.exit(1)
-
- if not docker_image_compatible(api, image_hash):
- if args.force_image_format:
- logger.warning("forcing incompatible image")
+ if images_in_arv:
+ # We don't have Docker / we don't have the image locally,
+ # use image that's already uploaded to Arvados
+ image_hash = images_in_arv[0][1]['dockerhash']
else:
- logger.error("refusing to store " \
- "incompatible format (use --force-image-format to override)")
+ logger.error(str(error))
sys.exit(1)
image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto https;
proxy_redirect off;
+ proxy_max_temp_file_size 0;
+ proxy_request_buffering off;
+ proxy_buffering off;
+ proxy_http_version 1.1;
}
}
upstream arv-git-http {
self.run_arv_keepdocker(['--version'], sys.stderr)
self.assertVersionOutput(out, err)
+ @mock.patch('arvados.commands.keepdocker.list_images_in_arv',
+ return_value=[])
@mock.patch('arvados.commands.keepdocker.find_image_hashes',
return_value=['abc123'])
@mock.patch('arvados.commands.keepdocker.find_one_image_hash',
return_value='abc123')
- def test_image_format_compatibility(self, _1, _2):
+ def test_image_format_compatibility(self, _1, _2, _3):
old_id = hashlib.sha256(b'old').hexdigest()
new_id = 'sha256:'+hashlib.sha256(b'new').hexdigest()
for supported, img_id, expect_ok in [
self.run_arv_keepdocker(['[::1]/repo/img'], sys.stderr)
find_image_mock.assert_called_with('[::1]/repo/img', 'latest')
+ @mock.patch('arvados.commands.keepdocker.list_images_in_arv',
+ return_value=[])
@mock.patch('arvados.commands.keepdocker.find_image_hashes',
return_value=['abc123'])
@mock.patch('arvados.commands.keepdocker.find_one_image_hash',
return_value='abc123')
- def test_collection_property_update(self, _1, _2):
+ def test_collection_property_update(self, _1, _2, _3):
image_id = 'sha256:'+hashlib.sha256(b'image').hexdigest()
fakeDD = arvados.api('v1')._rootDesc
fakeDD['dockerImageFormats'] = ['v2']
curl -s https://storage.googleapis.com/golang/go${GOVERSION}.linux-amd64.tar.gz | tar -C /var/lib/arvados -xzf -
ln -sf /var/lib/arvados/go/bin/* /usr/local/bin/
-singularityversion=3.7.4
+singularityversion=3.9.9
curl -Ls https://github.com/sylabs/singularity/archive/refs/tags/v${singularityversion}.tar.gz | tar -C /var/lib/arvados -xzf -
cd /var/lib/arvados/singularity-${singularityversion}
- proxy_set_header: 'X-Real-IP $remote_addr'
- proxy_set_header: 'X-Forwarded-For $proxy_add_x_forwarded_for'
- proxy_set_header: 'X-External-Client $external_client'
+ - proxy_max_temp_file_size: 0
+ - proxy_request_buffering: 'off'
+ - proxy_buffering: 'off'
+ - proxy_http_version: '1.1'
- include: snippets/ssl_hardening_default.conf
- ssl_certificate: __CERT_PEM__
- ssl_certificate_key: __CERT_KEY__
- proxy_set_header: 'X-Real-IP $remote_addr'
- proxy_set_header: 'X-Forwarded-For $proxy_add_x_forwarded_for'
- proxy_set_header: 'X-External-Client $external_client'
+ - proxy_max_temp_file_size: 0
+ - proxy_request_buffering: 'off'
+ - proxy_buffering: 'off'
+ - proxy_http_version: '1.1'
- include: snippets/ssl_hardening_default.conf
- ssl_certificate: __CERT_PEM__
- ssl_certificate_key: __CERT_KEY__
- proxy_set_header: 'X-Real-IP $remote_addr'
- proxy_set_header: 'X-Forwarded-For $proxy_add_x_forwarded_for'
- proxy_set_header: 'X-External-Client $external_client'
+ - proxy_max_temp_file_size: 0
+ - proxy_request_buffering: 'off'
+ - proxy_buffering: 'off'
+ - proxy_http_version: '1.1'
- include: snippets/ssl_hardening_default.conf
- ssl_certificate: __CERT_PEM__
- ssl_certificate_key: __CERT_KEY__