From: Lucas Di Pentima Date: Tue, 10 Jul 2018 18:31:50 +0000 (-0300) Subject: Merge branch '13668-api-wb-package-version' X-Git-Tag: 1.2.0~84 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/eead78e2eb11528af8cb862dcb6f9a41737a8a14?hp=2c87b580a87a55010da626d352307343f75d6d3a Merge branch '13668-api-wb-package-version' Closes #13668 Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima --- diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index c0c6f77221..948a9a46fe 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -11,6 +11,7 @@ import datetime import ciso8601 import uuid +from arvados_cwl.util import get_current_container, get_intermediate_collection_info import ruamel.yaml as yaml from cwltool.errors import WorkflowException @@ -165,8 +166,14 @@ class ArvadosContainer(JobBase): keepemptydirs(vwd) - with Perf(metrics, "generatefiles.save_new %s" % self.name): - vwd.save_new() + if not runtimeContext.current_container: + runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger) + info = get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl) + vwd.save_new(name=info["name"], + owner_uuid=self.arvrunner.project_uuid, + ensure_unique_name=True, + trash_at=info["trash_at"], + properties=info["properties"]) prev = None for f, p in sorteditems: @@ -242,6 +249,7 @@ class ArvadosContainer(JobBase): if self.timelimit is not None: scheduling_parameters["max_run_time"] = self.timelimit + container_request["output_name"] = "Output for step %s" % (self.name) container_request["output_ttl"] = self.output_ttl container_request["mounts"] = mounts container_request["secret_mounts"] = secret_mounts diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py index 70c2173db9..1287fbb6ea 100644 --- a/sdk/cwl/arvados_cwl/arvjob.py +++ b/sdk/cwl/arvados_cwl/arvjob.py @@ -18,6 +18,7 @@ from cwltool.job import JobBase from schema_salad.sourceline import SourceLine +from arvados_cwl.util import get_current_container, get_intermediate_collection_info import ruamel.yaml as yaml import arvados.collection @@ -76,7 +77,14 @@ class ArvadosJob(JobBase): if vwd: with Perf(metrics, "generatefiles.save_new %s" % self.name): - vwd.save_new() + if not runtimeContext.current_container: + runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger) + info = get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl) + vwd.save_new(name=info["name"], + owner_uuid=self.arvrunner.project_uuid, + ensure_unique_name=True, + trash_at=info["trash_at"], + properties=info["properties"]) for f, p in generatemapper.items(): if p.type == "File": diff --git a/sdk/cwl/arvados_cwl/context.py b/sdk/cwl/arvados_cwl/context.py index cf0c1fb7e4..81e256ed54 100644 --- a/sdk/cwl/arvados_cwl/context.py +++ b/sdk/cwl/arvados_cwl/context.py @@ -1,3 +1,7 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + from cwltool.context import LoadingContext, RuntimeContext class ArvLoadingContext(LoadingContext): @@ -24,5 +28,6 @@ class ArvRuntimeContext(RuntimeContext): self.wait = True self.cwl_runner_job = None self.storage_classes = "default" + self.current_container = None super(ArvRuntimeContext, self).__init__(kwargs) diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py index 05a358e0d5..d083b78f5a 100644 --- a/sdk/cwl/arvados_cwl/pathmapper.py +++ b/sdk/cwl/arvados_cwl/pathmapper.py @@ -8,11 +8,13 @@ import uuid import os import urllib +from arvados_cwl.util import get_current_container, get_intermediate_collection_info import arvados.commands.run import arvados.collection from schema_salad.sourceline import SourceLine +from arvados.errors import ApiError from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs from cwltool.workflow import WorkflowException @@ -153,9 +155,14 @@ class ArvPathMapper(PathMapper): for l in srcobj.get("listing", []): self.addentry(l, c, ".", remap) - check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries) - if not check["items"]: - c.save_new(owner_uuid=self.arvrunner.project_uuid) + container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger) + info = get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl) + + c.save_new(name=info["name"], + owner_uuid=self.arvrunner.project_uuid, + ensure_unique_name=True, + trash_at=info["trash_at"], + properties=info["properties"]) ab = self.collection_pattern % c.portable_data_hash() self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True) @@ -167,9 +174,14 @@ class ArvPathMapper(PathMapper): num_retries=self.arvrunner.num_retries ) self.addentry(srcobj, c, ".", remap) - check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries) - if not check["items"]: - c.save_new(owner_uuid=self.arvrunner.project_uuid) + container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger) + info = get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl) + + c.save_new(name=info["name"], + owner_uuid=self.arvrunner.project_uuid, + ensure_unique_name=True, + trash_at=info["trash_at"], + properties=info["properties"]) ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"]) self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]), @@ -202,6 +214,7 @@ class ArvPathMapper(PathMapper): else: return None + class StagingPathMapper(PathMapper): _follow_dirs = True diff --git a/sdk/cwl/arvados_cwl/util.py b/sdk/cwl/arvados_cwl/util.py new file mode 100644 index 0000000000..98a2a89a1d --- /dev/null +++ b/sdk/cwl/arvados_cwl/util.py @@ -0,0 +1,31 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +import datetime +from arvados.errors import ApiError + +def get_intermediate_collection_info(workflow_step_name, current_container, intermediate_output_ttl): + if workflow_step_name: + name = "Intermediate collection for step %s" % (workflow_step_name) + else: + name = "Intermediate collection" + trash_time = None + if intermediate_output_ttl > 0: + trash_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=intermediate_output_ttl) + container_uuid = None + if current_container: + container_uuid = current_container['uuid'] + props = {"type": "intermediate", "container": container_uuid} + + return {"name" : name, "trash_at" : trash_time, "properties" : props} + +def get_current_container(api, num_retries=0, logger=None): + current_container = None + try: + current_container = api.containers().current().execute(num_retries=num_retries) + except ApiError as e: + # Status code 404 just means we're not running in a container. + if e.resp.status != 404 and logger: + logger.info("Getting current container: %s", e) + return current_container diff --git a/sdk/cwl/tests/makes_intermediates/echo.cwl b/sdk/cwl/tests/makes_intermediates/echo.cwl new file mode 100644 index 0000000000..5449bc32c4 --- /dev/null +++ b/sdk/cwl/tests/makes_intermediates/echo.cwl @@ -0,0 +1,14 @@ +class: CommandLineTool +cwlVersion: v1.0 +requirements: + InitialWorkDirRequirement: + listing: + - $(inputs.inp1) + - $(inputs.inp2) + - $(inputs.inp3) +inputs: + inp1: File + inp2: [File, Directory] + inp3: Directory +outputs: [] +arguments: [echo, $(inputs.inp1), $(inputs.inp2), $(inputs.inp3)] diff --git a/sdk/cwl/tests/makes_intermediates/hello1.txt b/sdk/cwl/tests/makes_intermediates/hello1.txt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/cwl/tests/makes_intermediates/run_in_single.cwl b/sdk/cwl/tests/makes_intermediates/run_in_single.cwl new file mode 100644 index 0000000000..bb596b2c43 --- /dev/null +++ b/sdk/cwl/tests/makes_intermediates/run_in_single.cwl @@ -0,0 +1,38 @@ +cwlVersion: v1.0 +class: Workflow +$namespaces: + arv: "http://arvados.org/cwl#" +requirements: + SubworkflowFeatureRequirement: {} +inputs: + inp1: + type: File + default: + class: File + location: hello1.txt + inp2: + type: [File, Directory] + default: + class: File + basename: "hello2.txt" + contents: "Hello world" + inp3: + type: [File, Directory] + default: + class: Directory + basename: inp3 + listing: + - class: File + basename: "hello3.txt" + contents: "hello world" +outputs: [] +steps: + step1: + requirements: + arv:RunInSingleContainer: {} + in: + inp1: inp1 + inp2: inp2 + inp3: inp3 + out: [] + run: subwf.cwl diff --git a/sdk/cwl/tests/makes_intermediates/subwf.cwl b/sdk/cwl/tests/makes_intermediates/subwf.cwl new file mode 100644 index 0000000000..1852ab40c7 --- /dev/null +++ b/sdk/cwl/tests/makes_intermediates/subwf.cwl @@ -0,0 +1,15 @@ +cwlVersion: v1.0 +class: Workflow +inputs: + inp1: File + inp2: File + inp3: Directory +outputs: [] +steps: + step1: + in: + inp1: inp1 + inp2: inp2 + inp3: inp3 + out: [] + run: echo.cwl diff --git a/sdk/cwl/tests/test_container.py b/sdk/cwl/tests/test_container.py index dd484690c1..ae234414a3 100644 --- a/sdk/cwl/tests/test_container.py +++ b/sdk/cwl/tests/test_container.py @@ -21,7 +21,6 @@ if not os.getenv('ARVADOS_DEBUG'): logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN) logging.getLogger('arvados.arv-run').setLevel(logging.WARN) - class TestContainer(unittest.TestCase): def helper(self, runner, enable_reuse=True): @@ -100,6 +99,7 @@ class TestContainer(unittest.TestCase): "capacity": 1073741824 } }, 'state': 'Committed', + 'output_name': 'Output for step test_run_'+str(enable_reuse), 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz', 'output_path': '/var/spool/cwl', 'output_ttl': 0, @@ -186,6 +186,7 @@ class TestContainer(unittest.TestCase): "capacity": 5242880000 } }, 'state': 'Committed', + 'output_name': 'Output for step test_resource_requirements', 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz', 'output_path': '/var/spool/cwl', 'output_ttl': 7200, @@ -318,6 +319,7 @@ class TestContainer(unittest.TestCase): } }, 'state': 'Committed', + 'output_name': 'Output for step test_initial_work_dir', 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz', 'output_path': '/var/spool/cwl', 'output_ttl': 0, @@ -405,6 +407,7 @@ class TestContainer(unittest.TestCase): }, }, 'state': 'Committed', + "output_name": "Output for step test_run_redirect", 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz', 'output_path': '/var/spool/cwl', 'output_ttl': 0, @@ -541,6 +544,7 @@ class TestContainer(unittest.TestCase): "capacity": 1073741824 } }, 'state': 'Committed', + 'output_name': 'Output for step test_run_mounts', 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz', 'output_path': '/var/spool/cwl', 'output_ttl': 0, @@ -633,6 +637,7 @@ class TestContainer(unittest.TestCase): "capacity": 1073741824 } }, 'state': 'Committed', + 'output_name': 'Output for step test_secrets', 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz', 'output_path': '/var/spool/cwl', 'output_ttl': 0, diff --git a/sdk/cwl/tests/test_util.py b/sdk/cwl/tests/test_util.py new file mode 100644 index 0000000000..2532bd596c --- /dev/null +++ b/sdk/cwl/tests/test_util.py @@ -0,0 +1,45 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +import unittest +import mock +import datetime +import httplib2 + +from arvados_cwl.util import * +from arvados.errors import ApiError + +class MockDateTime(datetime.datetime): + @classmethod + def utcnow(cls): + return datetime.datetime(2018, 1, 1, 0, 0, 0, 0) + +datetime.datetime = MockDateTime + +class TestUtil(unittest.TestCase): + def test_get_intermediate_collection_info(self): + name = "one" + current_container = {"uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"} + intermediate_output_ttl = 120 + + info = get_intermediate_collection_info(name, current_container, intermediate_output_ttl) + + self.assertEqual(info["name"], "Intermediate collection for step one") + self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 2, 0, 0)) + self.assertEqual(info["properties"], {"type" : "intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}) + + def test_get_current_container_success(self): + api = mock.MagicMock() + api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"} + + current_container = get_current_container(api) + + self.assertEqual(current_container, {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}) + + def test_get_current_container_error(self): + api = mock.MagicMock() + api.containers().current().execute.side_effect = ApiError(httplib2.Response({"status": 300}), "") + logger = mock.MagicMock() + + self.assertRaises(ApiError, get_current_container(api, num_retries=0, logger=logger))