class TestContainer(unittest.TestCase):
+ def setUp(self):  # unittest fixture hook, invoked before each test method of this TestCase
+ cwltool.process._names = set()  # rebind cwltool.process._names to a fresh empty set; presumably discards names recorded by earlier tests -- TODO confirm
+
def helper(self, runner, enable_reuse=True):
document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
def setup_and_test_container_executor_and_logging(self, gcc_mock) :
api = mock.MagicMock()
api._rootDesc = copy.deepcopy(get_rootDesc())
- del api._rootDesc.get('resources')['jobs']['methods']['create']
# Make sure ArvCwlExecutor thinks it's running inside a container so it
# adds the logging handler that will call runtime_status_update() mock
class TestWorkflow(unittest.TestCase):
+ def setUp(self):  # unittest fixture hook, invoked before each test method of this TestCase
+ cwltool.process._names = set()  # rebind cwltool.process._names to a fresh empty set; presumably discards names recorded by earlier tests -- TODO confirm
+
def helper(self, runner, enable_reuse=True):
document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
api = mock.MagicMock()
api._rootDesc = copy.deepcopy(get_rootDesc())
- del api._rootDesc.get('resources')['jobs']['methods']['create']
runner = arvados_cwl.executor.ArvCwlExecutor(api)
self.assertEqual(runner.work_api, 'containers')