From: Peter Amstutz Date: Fri, 13 May 2022 15:08:31 +0000 (-0400) Subject: Merge branch '19109-upload-secondary' refs #19109 X-Git-Tag: 2.5.0~178 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/a89fbc8b4f2d8db8654175428bd1f041eed6f109?hp=1a6cf8e65c86a0002f0cf0c3a2d4092b67f9b57b Merge branch '19109-upload-secondary' refs #19109 Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- diff --git a/doc/admin/upgrading.html.textile.liquid b/doc/admin/upgrading.html.textile.liquid index 227a8cf07b..2d10c870d5 100644 --- a/doc/admin/upgrading.html.textile.liquid +++ b/doc/admin/upgrading.html.textile.liquid @@ -36,6 +36,10 @@ h3. Now recommending Singularity 3.9.9 The compute image "build script":{{site.baseurl}}/install/crunch2-cloud/install-compute-node.html now installs Singularity 3.9.9 instead of 3.7.4. The newer version includes a bugfix that should resolve "intermittent loopback device errors":https://dev.arvados.org/issues/18489 when running containers. +h3. Changes to @arvados-cwl-runner --create-workflow@ and @--update-workflow@ + +When using @arvados-cwl-runner --create-workflow@ or @--update-workflow@, by default it will now copy all collection and Docker image dependencies into the target project. Running workflows retains the old behavior (using dependencies wherever they are found). This can be controlled explicitly with @--copy-deps@ and @--no-copy-deps@. + h2(#v2_4_0). v2.4.0 (2022-04-08) "previous: Upgrading to 2.3.1":#v2_3_1 diff --git a/doc/user/cwl/cwl-runner.html.textile.liquid b/doc/user/cwl/cwl-runner.html.textile.liquid index 07663849ad..d3aed6ce58 100644 --- a/doc/user/cwl/cwl-runner.html.textile.liquid +++ b/doc/user/cwl/cwl-runner.html.textile.liquid @@ -121,16 +121,20 @@ If there is already a Docker image in Arvados with the same name, it will use th The @--match-submitter-images@ option will check the id of the image in the local Docker instance and compare it to the id of the image already in Arvados with the same name and tag. If they are different, it will choose the image matching the local image id, which will be uploaded if necessary. This is helpful for development: if you locally rebuild the image with the 'latest' tag, the @--match-submitter-images@ option will ensure that the newer version is used. +h3(#dependencies). Dependencies + +Dependencies include collections and Docker images referenced by the workflow. Dependencies are automatically uploaded by @arvados-cwl-runner@ if they are not present or need to be updated. When running a workflow, dependencies that already exist somewhere on the Arvados instance (from a previous upload) will not be uploaded or copied, regardless of the project they are located in. Sometimes this creates problems when sharing a workflow run with others. In this case, use @--copy-deps@ to indicate that you want all dependencies to be copied into the destination project (specified by @--project-uuid@). + h3. Command line options See "arvados-cwl-runner options":{{site.baseurl}}/user/cwl/cwl-run-options.html h2(#registering). Registering a workflow to use in Workbench -Use @--create-workflow@ to register a CWL workflow with Arvados. This enables you to share workflows with other Arvados users, and run them by clicking the Run a process... button on the Workbench Dashboard and on the command line by UUID. +Use @--create-workflow@ to register a CWL workflow with Arvados. Use @--project-uuid@ to upload the workflow to a specific project, along with its dependencies. 
You can share the workflow with other Arvados users by sharing that project. You can run the workflow by clicking the Run a process... button on the Workbench Dashboard, and on the command line by UUID. -
~/arvados/doc/user/cwl/bwa-mem$ arvados-cwl-runner --create-workflow bwa-mem.cwl
+
~/arvados/doc/user/cwl/bwa-mem$ arvados-cwl-runner --project-uuid zzzzz-j7d0g-p32bi47ogkjke11 --create-workflow bwa-mem.cwl
 arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
 2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Upload local files: "bwa-mem.cwl"
 2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Uploaded to zzzzz-4zz18-7e0hedrmkuyoei3
@@ -142,7 +146,7 @@ zzzzz-p5p6p-rjleou1dwr167v5
 You can provide a partial input file to set default values for the workflow input parameters.  You can also use the @--name@ option to set the name of the workflow:
 
 
-
~/arvados/doc/user/cwl/bwa-mem$ arvados-cwl-runner --name "My workflow with defaults" --create-workflow bwa-mem.cwl bwa-mem-template.yml
+
~/arvados/doc/user/cwl/bwa-mem$ arvados-cwl-runner --name "My workflow with defaults" --project-uuid zzzzz-j7d0g-p32bi47ogkjke11 --create-workflow bwa-mem.cwl bwa-mem-template.yml
 arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
 2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Upload local files: "bwa-mem.cwl"
 2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Uploaded to zzzzz-4zz18-0f91qkovk4ml18o
@@ -151,6 +155,20 @@ zzzzz-p5p6p-zuniv58hn8d0qd8
 
+Use @--update-workflow@ to update an existing workflow. + + +
~/arvados/doc/user/cwl/bwa-mem$ arvados-cwl-runner --update-workflow zzzzz-p5p6p-zuniv58hn8d0qd8 bwa-mem.cwl
+arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
+2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Upload local files: "bwa-mem.cwl"
+2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Uploaded to zzzzz-4zz18-7e0hedrmkuyoei3
+2016-07-01 12:21:01 arvados.cwl-runner[15796] INFO: Created template zzzzz-p5p6p-zuniv58hn8d0qd8
+zzzzz-p5p6p-zuniv58hn8d0qd8
+
+
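
After registering a workflow, you can confirm what actually landed in the target project. The following is a minimal sketch using the Arvados Python SDK, modeled on the test_copy_deps.py integration test added by this commit; the project UUID is a placeholder for whatever you passed to @--project-uuid@.

import arvados

api = arvados.api()

# Placeholder: the project given to --project-uuid when registering.
project_uuid = "zzzzz-j7d0g-p32bi47ogkjke11"

contents = api.groups().contents(uuid=project_uuid).execute()
for item in contents["items"]:
    if item["kind"] == "arvados#workflow":
        print("workflow:", item["uuid"])
    elif item["kind"] == "arvados#collection":
        # With dependencies copied into the project, the input collections
        # and the Docker image collection show up here too.
        print("collection:", item["name"], item["portable_data_hash"])
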
+ +When using @--create-workflow@ or @--update-workflow@, the @--copy-deps@ and @--match-submitter-images@ options are enabled by default. + h3. Running registered workflows at the command line You can run a registered workflow at the command line by its UUID: diff --git a/lib/config/load_test.go b/lib/config/load_test.go index abf3217056..4ae9a513c8 100644 --- a/lib/config/load_test.go +++ b/lib/config/load_test.go @@ -316,7 +316,11 @@ Clusters: ManagementToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa SystemRootToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa Collections: - BlobSigningKey: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa`, &logbuf).Load() + BlobSigningKey: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa + InstanceTypes: + abc: + IncludedScratch: 123456 +`, &logbuf).Load() c.Assert(err, check.IsNil) yaml, err := yaml.Marshal(cfg) c.Assert(err, check.IsNil) diff --git a/lib/controller/router/router.go b/lib/controller/router/router.go index 05bdb4754f..586ea8e676 100644 --- a/lib/controller/router/router.go +++ b/lib/controller/router/router.go @@ -407,7 +407,7 @@ func (rtr *router) addRoutes() { }, { arvados.EndpointAPIClientAuthorizationList, - func() interface{} { return &arvados.ListOptions{} }, + func() interface{} { return &arvados.ListOptions{Limit: -1} }, func(ctx context.Context, opts interface{}) (interface{}, error) { return rtr.backend.APIClientAuthorizationList(ctx, *opts.(*arvados.ListOptions)) }, diff --git a/lib/controller/router/router_test.go b/lib/controller/router/router_test.go index ce440dac57..11b090a214 100644 --- a/lib/controller/router/router_test.go +++ b/lib/controller/router/router_test.go @@ -92,6 +92,12 @@ func (s *RouterSuite) TestOptions(c *check.C) { shouldCall: "CollectionList", withOptions: arvados.ListOptions{Limit: -1}, }, + { + method: "GET", + path: "/arvados/v1/api_client_authorizations", + shouldCall: "APIClientAuthorizationList", + withOptions: arvados.ListOptions{Limit: -1}, + }, { method: "GET", path: "/arvados/v1/collections?limit=123&offset=456&include_trash=true&include_old_versions=1", diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index c73b358ecc..21b629f37a 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -217,6 +217,10 @@ def arg_parser(): # type: () -> argparse.ArgumentParser exgroup.add_argument("--enable-preemptible", dest="enable_preemptible", default=None, action="store_true", help="Use preemptible instances. 
Control individual steps with arv:UsePreemptible hint.") exgroup.add_argument("--disable-preemptible", dest="enable_preemptible", default=None, action="store_false", help="Don't use preemptible instances.") + exgroup = parser.add_mutually_exclusive_group() + exgroup.add_argument("--copy-deps", dest="copy_deps", default=None, action="store_true", help="Copy dependencies into the destination project.") + exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false", help="Leave dependencies where they are.") + parser.add_argument( "--skip-schemas", action="store_true", @@ -363,5 +367,5 @@ def main(args=sys.argv[1:], logger_handler=arvados.log_handler, custom_schema_callback=add_arv_hints, loadingContext=executor.loadingContext, - runtimeContext=executor.runtimeContext, + runtimeContext=executor.toplevel_runtimeContext, input_required=not (arvargs.create_workflow or arvargs.update_workflow)) diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index f75bde81e6..5082cc2f4b 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -247,7 +247,8 @@ class ArvadosContainer(JobBase): runtimeContext.project_uuid, runtimeContext.force_docker_pull, runtimeContext.tmp_outdir_prefix, - runtimeContext.match_local_docker) + runtimeContext.match_local_docker, + runtimeContext.copy_deps) network_req, _ = self.get_requirement("NetworkAccess") if network_req: @@ -465,7 +466,7 @@ class RunnerContainer(Runner): "cwd": "/var/spool/cwl", "priority": self.priority, "state": "Committed", - "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image), + "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext), "mounts": { "/var/lib/cwl/cwl.input.json": { "kind": "json", @@ -500,7 +501,7 @@ class RunnerContainer(Runner): "portable_data_hash": "%s" % workflowcollection } else: - packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map) + packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext) workflowpath = "/var/lib/cwl/workflow.json#main" container_req["mounts"]["/var/lib/cwl/workflow.json"] = { "kind": "json", @@ -550,17 +551,17 @@ class RunnerContainer(Runner): if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes: command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes) - if self.on_error: + if runtimeContext.on_error: command.append("--on-error=" + self.on_error) - if self.intermediate_output_ttl: - command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl) + if runtimeContext.intermediate_output_ttl: + command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl) - if self.arvrunner.trash_intermediate: + if runtimeContext.trash_intermediate: command.append("--trash-intermediate") - if self.arvrunner.project_uuid: - command.append("--project-uuid="+self.arvrunner.project_uuid) + if runtimeContext.project_uuid: + command.append("--project-uuid="+runtimeContext.project_uuid) if self.enable_dev: command.append("--enable-dev") @@ -581,8 +582,8 @@ class RunnerContainer(Runner): def run(self, runtimeContext): runtimeContext.keepprefix = "keep:" job_spec = self.arvados_job_spec(runtimeContext) - if self.arvrunner.project_uuid: - job_spec["owner_uuid"] = self.arvrunner.project_uuid + if runtimeContext.project_uuid: + job_spec["owner_uuid"] = runtimeContext.project_uuid extra_submit_params = 
{} if runtimeContext.submit_runner_cluster: diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py index 04e2a4cffc..cf0b3b9daf 100644 --- a/sdk/cwl/arvados_cwl/arvdocker.py +++ b/sdk/cwl/arvados_cwl/arvdocker.py @@ -57,7 +57,7 @@ def determine_image_id(dockerImageId): def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid, - force_pull, tmp_outdir_prefix, match_local_docker): + force_pull, tmp_outdir_prefix, match_local_docker, copy_deps): """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker.""" if "http://arvados.org/cwl#dockerCollectionPDH" in dockerRequirement: @@ -80,11 +80,21 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid image_name = sp[0] image_tag = sp[1] if len(sp) > 1 else "latest" - images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3, + out_of_project_images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3, image_name=image_name, - image_tag=image_tag) + image_tag=image_tag, + project_uuid=None) - if images and match_local_docker: + if copy_deps: + # Only images that are available in the destination project + images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3, + image_name=image_name, + image_tag=image_tag, + project_uuid=project_uuid) + else: + images = out_of_project_images + + if match_local_docker: local_image_id = determine_image_id(dockerRequirement["dockerImageId"]) if local_image_id: # find it in the list @@ -98,15 +108,25 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid # force re-upload. images = [] + for i in out_of_project_images: + if i[1]["dockerhash"] == local_image_id: + found = True + out_of_project_images = [i] + break + if not found: + # force re-upload. + out_of_project_images = [] + if not images: - # Fetch Docker image if necessary. - try: - result = cwltool.docker.DockerCommandLineJob.get_image(dockerRequirement, pull_image, - force_pull, tmp_outdir_prefix) - if not result: - raise WorkflowException("Docker image '%s' not available" % dockerRequirement["dockerImageId"]) - except OSError as e: - raise WorkflowException("While trying to get Docker image '%s', failed to execute 'docker': %s" % (dockerRequirement["dockerImageId"], e)) + if not out_of_project_images: + # Fetch Docker image if necessary. 
+ try: + result = cwltool.docker.DockerCommandLineJob.get_image(dockerRequirement, pull_image, + force_pull, tmp_outdir_prefix) + if not result: + raise WorkflowException("Docker image '%s' not available" % dockerRequirement["dockerImageId"]) + except OSError as e: + raise WorkflowException("While trying to get Docker image '%s', failed to execute 'docker': %s" % (dockerRequirement["dockerImageId"], e)) # Upload image to Arvados args = [] @@ -125,7 +145,8 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3, image_name=image_name, - image_tag=image_tag) + image_tag=image_tag, + project_uuid=project_uuid) if not images: raise WorkflowException("Could not find Docker image %s:%s" % (image_name, image_tag)) diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py index 4fe82a6fe1..51e7cd8b9e 100644 --- a/sdk/cwl/arvados_cwl/arvworkflow.py +++ b/sdk/cwl/arvados_cwl/arvworkflow.py @@ -37,11 +37,12 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics') max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax") sum_res_pars = ("outdirMin", "outdirMax") -def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None, +def upload_workflow(arvRunner, tool, job_order, project_uuid, + runtimeContext, uuid=None, submit_runner_ram=0, name=None, merged_map=None, submit_runner_image=None): - packed = packed_workflow(arvRunner, tool, merged_map) + packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext) adjustDirObjs(job_order, trim_listing) adjustFileObjs(job_order, trim_anonymous_location) @@ -57,7 +58,8 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None, name = tool.tool.get("label", os.path.basename(tool.tool["id"])) upload_dependencies(arvRunner, name, tool.doc_loader, - packed, tool.tool["id"], False) + packed, tool.tool["id"], False, + runtimeContext) wf_runner_resources = None @@ -72,7 +74,9 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None, wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"} hints.append(wf_runner_resources) - wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner, submit_runner_image or "arvados/jobs:"+__version__) + wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner, + submit_runner_image or "arvados/jobs:"+__version__, + runtimeContext) if submit_runner_ram: wf_runner_resources["ramMin"] = submit_runner_ram @@ -194,7 +198,8 @@ class ArvadosWorkflow(Workflow): self.doc_loader, joborder, joborder.get("id", "#"), - False) + False, + runtimeContext) if self.wf_pdh is None: packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader) @@ -237,7 +242,8 @@ class ArvadosWorkflow(Workflow): self.doc_loader, packed, self.tool["id"], - False) + False, + runtimeContext) # Discover files/directories referenced by the # workflow (mainly "default" values) @@ -301,7 +307,7 @@ class ArvadosWorkflow(Workflow): if self.wf_pdh is None: adjustFileObjs(packed, keepmount) adjustDirObjs(packed, keepmount) - self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed) + self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext) self.loadingContext = self.loadingContext.copy() self.loadingContext.metadata = self.loadingContext.metadata.copy() diff --git a/sdk/cwl/arvados_cwl/context.py 
b/sdk/cwl/arvados_cwl/context.py index 316250106b..64f85e2076 100644 --- a/sdk/cwl/arvados_cwl/context.py +++ b/sdk/cwl/arvados_cwl/context.py @@ -38,6 +38,7 @@ class ArvRuntimeContext(RuntimeContext): self.collection_cache_size = 256 self.match_local_docker = False self.enable_preemptible = None + self.copy_deps = None super(ArvRuntimeContext, self).__init__(kwargs) diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py index 680ca0b7b2..1759e4ac28 100644 --- a/sdk/cwl/arvados_cwl/executor.py +++ b/sdk/cwl/arvados_cwl/executor.py @@ -197,11 +197,11 @@ The 'jobs' API is no longer supported. handler = RuntimeStatusLoggingHandler(self.runtime_status_update) root_logger.addHandler(handler) - self.runtimeContext = ArvRuntimeContext(vars(arvargs)) - self.runtimeContext.make_fs_access = partial(CollectionFsAccess, + self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs)) + self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess, collection_cache=self.collection_cache) - validate_cluster_target(self, self.runtimeContext) + validate_cluster_target(self, self.toplevel_runtimeContext) def arv_make_tool(self, toolpath_object, loadingContext): @@ -517,7 +517,6 @@ The 'jobs' API is no longer supported. updated_tool.visit(self.check_features) - self.project_uuid = runtimeContext.project_uuid self.pipeline = None self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir) self.secret_store = runtimeContext.secret_store @@ -535,6 +534,8 @@ The 'jobs' API is no longer supported. if runtimeContext.submit_request_uuid and self.work_api != "containers": raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api)) + runtimeContext = runtimeContext.copy() + default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True]) if runtimeContext.storage_classes == "default": runtimeContext.storage_classes = default_storage_classes @@ -544,9 +545,25 @@ The 'jobs' API is no longer supported. if not runtimeContext.name: runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"]) + if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow): + # When creating or updating workflow record, by default + # always copy dependencies and ensure Docker images are up + # to date. + runtimeContext.copy_deps = True + runtimeContext.match_local_docker = True + + if runtimeContext.update_workflow and self.project_uuid is None: + # If we are updating a workflow, make sure anything that + # gets uploaded goes into the same parent project, unless + # an alternate --project-uuid was provided. + existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute() + runtimeContext.project_uuid = existing_wf["owner_uuid"] + + self.project_uuid = runtimeContext.project_uuid + # Upload local file references in the job order. job_order = upload_job_order(self, "%s input" % runtimeContext.name, - updated_tool, job_order) + updated_tool, job_order, runtimeContext) # the last clause means: if it is a command line tool, and we # are going to wait for the result, and always_submit_runner @@ -573,7 +590,7 @@ The 'jobs' API is no longer supported. # Upload direct dependencies of workflow steps, get back mapping of files to keep references. # Also uploads docker images. 
- merged_map = upload_workflow_deps(self, tool) + merged_map = upload_workflow_deps(self, tool, runtimeContext) # Recreate process object (ArvadosWorkflow or # ArvadosCommandTool) because tool document may have been @@ -584,17 +601,17 @@ The 'jobs' API is no longer supported. loadingContext.metadata = tool.metadata tool = load_tool(tool.tool, loadingContext) - existing_uuid = runtimeContext.update_workflow - if existing_uuid or runtimeContext.create_workflow: + if runtimeContext.update_workflow or runtimeContext.create_workflow: # Create a pipeline template or workflow record and exit. if self.work_api == "containers": uuid = upload_workflow(self, tool, job_order, - self.project_uuid, - uuid=existing_uuid, - submit_runner_ram=runtimeContext.submit_runner_ram, - name=runtimeContext.name, - merged_map=merged_map, - submit_runner_image=runtimeContext.submit_runner_image) + runtimeContext.project_uuid, + runtimeContext, + uuid=runtimeContext.update_workflow, + submit_runner_ram=runtimeContext.submit_runner_ram, + name=runtimeContext.name, + merged_map=merged_map, + submit_runner_image=runtimeContext.submit_runner_image) self.stdout.write(uuid + "\n") return (None, "success") @@ -603,7 +620,6 @@ The 'jobs' API is no longer supported. self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse self.eval_timeout = runtimeContext.eval_timeout - runtimeContext = runtimeContext.copy() runtimeContext.use_container = True runtimeContext.tmpdir_prefix = "tmp" runtimeContext.work_api = self.work_api diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index c8668afcac..f232178c5d 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -39,6 +39,7 @@ from cwltool.builder import Builder import schema_salad.validate as validate import arvados.collection +import arvados.util from .util import collectionUUID from ruamel.yaml import YAML from ruamel.yaml.comments import CommentedMap, CommentedSeq @@ -258,7 +259,7 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No set_secondary(fsaccess, builder, inputschema, None, primary, discovered) def upload_dependencies(arvrunner, name, document_loader, - workflowobj, uri, loadref_run, + workflowobj, uri, loadref_run, runtimeContext, include_primary=True, discovered_secondaryfiles=None): """Upload the dependencies of the workflowobj document to Keep. @@ -418,10 +419,16 @@ def upload_dependencies(arvrunner, name, document_loader, single_collection=True, optional_deps=optional_deps) + keeprefs = set() + def addkeepref(k): + if k.startswith("keep:"): + keeprefs.add(collection_pdh_pattern.match(k).group(1)) + def setloc(p): loc = p.get("location") if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")): p["location"] = mapper.mapper(p["location"]).resolved + addkeepref(p["location"]) return if not loc: @@ -443,7 +450,10 @@ def upload_dependencies(arvrunner, name, document_loader, gp = collection_uuid_pattern.match(loc) if not gp: + # Not a uuid pattern (must be a pdh pattern) + addkeepref(p["location"]) return + uuid = gp.groups()[0] if uuid not in uuid_map: raise SourceLine(p, "location", validate.ValidationException).makeError( @@ -458,6 +468,38 @@ def upload_dependencies(arvrunner, name, document_loader, for d in discovered: discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d] + if runtimeContext.copy_deps: + # Find referenced collections and copy them into the + # destination project, for easy sharing. 
+ already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list, + filters=[["portable_data_hash", "in", list(keeprefs)], + ["owner_uuid", "=", runtimeContext.project_uuid]], + select=["uuid", "portable_data_hash", "created_at"])) + + keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present) + for kr in keeprefs: + col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]], + order="created_at desc", + select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"], + limit=1).execute() + if len(col["items"]) == 0: + logger.warning("Cannot find collection with portable data hash %s", kr) + continue + col = col["items"][0] + try: + arvrunner.api.collections().create(body={"collection": { + "owner_uuid": runtimeContext.project_uuid, + "name": col["name"], + "description": col["description"], + "properties": col["properties"], + "portable_data_hash": col["portable_data_hash"], + "manifest_text": col["manifest_text"], + "storage_classes_desired": col["storage_classes_desired"], + "trash_at": col["trash_at"] + }}, ensure_unique_name=True).execute() + except Exception as e: + logger.warning("Unable copy collection to destination: %s", e) + if "$schemas" in workflowobj: sch = CommentedSeq() for s in workflowobj["$schemas"]: @@ -468,32 +510,36 @@ def upload_dependencies(arvrunner, name, document_loader, return mapper -def upload_docker(arvrunner, tool): +def upload_docker(arvrunner, tool, runtimeContext): """Uploads Docker images used in CommandLineTool objects.""" if isinstance(tool, CommandLineTool): (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement") if docker_req: if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers": - # TODO: can be supported by containers API, but not jobs API. raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError( "Option 'dockerOutputDirectory' of DockerRequirement not supported.") - arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid, - arvrunner.runtimeContext.force_docker_pull, - arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + + arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, + runtimeContext.project_uuid, + runtimeContext.force_docker_pull, + runtimeContext.tmp_outdir_prefix, + runtimeContext.match_local_docker, + runtimeContext.copy_deps) else: arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__}, - True, arvrunner.project_uuid, - arvrunner.runtimeContext.force_docker_pull, - arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + True, + runtimeContext.project_uuid, + runtimeContext.force_docker_pull, + runtimeContext.tmp_outdir_prefix, + runtimeContext.match_local_docker, + runtimeContext.copy_deps) elif isinstance(tool, cwltool.workflow.Workflow): for s in tool.steps: - upload_docker(arvrunner, s.embedded_tool) + upload_docker(arvrunner, s.embedded_tool, runtimeContext) -def packed_workflow(arvrunner, tool, merged_map): +def packed_workflow(arvrunner, tool, merged_map, runtimeContext): """Create a packed workflow. 
A "packed" workflow is one where all the components have been combined into a single document.""" @@ -522,10 +568,11 @@ def packed_workflow(arvrunner, tool, merged_map): v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]] if v.get("class") == "DockerRequirement": v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, - arvrunner.project_uuid, - arvrunner.runtimeContext.force_docker_pull, - arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + runtimeContext.project_uuid, + runtimeContext.force_docker_pull, + runtimeContext.tmp_outdir_prefix, + runtimeContext.match_local_docker, + runtimeContext.copy_deps) for l in v: visit(v[l], cur_id) if isinstance(v, list): @@ -546,7 +593,7 @@ def tag_git_version(packed): packed["http://schema.org/version"] = githash -def upload_job_order(arvrunner, name, tool, job_order): +def upload_job_order(arvrunner, name, tool, job_order, runtimeContext): """Upload local files referenced in the input object and return updated input object with 'location' updated to the proper keep references. """ @@ -582,7 +629,8 @@ def upload_job_order(arvrunner, name, tool, job_order): tool.doc_loader, job_order, job_order.get("id", "#"), - False) + False, + runtimeContext) if "id" in job_order: del job_order["id"] @@ -596,10 +644,10 @@ def upload_job_order(arvrunner, name, tool, job_order): FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"]) -def upload_workflow_deps(arvrunner, tool): +def upload_workflow_deps(arvrunner, tool, runtimeContext): # Ensure that Docker images needed by this workflow are available - upload_docker(arvrunner, tool) + upload_docker(arvrunner, tool, runtimeContext) document_loader = tool.doc_loader @@ -614,6 +662,7 @@ def upload_workflow_deps(arvrunner, tool): deptool, deptool["id"], False, + runtimeContext, include_primary=False, discovered_secondaryfiles=discovered_secondaryfiles) document_loader.idx[deptool["id"]] = deptool @@ -626,19 +675,22 @@ def upload_workflow_deps(arvrunner, tool): return merged_map -def arvados_jobs_image(arvrunner, img): +def arvados_jobs_image(arvrunner, img, runtimeContext): """Determine if the right arvados/jobs image version is available. 
If not, try to pull and upload it.""" try: - return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid, - arvrunner.runtimeContext.force_docker_pull, - arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, + True, + runtimeContext.project_uuid, + runtimeContext.force_docker_pull, + runtimeContext.tmp_outdir_prefix, + runtimeContext.match_local_docker, + runtimeContext.copy_deps) except Exception as e: raise Exception("Docker image %s is not available\n%s" % (img, e) ) -def upload_workflow_collection(arvrunner, name, packed): +def upload_workflow_collection(arvrunner, name, packed, runtimeContext): collection = arvados.collection.Collection(api_client=arvrunner.api, keep_client=arvrunner.keep_client, num_retries=arvrunner.num_retries) @@ -647,15 +699,15 @@ def upload_workflow_collection(arvrunner, name, packed): filters = [["portable_data_hash", "=", collection.portable_data_hash()], ["name", "like", name+"%"]] - if arvrunner.project_uuid: - filters.append(["owner_uuid", "=", arvrunner.project_uuid]) + if runtimeContext.project_uuid: + filters.append(["owner_uuid", "=", runtimeContext.project_uuid]) exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries) if exists["items"]: logger.info("Using collection %s", exists["items"][0]["uuid"]) else: collection.save_new(name=name, - owner_uuid=arvrunner.project_uuid, + owner_uuid=runtimeContext.project_uuid, ensure_unique_name=True, num_retries=arvrunner.num_retries) logger.info("Uploaded to %s", collection.manifest_locator()) diff --git a/sdk/cwl/tests/19070-copy-deps.cwl b/sdk/cwl/tests/19070-copy-deps.cwl new file mode 100644 index 0000000000..b0d61700ef --- /dev/null +++ b/sdk/cwl/tests/19070-copy-deps.cwl @@ -0,0 +1,17 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +cwlVersion: v1.2 +class: CommandLineTool +baseCommand: echo +inputs: + message: + type: File + inputBinding: + position: 1 + default: + class: File + location: keep:d7514270f356df848477718d58308cc4+94/b + +outputs: [] diff --git a/sdk/cwl/tests/arvados-tests.sh b/sdk/cwl/tests/arvados-tests.sh index 9cb5234cf0..1bbaa505e9 100755 --- a/sdk/cwl/tests/arvados-tests.sh +++ b/sdk/cwl/tests/arvados-tests.sh @@ -18,6 +18,15 @@ if ! arv-get 20850f01122e860fb878758ac1320877+71 > /dev/null ; then arv-put --portable-data-hash samples/sample1_S01_R1_001.fastq.gz fi +# Test for #18888 +# This is a standalone test because the bug was observed with this +# command line and was thought to be due to command line handling. arvados-cwl-runner 18888-download_def.cwl --scripts scripts/ +# Test for #19070 +# The most effective way to test this seemed to be to write an +# integration test to check for the expected behavior. +python test_copy_deps.py + +# Run integration tests exec cwltest --test arvados-tests.yml --tool arvados-cwl-runner $@ -- --disable-reuse --compute-checksum --api=containers diff --git a/sdk/cwl/tests/scripts/download_all_data.sh b/sdk/cwl/tests/scripts/download_all_data.sh index d3a9d78762..7c769b5848 100755 --- a/sdk/cwl/tests/scripts/download_all_data.sh +++ b/sdk/cwl/tests/scripts/download_all_data.sh @@ -1,7 +1,7 @@ +#!/bin/sh + # Copyright (C) The Arvados Authors. All rights reserved. 
# # SPDX-License-Identifier: Apache-2.0 -#!/bin/bash - echo bubble diff --git a/sdk/cwl/tests/test_copy_deps.py b/sdk/cwl/tests/test_copy_deps.py new file mode 100644 index 0000000000..853a7d3609 --- /dev/null +++ b/sdk/cwl/tests/test_copy_deps.py @@ -0,0 +1,152 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +import arvados +import subprocess + +api = arvados.api() + +def check_contents(group, wf_uuid): + contents = api.groups().contents(uuid=group["uuid"]).execute() + if len(contents["items"]) != 3: + raise Exception("Expected 3 items in "+group["uuid"]+" was "+len(contents["items"])) + + found = False + for c in contents["items"]: + if c["kind"] == "arvados#workflow" and c["uuid"] == wf_uuid: + found = True + if not found: + raise Exception("Couldn't find workflow in "+group["uuid"]) + + found = False + for c in contents["items"]: + if c["kind"] == "arvados#collection" and c["portable_data_hash"] == "d7514270f356df848477718d58308cc4+94": + found = True + if not found: + raise Exception("Couldn't find collection dependency") + + found = False + for c in contents["items"]: + if c["kind"] == "arvados#collection" and c["name"].startswith("Docker image arvados jobs"): + found = True + if not found: + raise Exception("Couldn't find jobs image dependency") + + +def test_create(): + group = api.groups().create(body={"group": {"name": "test-19070-project-1", "group_class": "project"}}, ensure_unique_name=True).execute() + try: + contents = api.groups().contents(uuid=group["uuid"]).execute() + if len(contents["items"]) != 0: + raise Exception("Expected 0 items") + + # Create workflow, by default should also copy dependencies + cmd = ["arvados-cwl-runner", "--create-workflow", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"] + print(" ".join(cmd)) + wf_uuid = subprocess.check_output(cmd) + wf_uuid = wf_uuid.decode("utf-8").strip() + check_contents(group, wf_uuid) + finally: + api.groups().delete(uuid=group["uuid"]).execute() + + +def test_update(): + group = api.groups().create(body={"group": {"name": "test-19070-project-2", "group_class": "project"}}, ensure_unique_name=True).execute() + try: + contents = api.groups().contents(uuid=group["uuid"]).execute() + if len(contents["items"]) != 0: + raise Exception("Expected 0 items") + + # Create workflow, but with --no-copy-deps it shouldn't copy anything + cmd = ["arvados-cwl-runner", "--no-copy-deps", "--create-workflow", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"] + print(" ".join(cmd)) + wf_uuid = subprocess.check_output(cmd) + wf_uuid = wf_uuid.decode("utf-8").strip() + + contents = api.groups().contents(uuid=group["uuid"]).execute() + if len(contents["items"]) != 1: + raise Exception("Expected 1 items") + + found = False + for c in contents["items"]: + if c["kind"] == "arvados#workflow" and c["uuid"] == wf_uuid: + found = True + if not found: + raise Exception("Couldn't find workflow") + + # Updating by default will copy missing items + cmd = ["arvados-cwl-runner", "--update-workflow", wf_uuid, "19070-copy-deps.cwl"] + print(" ".join(cmd)) + wf_uuid = subprocess.check_output(cmd) + wf_uuid = wf_uuid.decode("utf-8").strip() + check_contents(group, wf_uuid) + + finally: + api.groups().delete(uuid=group["uuid"]).execute() + + +def test_execute(): + group = api.groups().create(body={"group": {"name": "test-19070-project-3", "group_class": "project"}}, ensure_unique_name=True).execute() + try: + contents = api.groups().contents(uuid=group["uuid"]).execute() + if 
len(contents["items"]) != 0: + raise Exception("Expected 0 items") + + # Execute workflow, shouldn't copy anything. + cmd = ["arvados-cwl-runner", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"] + print(" ".join(cmd)) + wf_uuid = subprocess.check_output(cmd) + wf_uuid = wf_uuid.decode("utf-8").strip() + + contents = api.groups().contents(uuid=group["uuid"]).execute() + # container request + # final output collection + # container log + # step output collection + # container request log + if len(contents["items"]) != 5: + raise Exception("Expected 5 items") + + found = False + for c in contents["items"]: + if c["kind"] == "arvados#collection" and c["portable_data_hash"] == "d7514270f356df848477718d58308cc4+94": + found = True + if found: + raise Exception("Didn't expect to find collection dependency") + + found = False + for c in contents["items"]: + if c["kind"] == "arvados#collection" and c["name"].startswith("Docker image arvados jobs"): + found = True + if found: + raise Exception("Didn't expect to find jobs image dependency") + + # Execute workflow with --copy-deps + cmd = ["arvados-cwl-runner", "--project-uuid", group["uuid"], "--copy-deps", "19070-copy-deps.cwl"] + print(" ".join(cmd)) + wf_uuid = subprocess.check_output(cmd) + wf_uuid = wf_uuid.decode("utf-8").strip() + + contents = api.groups().contents(uuid=group["uuid"]).execute() + found = False + for c in contents["items"]: + if c["kind"] == "arvados#collection" and c["portable_data_hash"] == "d7514270f356df848477718d58308cc4+94": + found = True + if not found: + raise Exception("Couldn't find collection dependency") + + found = False + for c in contents["items"]: + if c["kind"] == "arvados#collection" and c["name"].startswith("Docker image arvados jobs"): + found = True + if not found: + raise Exception("Couldn't find jobs image dependency") + + finally: + api.groups().delete(uuid=group["uuid"]).execute() + +if __name__ == '__main__': + test_create() + test_update() + test_execute() diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py index 5092fc4575..305d51e144 100644 --- a/sdk/cwl/tests/test_submit.py +++ b/sdk/cwl/tests/test_submit.py @@ -47,12 +47,14 @@ _rootDesc = None def stubs(func): @functools.wraps(func) + @mock.patch("arvados_cwl.arvdocker.determine_image_id") @mock.patch("uuid.uuid4") @mock.patch("arvados.commands.keepdocker.list_images_in_arv") @mock.patch("arvados.collection.KeepClient") @mock.patch("arvados.keep.KeepClient") @mock.patch("arvados.events.subscribe") - def wrapped(self, events, keep_client1, keep_client2, keepdocker, uuid4, *args, **kwargs): + def wrapped(self, events, keep_client1, keep_client2, keepdocker, + uuid4, determine_image_id, *args, **kwargs): class Stubs(object): pass stubs = Stubs() @@ -63,6 +65,8 @@ def stubs(func): "df80736f-f14d-4b10-b2e3-03aa27f034b2", "df80736f-f14d-4b10-b2e3-03aa27f034b3", "df80736f-f14d-4b10-b2e3-03aa27f034b4", "df80736f-f14d-4b10-b2e3-03aa27f034b5"] + determine_image_id.return_value = None + def putstub(p, **kwargs): return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p)) keep_client1().put.side_effect = putstub @@ -77,7 +81,7 @@ def stubs(func): "arvados/jobs:123": [("zzzzz-4zz18-zzzzzzzzzzzzzd5", {})], "arvados/jobs:latest": [("zzzzz-4zz18-zzzzzzzzzzzzzd6", {})], } - def kd(a, b, image_name=None, image_tag=None): + def kd(a, b, image_name=None, image_tag=None, project_uuid=None): return stubs.docker_images.get("%s:%s" % (image_name, image_tag), []) stubs.keepdocker.side_effect = kd @@ -1077,6 +1081,18 @@ class 
TestSubmit(unittest.TestCase): "name": "arvados/jobs:"+arvados_cwl.__version__, "owner_uuid": "", "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0}, + {"items": [{"created_at": "", + "head_uuid": "", + "link_class": "docker_image_hash", + "name": "123456", + "owner_uuid": "", + "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0}, + {"items": [{"created_at": "", + "head_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb", + "link_class": "docker_image_repo+tag", + "name": "arvados/jobs:"+arvados_cwl.__version__, + "owner_uuid": "", + "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0}, {"items": [{"created_at": "", "head_uuid": "", "link_class": "docker_image_hash", @@ -1090,12 +1106,18 @@ class TestSubmit(unittest.TestCase): "owner_uuid": "", "manifest_text": "", "properties": "" - }], "items_available": 1, "offset": 0},) + }], "items_available": 1, "offset": 0}, + {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb", + "owner_uuid": "", + "manifest_text": "", + "properties": "" + }], "items_available": 1, "offset": 0}) arvrunner.api.collections().create().execute.return_value = {"uuid": ""} arvrunner.api.collections().get().execute.return_value = {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb", "portable_data_hash": "9999999999999999999999999999999b+99"} + self.assertEqual("9999999999999999999999999999999b+99", - arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__)) + arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__, arvrunner.runtimeContext)) @stubs @@ -1599,6 +1621,9 @@ class TestCreateWorkflow(unittest.TestCase): @stubs def test_update(self, stubs): + project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz' + stubs.api.workflows().get().execute.return_value = {"owner_uuid": project_uuid} + exited = arvados_cwl.main( ["--update-workflow", self.existing_workflow_uuid, "--debug", @@ -1610,6 +1635,7 @@ class TestCreateWorkflow(unittest.TestCase): "name": "submit_wf.cwl", "description": "", "definition": self.expect_workflow, + "owner_uuid": project_uuid } } stubs.api.workflows().update.assert_called_with( @@ -1622,6 +1648,9 @@ class TestCreateWorkflow(unittest.TestCase): @stubs def test_update_name(self, stubs): + project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz' + stubs.api.workflows().get().execute.return_value = {"owner_uuid": project_uuid} + exited = arvados_cwl.main( ["--update-workflow", self.existing_workflow_uuid, "--debug", "--name", "testing 123", @@ -1633,6 +1662,7 @@ class TestCreateWorkflow(unittest.TestCase): "name": "testing 123", "description": "", "definition": self.expect_workflow, + "owner_uuid": project_uuid } } stubs.api.workflows().update.assert_called_with( diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go index f0adcda5f1..6a90c30ce4 100644 --- a/sdk/go/arvados/config.go +++ b/sdk/go/arvados/config.go @@ -424,11 +424,11 @@ type CUDAFeatures struct { } type InstanceType struct { - Name string + Name string `json:"-"` ProviderType string VCPUs int RAM ByteSize - Scratch ByteSize + Scratch ByteSize `json:"-"` IncludedScratch ByteSize AddedScratch ByteSize Price float64 @@ -528,49 +528,23 @@ type InstanceTypeMap map[string]InstanceType var errDuplicateInstanceTypeName = errors.New("duplicate instance type name") -// UnmarshalJSON handles old config files that provide an array of -// instance types instead of a hash. 
+// UnmarshalJSON does special handling of InstanceTypes: +// * populate computed fields (Name and Scratch) +// * error out if InstancesTypes are populated as an array, which was +// deprecated in Arvados 1.2.0 func (it *InstanceTypeMap) UnmarshalJSON(data []byte) error { fixup := func(t InstanceType) (InstanceType, error) { if t.ProviderType == "" { t.ProviderType = t.Name } - if t.Scratch == 0 { - t.Scratch = t.IncludedScratch + t.AddedScratch - } else if t.AddedScratch == 0 { - t.AddedScratch = t.Scratch - t.IncludedScratch - } else if t.IncludedScratch == 0 { - t.IncludedScratch = t.Scratch - t.AddedScratch - } - - if t.Scratch != (t.IncludedScratch + t.AddedScratch) { - return t, fmt.Errorf("InstanceType %q: Scratch != (IncludedScratch + AddedScratch)", t.Name) - } + // If t.Scratch is set in the configuration file, it will be ignored and overwritten. + // It will also generate a "deprecated or unknown config entry" warning. + t.Scratch = t.IncludedScratch + t.AddedScratch return t, nil } if len(data) > 0 && data[0] == '[' { - var arr []InstanceType - err := json.Unmarshal(data, &arr) - if err != nil { - return err - } - if len(arr) == 0 { - *it = nil - return nil - } - *it = make(map[string]InstanceType, len(arr)) - for _, t := range arr { - if _, ok := (*it)[t.Name]; ok { - return errDuplicateInstanceTypeName - } - t, err := fixup(t) - if err != nil { - return err - } - (*it)[t.Name] = t - } - return nil + return fmt.Errorf("InstanceTypes must be specified as a map, not an array, see https://doc.arvados.org/admin/config.html") } var hash map[string]InstanceType err := json.Unmarshal(data, &hash) diff --git a/sdk/go/arvados/config_test.go b/sdk/go/arvados/config_test.go index 8c77e29287..58f4b961bb 100644 --- a/sdk/go/arvados/config_test.go +++ b/sdk/go/arvados/config_test.go @@ -15,7 +15,7 @@ var _ = check.Suite(&ConfigSuite{}) type ConfigSuite struct{} -func (s *ConfigSuite) TestInstanceTypesAsArray(c *check.C) { +func (s *ConfigSuite) TestStringSetAsArray(c *check.C) { var cluster Cluster yaml.Unmarshal([]byte(` API: @@ -25,13 +25,6 @@ API: c.Check(ok, check.Equals, true) } -func (s *ConfigSuite) TestStringSetAsArray(c *check.C) { - var cluster Cluster - yaml.Unmarshal([]byte("InstanceTypes:\n- Name: foo\n"), &cluster) - c.Check(len(cluster.InstanceTypes), check.Equals, 1) - c.Check(cluster.InstanceTypes["foo"].Name, check.Equals, "foo") -} - func (s *ConfigSuite) TestInstanceTypesAsHash(c *check.C) { var cluster Cluster yaml.Unmarshal([]byte("InstanceTypes:\n foo:\n ProviderType: bar\n"), &cluster) @@ -42,18 +35,16 @@ func (s *ConfigSuite) TestInstanceTypesAsHash(c *check.C) { func (s *ConfigSuite) TestInstanceTypeSize(c *check.C) { var it InstanceType - err := yaml.Unmarshal([]byte("Name: foo\nScratch: 4GB\nRAM: 4GiB\n"), &it) + err := yaml.Unmarshal([]byte("Name: foo\nIncludedScratch: 4GB\nRAM: 4GiB\n"), &it) c.Check(err, check.IsNil) - c.Check(int64(it.Scratch), check.Equals, int64(4000000000)) + c.Check(int64(it.IncludedScratch), check.Equals, int64(4000000000)) c.Check(int64(it.RAM), check.Equals, int64(4294967296)) } func (s *ConfigSuite) TestInstanceTypeFixup(c *check.C) { for _, confdata := range []string{ // Current format: map of entries - `{foo4: {IncludedScratch: 4GB}, foo8: {ProviderType: foo_8, Scratch: 8GB}}`, - // Legacy format: array of entries with key in "Name" field - `[{Name: foo4, IncludedScratch: 4GB}, {Name: foo8, ProviderType: foo_8, Scratch: 8GB}]`, + `{foo4: {IncludedScratch: 4GB}, foo8: {ProviderType: foo_8, AddedScratch: 8GB}}`, } { c.Log(confdata) 
var itm InstanceTypeMap diff --git a/sdk/go/health/aggregator.go b/sdk/go/health/aggregator.go index 5e010d88bc..f473eff353 100644 --- a/sdk/go/health/aggregator.go +++ b/sdk/go/health/aggregator.go @@ -29,10 +29,14 @@ import ( "git.arvados.org/arvados.git/sdk/go/auth" "git.arvados.org/arvados.git/sdk/go/ctxlog" "github.com/ghodss/yaml" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) -const defaultTimeout = arvados.Duration(2 * time.Second) +const ( + defaultTimeout = arvados.Duration(2 * time.Second) + maxClockSkew = time.Minute +) // Aggregator implements service.Handler. It handles "GET /_health/all" // by checking the health of all configured services on the cluster @@ -46,6 +50,9 @@ type Aggregator struct { // If non-nil, Log is called after handling each request. Log func(*http.Request, error) + + // If non-nil, report clock skew on each health-check. + MetricClockSkew prometheus.Gauge } func (agg *Aggregator) setup() { @@ -114,6 +121,10 @@ type ClusterHealthResponse struct { // anywhere." Services map[arvados.ServiceName]ServiceHealth `json:"services"` + // Difference between min/max timestamps in individual + // health-check responses. + ClockSkew arvados.Duration + Errors []string `json:"errors"` } @@ -124,7 +135,9 @@ type CheckResult struct { HTTPStatusText string `json:",omitempty"` Response map[string]interface{} `json:"response"` ResponseTime json.Number `json:"responseTime"` + ClockTime time.Time `json:"clockTime"` Metrics Metrics `json:"-"` + respTime time.Duration } type Metrics struct { @@ -225,6 +238,33 @@ func (agg *Aggregator) ClusterHealth() ClusterHealthResponse { } } + var maxResponseTime time.Duration + var clockMin, clockMax time.Time + for _, result := range resp.Checks { + if result.ClockTime.IsZero() { + continue + } + if clockMin.IsZero() || result.ClockTime.Before(clockMin) { + clockMin = result.ClockTime + } + if result.ClockTime.After(clockMax) { + clockMax = result.ClockTime + } + if result.respTime > maxResponseTime { + maxResponseTime = result.respTime + } + } + skew := clockMax.Sub(clockMin) + resp.ClockSkew = arvados.Duration(skew) + if skew > maxClockSkew+maxResponseTime { + msg := fmt.Sprintf("clock skew detected: maximum timestamp spread is %s (exceeds warning threshold of %s)", resp.ClockSkew, arvados.Duration(maxClockSkew)) + resp.Errors = append(resp.Errors, msg) + resp.Health = "ERROR" + } + if agg.MetricClockSkew != nil { + agg.MetricClockSkew.Set(skew.Seconds()) + } + var newest Metrics for _, result := range resp.Checks { if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) { @@ -256,7 +296,8 @@ func (agg *Aggregator) pingURL(svcURL arvados.URL) (*url.URL, error) { func (agg *Aggregator) ping(target *url.URL) (result CheckResult) { t0 := time.Now() defer func() { - result.ResponseTime = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds())) + result.respTime = time.Since(t0) + result.ResponseTime = json.Number(fmt.Sprintf("%.6f", result.respTime.Seconds())) }() result.Health = "ERROR" @@ -304,6 +345,7 @@ func (agg *Aggregator) ping(target *url.URL) (result CheckResult) { } } result.Health = "OK" + result.ClockTime, _ = time.Parse(time.RFC1123, resp.Header.Get("Date")) return } diff --git a/sdk/go/health/aggregator_test.go b/sdk/go/health/aggregator_test.go index f8f7ff9f1b..5f60cf67f3 100644 --- a/sdk/go/health/aggregator_test.go +++ b/sdk/go/health/aggregator_test.go @@ -220,6 +220,40 @@ func (s *AggregatorSuite) TestConfigMismatch(c *check.C) { s.checkOK(c) } +func (s 
*AggregatorSuite) TestClockSkew(c *check.C) { + // srv1: report real wall clock time + handler1 := healthyHandler{} + srv1, listen1 := s.stubServer(&handler1) + defer srv1.Close() + // srv2: report near-future time + handler2 := healthyHandler{headerDate: time.Now().Add(3 * time.Second)} + srv2, listen2 := s.stubServer(&handler2) + defer srv2.Close() + // srv3: report far-future time + handler3 := healthyHandler{headerDate: time.Now().Add(3*time.Minute + 3*time.Second)} + srv3, listen3 := s.stubServer(&handler3) + defer srv3.Close() + + s.setAllServiceURLs(listen1) + + // near-future time => OK + s.resp = httptest.NewRecorder() + arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud, + "http://localhost"+listen2+"/") + s.handler.ServeHTTP(s.resp, s.req) + s.checkOK(c) + + // far-future time => error + s.resp = httptest.NewRecorder() + arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV, + "http://localhost"+listen3+"/") + s.handler.ServeHTTP(s.resp, s.req) + resp := s.checkUnhealthy(c) + if c.Check(len(resp.Errors) > 0, check.Equals, true) { + c.Check(resp.Errors[0], check.Matches, `clock skew detected: maximum timestamp spread is 3m.* \(exceeds warning threshold of 1m\)`) + } +} + func (s *AggregatorSuite) TestPingTimeout(c *check.C) { s.handler.timeout = arvados.Duration(100 * time.Millisecond) srv, listen := s.stubServer(&slowHandler{}) @@ -321,9 +355,13 @@ func (*unhealthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) type healthyHandler struct { configHash string configTime time.Time + headerDate time.Time } func (h *healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + if !h.headerDate.IsZero() { + resp.Header().Set("Date", h.headerDate.Format(time.RFC1123)) + } authOK := req.Header.Get("Authorization") == "Bearer "+arvadostest.ManagementToken if req.URL.Path == "/_health/ping" { if !authOK { diff --git a/sdk/python/arvados/commands/keepdocker.py b/sdk/python/arvados/commands/keepdocker.py index 537ea3a945..db4edd2dfa 100644 --- a/sdk/python/arvados/commands/keepdocker.py +++ b/sdk/python/arvados/commands/keepdocker.py @@ -85,7 +85,8 @@ class DockerError(Exception): def popen_docker(cmd, *args, **kwargs): manage_stdin = ('stdin' not in kwargs) kwargs.setdefault('stdin', subprocess.PIPE) - kwargs.setdefault('stdout', sys.stderr) + kwargs.setdefault('stdout', subprocess.PIPE) + kwargs.setdefault('stderr', subprocess.PIPE) try: docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs) except OSError: # No docker in $PATH, try docker.io @@ -257,7 +258,7 @@ def _new_image_listing(link, dockerhash, repo='', tag=''): 'tag': tag, } -def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None): +def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None, project_uuid=None): """List all Docker images known to the api_client with image_name and image_tag. If no image_name is given, defaults to listing all Docker images. @@ -272,13 +273,18 @@ def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None) search_filters = [] repo_links = None hash_links = None + + project_filter = [] + if project_uuid is not None: + project_filter = [["owner_uuid", "=", project_uuid]] + if image_name: # Find images with the name the user specified. 
search_links = _get_docker_links( api_client, num_retries, filters=[['link_class', '=', 'docker_image_repo+tag'], ['name', '=', - '{}:{}'.format(image_name, image_tag or 'latest')]]) + '{}:{}'.format(image_name, image_tag or 'latest')]]+project_filter) if search_links: repo_links = search_links else: @@ -286,7 +292,7 @@ def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None) search_links = _get_docker_links( api_client, num_retries, filters=[['link_class', '=', 'docker_image_hash'], - ['name', 'ilike', image_name + '%']]) + ['name', 'ilike', image_name + '%']]+project_filter) hash_links = search_links # Only list information about images that were found in the search. search_filters.append(['head_uuid', 'in', @@ -298,7 +304,7 @@ def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None) if hash_links is None: hash_links = _get_docker_links( api_client, num_retries, - filters=search_filters + [['link_class', '=', 'docker_image_hash']]) + filters=search_filters + [['link_class', '=', 'docker_image_hash']]+project_filter) hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)} # Each collection may have more than one name (though again, one name @@ -308,7 +314,7 @@ def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None) repo_links = _get_docker_links( api_client, num_retries, filters=search_filters + [['link_class', '=', - 'docker_image_repo+tag']]) + 'docker_image_repo+tag']]+project_filter) seen_image_names = collections.defaultdict(set) images = [] for link in repo_links: @@ -336,7 +342,7 @@ def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None) # Remove any image listings that refer to unknown collections. existing_coll_uuids = {coll['uuid'] for coll in arvados.util.list_all( api_client.collections().list, num_retries, - filters=[['uuid', 'in', [im['collection'] for im in images]]], + filters=[['uuid', 'in', [im['collection'] for im in images]]]+project_filter, select=['uuid'])} return [(image['collection'], image) for image in images if image['collection'] in existing_coll_uuids] @@ -385,18 +391,25 @@ def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None) if args.pull and not find_image_hashes(args.image): pull_image(args.image, args.tag) + images_in_arv = list_images_in_arv(api, args.retries, args.image, args.tag) + + image_hash = None try: image_hash = find_one_image_hash(args.image, args.tag) + if not docker_image_compatible(api, image_hash): + if args.force_image_format: + logger.warning("forcing incompatible image") + else: + logger.error("refusing to store " \ + "incompatible format (use --force-image-format to override)") + sys.exit(1) except DockerError as error: - logger.error(str(error)) - sys.exit(1) - - if not docker_image_compatible(api, image_hash): - if args.force_image_format: - logger.warning("forcing incompatible image") + if images_in_arv: + # We don't have Docker / we don't have the image locally, + # use image that's already uploaded to Arvados + image_hash = images_in_arv[0][1]['dockerhash'] else: - logger.error("refusing to store " \ - "incompatible format (use --force-image-format to override)") + logger.error(str(error)) sys.exit(1) image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None diff --git a/sdk/python/tests/test_arv_keepdocker.py b/sdk/python/tests/test_arv_keepdocker.py index fd3a69cae4..8fbfad4377 100644 --- a/sdk/python/tests/test_arv_keepdocker.py +++ 
b/sdk/python/tests/test_arv_keepdocker.py @@ -48,11 +48,13 @@ class ArvKeepdockerTestCase(unittest.TestCase, tutil.VersionChecker): self.run_arv_keepdocker(['--version'], sys.stderr) self.assertVersionOutput(out, err) + @mock.patch('arvados.commands.keepdocker.list_images_in_arv', + return_value=[]) @mock.patch('arvados.commands.keepdocker.find_image_hashes', return_value=['abc123']) @mock.patch('arvados.commands.keepdocker.find_one_image_hash', return_value='abc123') - def test_image_format_compatibility(self, _1, _2): + def test_image_format_compatibility(self, _1, _2, _3): old_id = hashlib.sha256(b'old').hexdigest() new_id = 'sha256:'+hashlib.sha256(b'new').hexdigest() for supported, img_id, expect_ok in [ @@ -152,11 +154,13 @@ class ArvKeepdockerTestCase(unittest.TestCase, tutil.VersionChecker): self.run_arv_keepdocker(['[::1]/repo/img'], sys.stderr) find_image_mock.assert_called_with('[::1]/repo/img', 'latest') + @mock.patch('arvados.commands.keepdocker.list_images_in_arv', + return_value=[]) @mock.patch('arvados.commands.keepdocker.find_image_hashes', return_value=['abc123']) @mock.patch('arvados.commands.keepdocker.find_one_image_hash', return_value='abc123') - def test_collection_property_update(self, _1, _2): + def test_collection_property_update(self, _1, _2, _3): image_id = 'sha256:'+hashlib.sha256(b'image').hexdigest() fakeDD = arvados.api('v1')._rootDesc fakeDD['dockerImageFormats'] = ['v2'] diff --git a/services/health/main.go b/services/health/main.go index bc57d36d04..92bd377c80 100644 --- a/services/health/main.go +++ b/services/health/main.go @@ -20,8 +20,18 @@ var ( command cmd.Handler = service.Command(arvados.ServiceNameHealth, newHandler) ) -func newHandler(ctx context.Context, cluster *arvados.Cluster, _ string, _ *prometheus.Registry) service.Handler { - return &health.Aggregator{Cluster: cluster} +func newHandler(ctx context.Context, cluster *arvados.Cluster, _ string, reg *prometheus.Registry) service.Handler { + mClockSkew := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "health", + Name: "clock_skew_seconds", + Help: "Clock skew observed in most recent health check", + }) + reg.MustRegister(mClockSkew) + return &health.Aggregator{ + Cluster: cluster, + MetricClockSkew: mClockSkew, + } } func main() {
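
A few of the changes above are easier to follow restated compactly. First, the new defaulting logic in executor.py: registering a workflow turns on dependency copying and local Docker image matching unless the user said otherwise, and updates inherit the existing workflow's project. This is an illustrative condensation, not verbatim code from the commit; the function name is mine and @rc@ stands in for the ArvRuntimeContext object.

def apply_register_defaults(api, rc):
    # --copy-deps / --no-copy-deps leave rc.copy_deps as None when unset.
    if rc.copy_deps is None and (rc.create_workflow or rc.update_workflow):
        # Registering a workflow: copy dependencies and re-check Docker
        # images against the local daemon by default.
        rc.copy_deps = True
        rc.match_local_docker = True
    if rc.update_workflow and rc.project_uuid is None:
        # Without an explicit --project-uuid, uploads go to the project
        # that already owns the workflow record.
        existing = api.workflows().get(uuid=rc.update_workflow).execute()
        rc.project_uuid = existing["owner_uuid"]
    return rc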
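
The heart of @--copy-deps@ is in upload_dependencies (runner.py): collect the keep references the workflow uses, skip any portable data hash the destination project already owns, and copy the rest by creating new collection records that reuse the source manifest, so no file data moves. A trimmed sketch of that pattern (the helper name is mine; @api@ is an arvados.api() client):

import arvados
import arvados.util

def copy_missing_deps(api, keeprefs, project_uuid):
    # Portable data hashes already owned by the destination project.
    already_present = list(arvados.util.keyset_list_all(
        api.collections().list,
        filters=[["portable_data_hash", "in", list(keeprefs)],
                 ["owner_uuid", "=", project_uuid]],
        select=["uuid", "portable_data_hash", "created_at"]))
    todo = set(keeprefs) - set(c["portable_data_hash"] for c in already_present)
    for pdh in todo:
        # Newest readable collection with this content, wherever it lives.
        items = api.collections().list(
            filters=[["portable_data_hash", "=", pdh]],
            order="created_at desc",
            select=["name", "manifest_text"],
            limit=1).execute()["items"]
        if not items:
            continue
        src = items[0]
        # Reusing manifest_text means the new record points at the same
        # blocks in Keep; nothing is re-uploaded.
        api.collections().create(body={"collection": {
            "owner_uuid": project_uuid,
            "name": src["name"],
            "manifest_text": src["manifest_text"],
        }}, ensure_unique_name=True).execute()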
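
Docker images get the same treatment in arvdocker.py, via the project_uuid parameter this commit adds to list_images_in_arv. With @--copy-deps@, only an image already owned by the destination project counts as present; otherwise any copy on the cluster will do. A sketch of the two lookups (the wrapper function is hypothetical):

import arvados
from arvados.commands.keepdocker import list_images_in_arv

def find_image(api, image_name, image_tag, project_uuid, copy_deps):
    # Any copy on the cluster (the pre-#19070 behavior).
    anywhere = list_images_in_arv(api, 3, image_name=image_name,
                                  image_tag=image_tag, project_uuid=None)
    if not copy_deps:
        return anywhere
    # With --copy-deps, require a copy owned by the destination project;
    # an empty result triggers an upload/copy into that project.
    return list_images_in_arv(api, 3, image_name=image_name,
                              image_tag=image_tag, project_uuid=project_uuid)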
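
keepdocker.py also gains a fallback: if the local Docker daemon is missing or does not have the image, but a matching image was already uploaded to Arvados, arv-keepdocker now uses the uploaded copy instead of exiting with an error. A sketch of the new control flow (the helper name is mine):

import arvados
from arvados.commands.keepdocker import (DockerError, find_one_image_hash,
                                         list_images_in_arv)

def resolve_image_hash(api, image, tag, retries=3):
    images_in_arv = list_images_in_arv(api, retries, image, tag)
    try:
        return find_one_image_hash(image, tag)
    except DockerError:
        if images_in_arv:
            # No usable local Docker; fall back to the newest image
            # link already registered in Arvados.
            return images_in_arv[0][1]["dockerhash"]
        raise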
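
Finally, the health aggregator change is Go, but the clock-skew arithmetic is worth restating: each service's HTTP Date response header is parsed, the spread between the earliest and latest clocks is computed, and the cluster is flagged unhealthy when that spread exceeds a one-minute threshold plus the longest observed response time (a slow response inflates the apparent spread). An illustrative Python rendering, not code from the commit:

from datetime import timedelta
from email.utils import parsedate_to_datetime

MAX_CLOCK_SKEW = timedelta(minutes=1)

def check_clock_skew(date_headers, max_response_time):
    # date_headers: RFC 1123 "Date" header values from each health ping.
    clocks = [parsedate_to_datetime(h) for h in date_headers if h]
    if len(clocks) < 2:
        return timedelta(0)
    skew = max(clocks) - min(clocks)
    if skew > MAX_CLOCK_SKEW + max_response_time:
        raise RuntimeError("clock skew detected: maximum timestamp "
                           "spread is %s" % skew)
    return skew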