import ciso8601
import uuid
+from arvados_cwl.util import get_current_container, get_intermediate_collection_info
import ruamel.yaml as yaml
from cwltool.errors import WorkflowException
import arvados.collection
-from arvados.errors import ApiError
from .arvdocker import arv_docker_get_image
from . import done
from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
keepemptydirs(vwd)
- info = self._get_intermediate_collection_info()
+ if not runtimeContext.current_container:
+ runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+ info = get_intermediate_collection_info(runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
vwd.save_new(name=info["name"],
ensure_unique_name=True,
trash_at=info["trash_at"],
finally:
self.output_callback(outputs, processStatus)
- def _get_intermediate_collection_info(self):
- trash_time = None
- if self.arvrunner.intermediate_output_ttl > 0:
- trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)
-
- current_container_uuid = None
- try:
- current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
- current_container_uuid = current_container['uuid']
- except ApiError as e:
- # Status code 404 just means we're not running in a container.
- if e.resp.status != 404:
- logger.info("Getting current container: %s", e)
- props = {"type": "Intermediate",
- "container": current_container_uuid}
-
- return {"name" : "Intermediate collection",
- "trash_at" : trash_time,
- "properties" : props}
-
class RunnerContainer(Runner):
"""Submit and manage a container that runs arvados-cwl-runner."""
import re
import copy
import json
-import datetime
import time
from cwltool.process import shortname, UnsupportedRequirement
from schema_salad.sourceline import SourceLine
+from arvados_cwl.util import get_current_container, get_intermediate_collection_info
import ruamel.yaml as yaml
import arvados.collection
if vwd:
with Perf(metrics, "generatefiles.save_new %s" % self.name):
- info = self._get_intermediate_collection_info()
+ if not runtimeContext.current_container:
+ runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+ info = get_intermediate_collection_info(runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
vwd.save_new(name=info["name"],
ensure_unique_name=True,
trash_at=info["trash_at"],
finally:
self.output_callback(outputs, processStatus)
- def _get_intermediate_collection_info(self):
- trash_time = None
- if self.arvrunner.intermediate_output_ttl > 0:
- trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)
-
- current_container_uuid = None
- try:
- current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
- current_container_uuid = current_container['uuid']
- except ApiError as e:
- # Status code 404 just means we're not running in a container.
- if e.resp.status != 404:
- logger.info("Getting current container: %s", e)
- props = {"type": "Intermediate",
- "container": current_container_uuid}
-
- return {"name" : "Intermediate collection",
- "trash_at" : trash_time,
- "properties" : props}
-
class RunnerJob(Runner):
"""Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
from cwltool.context import LoadingContext, RuntimeContext
class ArvLoadingContext(LoadingContext):
self.wait = True
self.cwl_runner_job = None
self.storage_classes = "default"
+ self.current_container = None
super(ArvRuntimeContext, self).__init__(kwargs)
import uuid
import os
import urllib
-import datetime
+from arvados_cwl.util import get_current_container, get_intermediate_collection_info
import arvados.commands.run
import arvados.collection
for l in srcobj.get("listing", []):
self.addentry(l, c, ".", remap)
- info = self._get_intermediate_collection_info()
+ container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+ info = get_intermediate_collection_info(container, self.arvrunner.intermediate_output_ttl)
c.save_new(name=info["name"],
owner_uuid=self.arvrunner.project_uuid,
num_retries=self.arvrunner.num_retries )
self.addentry(srcobj, c, ".", remap)
- info = self._get_intermediate_collection_info()
+ container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+ info = get_intermediate_collection_info(container, self.arvrunner.intermediate_output_ttl)
c.save_new(name=info["name"],
owner_uuid=self.arvrunner.project_uuid,
else:
return None
- def _get_intermediate_collection_info(self):
- trash_time = None
- if self.arvrunner.intermediate_output_ttl > 0:
- trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)
-
- current_container_uuid = None
- try:
- current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
- current_container_uuid = current_container['uuid']
- except ApiError as e:
- # Status code 404 just means we're not running in a container.
- if e.resp.status != 404:
- logger.info("Getting current container: %s", e)
- props = {"type": "Intermediate",
- "container": current_container_uuid}
-
- return {"name" : "Intermediate collection",
- "trash_at" : trash_time,
- "properties" : props}
-
class StagingPathMapper(PathMapper):
_follow_dirs = True
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import datetime
+from arvados.errors import ApiError
+
def get_intermediate_collection_info(current_container, intermediate_output_ttl):
    """Build the name, trash time and properties for an intermediate collection.

    Arguments:

    * current_container -- container record (a mapping with a 'uuid'
      key) for the container this process runs in, or a false value
      (e.g. None) when not running inside a container.

    * intermediate_output_ttl -- lifetime of the collection in seconds.
      Zero, negative or None means the collection is never trashed.

    Returns a dict with the keys "name", "trash_at" and "properties".
    "trash_at" is a naive UTC datetime, or None when no TTL applies.
    """
    trash_time = None
    # Guard against None explicitly: comparing None with an int only
    # happens to work on Python 2 and raises TypeError on Python 3.
    if intermediate_output_ttl is not None and intermediate_output_ttl > 0:
        trash_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=intermediate_output_ttl)

    container_uuid = current_container['uuid'] if current_container else None
    props = {"type": "intermediate", "container": container_uuid}

    return {"name": "Intermediate collection", "trash_at": trash_time, "properties": props}
+
def get_current_container(api, num_retries=0, logger=None):
    """Return the record of the container this process is running in.

    Arguments:

    * api -- API client object; api.containers().current() is queried.

    * num_retries -- passed through to execute().

    * logger -- optional logger; when given, ApiErrors other than 404
      are reported on it at info level.

    Returns whatever the API call returns, or None if the call raised
    ApiError.  The lookup is best-effort: ApiError is never propagated
    to the caller.
    """
    try:
        return api.containers().current().execute(num_retries=num_retries)
    except ApiError as e:
        # Status code 404 just means we're not running in a container,
        # so only other failures are worth reporting.
        if e.resp.status != 404 and logger:
            logger.info("Getting current container: %s", e)
    return None
import mock
import unittest
import os
-import datetime
import functools
import cwltool.process
import cwltool.secrets
logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
-class MockDateTime(datetime.datetime):
- @classmethod
- def now(cls):
- return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)
-
-datetime.datetime = MockDateTime
-
class TestContainer(unittest.TestCase):
def helper(self, runner, enable_reuse=True):
_, kwargs = runner.api.container_requests().create.call_args
self.assertEqual(42, kwargs['body']['scheduling_parameters'].get('max_run_time'))
-
-
- def test_get_intermediate_collection_info(self):
- arvrunner = mock.MagicMock()
- arvrunner.intermediate_output_ttl = 60
- arvrunner.api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
-
- container = arvados_cwl.ArvadosContainer(arvrunner)
-
- info = container._get_intermediate_collection_info()
-
- self.assertEqual(info["name"], "Intermediate collection")
- self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 1))
- self.assertEqual(info["properties"], {"type" : "Intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
import unittest
import copy
import StringIO
-import datetime
import arvados
import arvados_cwl
logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
-class MockDateTime(datetime.datetime):
- @classmethod
- def now(cls):
- return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)
-
-datetime.datetime = MockDateTime
-
class TestJob(unittest.TestCase):
def helper(self, runner, enable_reuse=True):
arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
- def test_get_intermediate_collection_info(self):
- arvrunner = mock.MagicMock()
- arvrunner.intermediate_output_ttl = 60
- arvrunner.api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
-
- job = arvados_cwl.ArvadosJob(arvrunner)
-
- info = job._get_intermediate_collection_info()
-
- self.assertEqual(info["name"], "Intermediate collection")
- self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 1))
- self.assertEqual(info["properties"], {"type" : "Intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
-
-
class TestWorkflow(unittest.TestCase):
def helper(self, runner, enable_reuse=True):
import json
import logging
import os
-import datetime
import arvados
import arvados.keep
c.keepref = "%s/%s" % (pdh, os.path.basename(c.fn))
c.fn = fnPattern % (pdh, os.path.basename(c.fn))
-class MockDateTime(datetime.datetime):
- @classmethod
- def now(cls):
- return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)
-
-datetime.datetime = MockDateTime
-
class TestPathmap(unittest.TestCase):
def setUp(self):
self.api = mock.MagicMock()
"class": "File",
"location": "file:tests/hw.py"
}], "", "/test/%s", "/test/%s/%s")
-
- def test_get_intermediate_collection_info(self):
- self.api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
- arvrunner = arvados_cwl.ArvCwlRunner(self.api)
- arvrunner.intermediate_output_ttl = 60
-
- path_mapper = ArvPathMapper(arvrunner, [{
- "class": "File",
- "location": "keep:99999999999999999999999999999991+99/hw.py"
- }], "", "/test/%s", "/test/%s/%s")
-
- info = path_mapper._get_intermediate_collection_info()
-
- self.assertEqual(info["name"], "Intermediate collection")
- self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 1))
- self.assertEqual(info["properties"], {"type" : "Intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import unittest
+import mock
+import datetime
+import httplib2
+
+from arvados_cwl.util import *
+from arvados.errors import ApiError
+
class MockDateTime(datetime.datetime):
    """datetime.datetime stand-in whose utcnow() is frozen at 2018-01-01 00:00:00."""
    @classmethod
    def utcnow(cls):
        return cls(2018, 1, 1, 0, 0, 0, 0)


class TestUtil(unittest.TestCase):
    """Tests for the helpers in arvados_cwl.util."""

    # Freeze the clock for this test only.  Replacing datetime.datetime
    # at import time would leak the fake clock into every other test
    # module loaded in the same process.
    @mock.patch.object(datetime, "datetime", MockDateTime)
    def test_get_intermediate_collection_info(self):
        current_container = {"uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
        intermediate_output_ttl = 120

        info = get_intermediate_collection_info(current_container, intermediate_output_ttl)

        self.assertEqual(info["name"], "Intermediate collection")
        self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 2, 0, 0))
        self.assertEqual(info["properties"], {"type" : "intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})

    def test_get_current_container_success(self):
        api = mock.MagicMock()
        api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}

        current_container = get_current_container(api)

        self.assertEqual(current_container, {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})

    def test_get_current_container_error(self):
        api = mock.MagicMock()
        api.containers().current().execute.side_effect = ApiError(httplib2.Response({"status": 300}), "")
        logger = mock.MagicMock()

        # get_current_container() is best-effort: it swallows ApiError,
        # reports non-404 failures on the logger and returns None.
        current_container = get_current_container(api, num_retries=0, logger=logger)

        self.assertIsNone(current_container)
        self.assertTrue(logger.info.called)

    def test_get_current_container_not_found(self):
        api = mock.MagicMock()
        api.containers().current().execute.side_effect = ApiError(httplib2.Response({"status": 404}), "")
        logger = mock.MagicMock()

        # A 404 only means "not running in a container": no log noise.
        current_container = get_current_container(api, num_retries=0, logger=logger)

        self.assertIsNone(current_container)
        self.assertFalse(logger.info.called)