import arvados.collection
+from arvados.errors import ApiError
from .arvdocker import arv_docker_get_image
from . import done
from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
keepemptydirs(vwd)
- with Perf(metrics, "generatefiles.save_new %s" % self.name):
- vwd.save_new()
+ info = self._get_intermediate_collection_info()
+ vwd.save_new(name=info["name"],
+ ensure_unique_name=True,
+ trash_at=info["trash_at"],
+ properties=info["properties"])
prev = None
for f, p in sorteditems:
finally:
self.output_callback(outputs, processStatus)
+ def _get_intermediate_collection_info(self):  # Return the name, trash_at and properties to use when saving an intermediate collection.
+ trash_time = None  # left as None unless intermediate_output_ttl > 0
+ if self.arvrunner.intermediate_output_ttl > 0:  # ttl is interpreted as a number of seconds
+ trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)  # NOTE(review): now() is naive local time -- confirm whether trash_at is expected in UTC
+
+ current_container_uuid = None  # stays None if the lookup below raises ApiError
+ try:
+ current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
+ current_container_uuid = current_container['uuid']
+ except ApiError as e:
+ # Status code 404 just means we're not running in a container.
+ if e.resp.status != 404:
+ logger.info("Getting current container: %s", e)  # other API errors are only logged; the method still returns
+ props = {"type": "Intermediate",
+ "container": current_container_uuid}  # may be None, see above
+
+ return {"name" : "Intermediate collection",
+ "trash_at" : trash_time,
+ "properties" : props}
+
class RunnerContainer(Runner):
"""Submit and manage a container that runs arvados-cwl-runner."""
import re
import copy
import json
+import datetime
import time
from cwltool.process import shortname, UnsupportedRequirement
if vwd:
with Perf(metrics, "generatefiles.save_new %s" % self.name):
- vwd.save_new()
+ info = self._get_intermediate_collection_info()
+ vwd.save_new(name=info["name"],
+ ensure_unique_name=True,
+ trash_at=info["trash_at"],
+ properties=info["properties"])
for f, p in generatemapper.items():
if p.type == "File":
finally:
self.output_callback(outputs, processStatus)
+ def _get_intermediate_collection_info(self):  # Build the dict (name, trash_at, properties) used for saving an intermediate collection.
+ trash_time = None  # no trash time unless a positive intermediate_output_ttl is set
+ if self.arvrunner.intermediate_output_ttl > 0:  # ttl is added below as seconds
+ trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)  # NOTE(review): now() is naive local time -- confirm whether trash_at is expected in UTC
+
+ current_container_uuid = None  # only set when containers().current() succeeds
+ try:
+ current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
+ current_container_uuid = current_container['uuid']
+ except ApiError as e:
+ # Status code 404 just means we're not running in a container.
+ if e.resp.status != 404:
+ logger.info("Getting current container: %s", e)  # non-404 errors are logged and then ignored
+ props = {"type": "Intermediate",
+ "container": current_container_uuid}  # None when no current container was obtained
+
+ return {"name" : "Intermediate collection",
+ "trash_at" : trash_time,
+ "properties" : props}
+
class RunnerJob(Runner):
"""Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
import uuid
import os
import urllib
+import datetime
import arvados.commands.run
import arvados.collection
from schema_salad.sourceline import SourceLine
+from arvados.errors import ApiError
from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
from cwltool.workflow import WorkflowException
for l in srcobj.get("listing", []):
self.addentry(l, c, ".", remap)
- check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
- if not check["items"]:
- c.save_new(owner_uuid=self.arvrunner.project_uuid)
+ info = self._get_intermediate_collection_info()
+
+ c.save_new(name=info["name"],
+ owner_uuid=self.arvrunner.project_uuid,
+ ensure_unique_name=True,
+ trash_at=info["trash_at"],
+ properties=info["properties"])
ab = self.collection_pattern % c.portable_data_hash()
self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
num_retries=self.arvrunner.num_retries )
self.addentry(srcobj, c, ".", remap)
- check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
- if not check["items"]:
- c.save_new(owner_uuid=self.arvrunner.project_uuid)
+ info = self._get_intermediate_collection_info()
+
+ c.save_new(name=info["name"],
+ owner_uuid=self.arvrunner.project_uuid,
+ ensure_unique_name=True,
+ trash_at=info["trash_at"],
+ properties=info["properties"])
ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
else:
return None
+ def _get_intermediate_collection_info(self):  # Return name, trash_at and properties for an intermediate collection as one dict.
+ trash_time = None  # remains None when intermediate_output_ttl is not positive
+ if self.arvrunner.intermediate_output_ttl > 0:  # ttl is treated as seconds by the timedelta below
+ trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)  # NOTE(review): now() is naive local time -- confirm whether trash_at is expected in UTC
+
+ current_container_uuid = None  # kept as None if the API call below raises ApiError
+ try:
+ current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)  # NOTE(review): one API request per call of this method
+ current_container_uuid = current_container['uuid']
+ except ApiError as e:
+ # Status code 404 just means we're not running in a container.
+ if e.resp.status != 404:
+ logger.info("Getting current container: %s", e)  # logged at info level only; execution continues
+ props = {"type": "Intermediate",
+ "container": current_container_uuid}  # can be None, see the except branch
+
+ return {"name" : "Intermediate collection",
+ "trash_at" : trash_time,
+ "properties" : props}
+
+
class StagingPathMapper(PathMapper):
_follow_dirs = True
--- /dev/null
+class: CommandLineTool  # tool fixture: runs echo with its three inputs as arguments
+cwlVersion: v1.0
+requirements:
+ InitialWorkDirRequirement:  # the same three inputs are listed here
+ listing:
+ - $(inputs.inp1)
+ - $(inputs.inp2)
+ - $(inputs.inp3)
+inputs:
+ inp1: File
+ inp2: [File, Directory]  # union type: either a File or a Directory
+ inp3: Directory
+outputs: []  # no outputs declared
+arguments: [echo, $(inputs.inp1), $(inputs.inp2), $(inputs.inp3)]
--- /dev/null
+cwlVersion: v1.0
+class: Workflow  # workflow fixture with a single step (step1) and default values for every input
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ SubworkflowFeatureRequirement: {}
+inputs:
+ inp1:
+ type: File
+ default:
+ class: File
+ location: hello1.txt  # default given by location
+ inp2:
+ type: [File, Directory]
+ default:
+ class: File
+ basename: "hello2.txt"
+ contents: "Hello world"  # default is a file literal: basename and contents, no location
+ inp3:
+ type: [File, Directory]
+ default:
+ class: Directory  # default is a directory literal described by the listing below
+ basename: inp3
+ listing:
+ - class: File
+ basename: "hello3.txt"
+ contents: "hello world"
+outputs: []  # no outputs declared
+steps:
+ step1:
+ requirements:
+ arv:RunInSingleContainer: {}  # requirement from the arv namespace declared in $namespaces
+ in:
+ inp1: inp1
+ inp2: inp2
+ inp3: inp3
+ out: []
+ run: subwf.cwl  # NOTE(review): presumably the Workflow fixture that forwards inp1-3 to echo.cwl; it declares inp2: File and inp3: Directory, narrower than the unions here -- confirm intended
--- /dev/null
+cwlVersion: v1.0
+class: Workflow  # workflow fixture: one step that passes inp1, inp2 and inp3 to echo.cwl
+inputs:
+ inp1: File
+ inp2: File
+ inp3: Directory
+outputs: []  # no outputs declared
+steps:
+ step1:
+ in:  # each step input is wired to the workflow input of the same name
+ inp1: inp1
+ inp2: inp2
+ inp3: inp3
+ out: []
+ run: echo.cwl
import mock
import unittest
import os
+import datetime
import functools
import cwltool.process
import cwltool.secrets
logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
+class MockDateTime(datetime.datetime):  # datetime subclass whose now() is pinned to a fixed instant
+ @classmethod
+ def now(cls):  # only now() is overridden; utcnow() and the other constructors are inherited unchanged
+ return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)  # always 2018-01-01 00:00:00
+
+datetime.datetime = MockDateTime  # module-level rebind of the attribute on the shared datetime module; not restored in the visible code
class TestContainer(unittest.TestCase):
_, kwargs = runner.api.container_requests().create.call_args
self.assertEqual(42, kwargs['body']['scheduling_parameters'].get('max_run_time'))
+
+
+ def test_get_intermediate_collection_info(self):  # checks the dict returned by ArvadosContainer._get_intermediate_collection_info()
+ arvrunner = mock.MagicMock()
+ arvrunner.intermediate_output_ttl = 60  # seconds; expected trash_at below is 60 s after the pinned now()
+ arvrunner.api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}  # fake current-container record
+
+ container = arvados_cwl.ArvadosContainer(arvrunner)
+
+ info = container._get_intermediate_collection_info()
+
+ self.assertEqual(info["name"], "Intermediate collection")
+ self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 1))  # MockDateTime.now() (2018-01-01 00:00:00) + 60 s
+ self.assertEqual(info["properties"], {"type" : "Intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
import unittest
import copy
import StringIO
+import datetime
import arvados
import arvados_cwl
logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
+class MockDateTime(datetime.datetime):  # datetime subclass that freezes now() at one fixed value
+ @classmethod
+ def now(cls):  # the only overridden constructor; utcnow() etc. keep their inherited behavior
+ return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)  # fixed value: 2018-01-01 00:00:00
+
+datetime.datetime = MockDateTime  # executed at module level: rebinds datetime.datetime on the shared module object, with no restore visible here
+
class TestJob(unittest.TestCase):
def helper(self, runner, enable_reuse=True):
arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
+ def test_get_intermediate_collection_info(self):  # checks the dict returned by ArvadosJob._get_intermediate_collection_info()
+ arvrunner = mock.MagicMock()
+ arvrunner.intermediate_output_ttl = 60  # seconds added to the pinned now() for the expected trash_at
+ arvrunner.api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}  # stubbed current-container response
+
+ job = arvados_cwl.ArvadosJob(arvrunner)
+
+ info = job._get_intermediate_collection_info()
+
+ self.assertEqual(info["name"], "Intermediate collection")
+ self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 1))  # MockDateTime.now() (2018-01-01 00:00:00) + 60 s
+ self.assertEqual(info["properties"], {"type" : "Intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
+
+
class TestWorkflow(unittest.TestCase):
def helper(self, runner, enable_reuse=True):
import json
import logging
import os
+import datetime
import arvados
import arvados.keep
c.keepref = "%s/%s" % (pdh, os.path.basename(c.fn))
c.fn = fnPattern % (pdh, os.path.basename(c.fn))
+class MockDateTime(datetime.datetime):  # datetime subclass returning a constant from now()
+ @classmethod
+ def now(cls):  # utcnow() and other constructors are not overridden
+ return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)  # constant: 2018-01-01 00:00:00
+
+datetime.datetime = MockDateTime  # module-level statement: replaces the datetime.datetime attribute on the shared module; no restore in the visible code
+
class TestPathmap(unittest.TestCase):
def setUp(self):
self.api = mock.MagicMock()
"class": "File",
"location": "file:tests/hw.py"
}], "", "/test/%s", "/test/%s/%s")
+
+ def test_get_intermediate_collection_info(self):  # checks the dict returned by ArvPathMapper._get_intermediate_collection_info()
+ self.api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}  # self.api is the MagicMock created in setUp
+ arvrunner = arvados_cwl.ArvCwlRunner(self.api)
+ arvrunner.intermediate_output_ttl = 60  # seconds added to the pinned now() for the expected trash_at
+
+ path_mapper = ArvPathMapper(arvrunner, [{  # mapper built over a single keep: File reference
+ "class": "File",
+ "location": "keep:99999999999999999999999999999991+99/hw.py"
+ }], "", "/test/%s", "/test/%s/%s")
+
+ info = path_mapper._get_intermediate_collection_info()
+
+ self.assertEqual(info["name"], "Intermediate collection")
+ self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 1))  # MockDateTime.now() (2018-01-01 00:00:00) + 60 s
+ self.assertEqual(info["properties"], {"type" : "Intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})