Merge branch '15028-cwl-v1.1' refs #15028
author Peter Amstutz <pamstutz@veritasgenetics.com>
Tue, 4 Jun 2019 17:20:24 +0000 (13:20 -0400)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Tue, 4 Jun 2019 17:20:24 +0000 (13:20 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

1  2 
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/tests/test_container.py
sdk/cwl/tests/test_submit.py

index f8f1f30f633bd2f7fa9600ae77736ad2634ac3b4,e20eb18ddacae7d0ae9f6facfad897d084dd8be4..aa3388d00bc9964eb8eb845f2210ac6eee6510de
@@@ -155,7 -155,7 +155,7 @@@ class ArvadosContainer(JobBase)
                                  vwd.mkdirs(p.target)
                              else:
                                  source, path = self.arvrunner.fs_access.get_collection(p.resolved)
 -                                vwd.copy(path, p.target, source_collection=source)
 +                                vwd.copy(path or ".", p.target, source_collection=source)
                          elif p.type == "CreateFile":
                              if self.arvrunner.secret_store.has_secret(p.resolved):
                                  secret_mounts["%s/%s" % (self.outdir, p.target)] = {
                                                                      runtimeContext.pull_image,
                                                                      runtimeContext.project_uuid)
  
+         network_req, _ = self.get_requirement("NetworkAccess")
+         if network_req:
+             runtime_constraints["API"] = network_req["networkAccess"]
          api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
          if api_req:
              runtime_constraints["API"] = True
          if self.output_ttl < 0:
              raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
  
-         if self.timelimit is not None:
+         if self.timelimit is not None and self.timelimit > 0:
              scheduling_parameters["max_run_time"] = self.timelimit
  
          extra_submit_params = {}
  
          enable_reuse = runtimeContext.enable_reuse
          if enable_reuse:
+             reuse_req, _ = self.get_requirement("WorkReuse")
+             if reuse_req:
+                 enable_reuse = reuse_req["enableReuse"]
              reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
              if reuse_req:
                  enable_reuse = reuse_req["enableReuse"]
@@@ -485,6 -492,9 +492,9 @@@ class RunnerContainer(Runner)
          if self.arvrunner.project_uuid:
              command.append("--project-uuid="+self.arvrunner.project_uuid)
  
+         if self.enable_dev:
+             command.append("--enable-dev")
          command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
  
          container_req["command"] = command
index 3ddfd358a891431ed8c74d974458f201e862641b,0750693f5f08e7d31922f700ff79968656aa3f49..b62229fbcafa517cbfb30ebf6bb71e39988753cb
@@@ -21,7 -21,7 +21,7 @@@ import cwltool.secret
  from schema_salad.ref_resolver import Loader
  from schema_salad.sourceline import cmap
  
- from .matcher import JsonDiffMatcher
+ from .matcher import JsonDiffMatcher, StripYAMLComments
  from .mock_discovery import get_rootDesc
  
  if not os.getenv('ARVADOS_DEBUG'):
@@@ -57,7 -57,7 +57,7 @@@ class CollectionMock(object)
  class TestContainer(unittest.TestCase):
  
      def helper(self, runner, enable_reuse=True):
-         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
  
          make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                           collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
@@@ -66,7 -66,7 +66,7 @@@
               "basedir": "",
               "make_fs_access": make_fs_access,
               "loader": Loader({}),
-              "metadata": {"cwlVersion": "v1.0"}})
+              "metadata": {"cwlVersion": "v1.1", "http://commonwl.org/cwltool#original_cwlVersion": "v1.0"}})
          runtimeContext = arvados_cwl.context.ArvRuntimeContext(
              {"work_api": "containers",
               "basedir": "",
          call_args, call_kwargs = runner.api.container_requests().create.call_args
  
          vwdmock.copy.assert_has_calls([mock.call('bar', 'foo', source_collection=sourcemock)])
 -        vwdmock.copy.assert_has_calls([mock.call('', 'foo2', source_collection=sourcemock)])
 +        vwdmock.copy.assert_has_calls([mock.call('.', 'foo2', source_collection=sourcemock)])
          vwdmock.copy.assert_has_calls([mock.call('baz/filename', 'filename', source_collection=sourcemock)])
          vwdmock.copy.assert_has_calls([mock.call('subdir', 'subdir', source_collection=sourcemock)])
  
          runner.api.collections().get().execute.return_value = {
              "portable_data_hash": "99999999999999999999999999999993+99"}
  
-         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
  
          tool = cmap({
              "inputs": [],
          self.assertFalse(api.collections().create.called)
          self.assertFalse(runner.runtime_status_error.called)
  
-         arvjob.collect_outputs.assert_called_with("keep:abc+123")
+         arvjob.collect_outputs.assert_called_with("keep:abc+123", 0)
          arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
          runner.add_intermediate_output.assert_called_with("zzzzz-4zz18-zzzzzzzzzzzzzz2")
  
              "portable_data_hash": "99999999999999999999999999999994+99",
              "manifest_text": ". 99999999999999999999999999999994+99 0:0:file1 0:0:file2"}
  
-         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
  
          tool = cmap({
              "inputs": [
          runner.api.collections().get().execute.return_value = {
              "portable_data_hash": "99999999999999999999999999999993+99"}
  
-         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
  
          tool = cmap({"arguments": ["md5sum", "example.conf"],
                       "class": "CommandLineTool",
              "class": "CommandLineTool",
              "hints": [
                  {
-                     "class": "http://commonwl.org/cwltool#TimeLimit",
+                     "class": "ToolTimeLimit",
                      "timelimit": 42
                  }
              ]
  
          _, kwargs = runner.api.container_requests().create.call_args
          self.assertEqual(42, kwargs['body']['scheduling_parameters'].get('max_run_time'))
+ class TestWorkflow(unittest.TestCase):
+     def helper(self, runner, enable_reuse=True):
+         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
+         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+         document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=runner.api, fs_access=make_fs_access(""))
+         document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
+         document_loader.fetch_text = document_loader.fetcher.fetch_text
+         document_loader.check_exists = document_loader.fetcher.check_exists
+         loadingContext = arvados_cwl.context.ArvLoadingContext(
+             {"avsc_names": avsc_names,
+              "basedir": "",
+              "make_fs_access": make_fs_access,
+              "loader": document_loader,
+              "metadata": {"cwlVersion": "v1.1", "http://commonwl.org/cwltool#original_cwlVersion": "v1.0"},
+              "construct_tool_object": runner.arv_make_tool})
+         runtimeContext = arvados_cwl.context.ArvRuntimeContext(
+             {"work_api": "containers",
+              "basedir": "",
+              "name": "test_run_wf_"+str(enable_reuse),
+              "make_fs_access": make_fs_access,
+              "tmpdir": "/tmp",
+              "enable_reuse": enable_reuse,
+              "priority": 500})
+         return loadingContext, runtimeContext
+     # The test passes no builder.resources
+     # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
+     @mock.patch("arvados.collection.CollectionReader")
+     @mock.patch("arvados.collection.Collection")
+     @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
+     def test_run(self, list_images_in_arv, mockcollection, mockcollectionreader):
+         arv_docker_clear_cache()
+         arvados_cwl.add_arv_hints()
+         api = mock.MagicMock()
+         api._rootDesc = get_rootDesc()
+         runner = arvados_cwl.executor.ArvCwlExecutor(api)
+         self.assertEqual(runner.work_api, 'containers')
+         list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
+         runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
+         runner.api.collections().list().execute.return_value = {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+                                                                            "portable_data_hash": "99999999999999999999999999999993+99"}]}
+         runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+         runner.ignore_docker_for_reuse = False
+         runner.num_retries = 0
+         runner.secret_store = cwltool.secrets.SecretStore()
+         loadingContext, runtimeContext = self.helper(runner)
+         runner.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
+         tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
+         metadata["cwlVersion"] = tool["cwlVersion"]
+         mockc = mock.MagicMock()
+         mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mockc, *args, **kwargs)
+         mockcollectionreader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "token.txt")
+         arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
+         arvtool.formatgraph = None
+         it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+         next(it).run(runtimeContext)
+         next(it).run(runtimeContext)
+         with open("tests/wf/scatter2_subwf.cwl") as f:
+             subwf = StripYAMLComments(f.read()).rstrip()
+         runner.api.container_requests().create.assert_called_with(
+             body=JsonDiffMatcher({
+                 "command": [
+                     "cwltool",
+                     "--no-container",
+                     "--move-outputs",
+                     "--preserve-entire-environment",
+                     "workflow.cwl#main",
+                     "cwl.input.yml"
+                 ],
+                 "container_image": "99999999999999999999999999999993+99",
+                 "cwd": "/var/spool/cwl",
+                 "environment": {
+                     "HOME": "/var/spool/cwl",
+                     "TMPDIR": "/tmp"
+                 },
+                 "mounts": {
+                     "/keep/99999999999999999999999999999999+118": {
+                         "kind": "collection",
+                         "portable_data_hash": "99999999999999999999999999999999+118"
+                     },
+                     "/tmp": {
+                         "capacity": 1073741824,
+                         "kind": "tmp"
+                     },
+                     "/var/spool/cwl": {
+                         "capacity": 1073741824,
+                         "kind": "tmp"
+                     },
+                     "/var/spool/cwl/cwl.input.yml": {
+                         "kind": "collection",
+                         "path": "cwl.input.yml",
+                         "portable_data_hash": "99999999999999999999999999999996+99"
+                     },
+                     "/var/spool/cwl/workflow.cwl": {
+                         "kind": "collection",
+                         "path": "workflow.cwl",
+                         "portable_data_hash": "99999999999999999999999999999996+99"
+                     },
+                     "stdout": {
+                         "kind": "file",
+                         "path": "/var/spool/cwl/cwl.output.json"
+                     }
+                 },
+                 "name": "scatterstep",
+                 "output_name": "Output for step scatterstep",
+                 "output_path": "/var/spool/cwl",
+                 "output_ttl": 0,
+                 "priority": 500,
+                 "properties": {},
+                 "runtime_constraints": {
+                     "ram": 1073741824,
+                     "vcpus": 1
+                 },
+                 "scheduling_parameters": {},
+                 "secret_mounts": {},
+                 "state": "Committed",
+                 "use_existing": True
+             }))
+         mockc.open().__enter__().write.assert_has_calls([mock.call(subwf)])
+         mockc.open().__enter__().write.assert_has_calls([mock.call(
+ '''{
+   "fileblub": {
+     "basename": "token.txt",
+     "class": "File",
+     "location": "/keep/99999999999999999999999999999999+118/token.txt",
+     "size": 0
+   },
+   "sleeptime": 5
+ }''')])
+     # The test passes no builder.resources
+     # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
+     @mock.patch("arvados.collection.CollectionReader")
+     @mock.patch("arvados.collection.Collection")
+     @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
+     def test_overall_resource_singlecontainer(self, list_images_in_arv, mockcollection, mockcollectionreader):
+         arv_docker_clear_cache()
+         arvados_cwl.add_arv_hints()
+         api = mock.MagicMock()
+         api._rootDesc = get_rootDesc()
+         runner = arvados_cwl.executor.ArvCwlExecutor(api)
+         self.assertEqual(runner.work_api, 'containers')
+         list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
+         runner.api.collections().get().execute.return_value = {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+                                                                "portable_data_hash": "99999999999999999999999999999993+99"}
+         runner.api.collections().list().execute.return_value = {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+                                                                            "portable_data_hash": "99999999999999999999999999999993+99"}]}
+         runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+         runner.ignore_docker_for_reuse = False
+         runner.num_retries = 0
+         runner.secret_store = cwltool.secrets.SecretStore()
+         loadingContext, runtimeContext = self.helper(runner)
+         runner.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
+         loadingContext.do_update = True
+         tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
+         mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mock.MagicMock(), *args, **kwargs)
+         arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
+         arvtool.formatgraph = None
+         it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+         next(it).run(runtimeContext)
+         next(it).run(runtimeContext)
+         with open("tests/wf/echo-subwf.cwl") as f:
+             subwf = StripYAMLComments(f.read())
+         runner.api.container_requests().create.assert_called_with(
+             body=JsonDiffMatcher({
+                 'output_ttl': 0,
+                 'environment': {'HOME': '/var/spool/cwl', 'TMPDIR': '/tmp'},
+                 'scheduling_parameters': {},
+                 'name': u'echo-subwf',
+                 'secret_mounts': {},
+                 'runtime_constraints': {'API': True, 'vcpus': 3, 'ram': 1073741824},
+                 'properties': {},
+                 'priority': 500,
+                 'mounts': {
+                     '/var/spool/cwl/cwl.input.yml': {
+                         'portable_data_hash': '99999999999999999999999999999996+99',
+                         'kind': 'collection',
+                         'path': 'cwl.input.yml'
+                     },
+                     '/var/spool/cwl/workflow.cwl': {
+                         'portable_data_hash': '99999999999999999999999999999996+99',
+                         'kind': 'collection',
+                         'path': 'workflow.cwl'
+                     },
+                     'stdout': {
+                         'path': '/var/spool/cwl/cwl.output.json',
+                         'kind': 'file'
+                     },
+                     '/tmp': {
+                         'kind': 'tmp',
+                         'capacity': 1073741824
+                     }, '/var/spool/cwl': {
+                         'kind': 'tmp',
+                         'capacity': 3221225472
+                     }
+                 },
+                 'state': 'Committed',
+                 'output_path': '/var/spool/cwl',
+                 'container_image': '99999999999999999999999999999993+99',
+                 'command': [
+                     u'cwltool',
+                     u'--no-container',
+                     u'--move-outputs',
+                     u'--preserve-entire-environment',
+                     u'workflow.cwl#main',
+                     u'cwl.input.yml'
+                 ],
+                 'use_existing': True,
+                 'output_name': u'Output for step echo-subwf',
+                 'cwd': '/var/spool/cwl'
+             }))
+     def test_default_work_api(self):
+         arvados_cwl.add_arv_hints()
+         api = mock.MagicMock()
+         api._rootDesc = copy.deepcopy(get_rootDesc())
+         del api._rootDesc.get('resources')['jobs']['methods']['create']
+         runner = arvados_cwl.executor.ArvCwlExecutor(api)
+         self.assertEqual(runner.work_api, 'containers')
index f1118695023d017d5507103d4143245e9a0d2a63,ad33e6e9af614b7046a52efe519b103321800f06..d2c4d9c2eaff4e0255e365ae062c478f5f0fcc3f
@@@ -84,6 -84,7 +84,7 @@@ def stubs(func)
  
          stubs.api = mock.MagicMock()
          stubs.api._rootDesc = get_rootDesc()
+         stubs.api._rootDesc["uuidPrefix"] = "zzzzz"
  
          stubs.api.users().current().execute.return_value = {
              "uuid": stubs.fake_user_uuid,
@@@ -353,15 -354,15 +354,15 @@@ class TestSubmit(unittest.TestCase)
          stubs.api.collections().create.assert_has_calls([
              mock.call(body=JsonDiffMatcher({
                  'manifest_text':
-                 '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
+                 '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
                  'replication_desired': None,
-                 'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
+                 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
              }), ensure_unique_name=False),
              mock.call(body=JsonDiffMatcher({
                  'manifest_text':
-                 '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
+                 '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
                  'replication_desired': None,
-                 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
+                 'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
              }), ensure_unique_name=False),
              mock.call(body=JsonDiffMatcher({
                  'manifest_text':
      def test_submit_runner_ram(self, stubs, tm):
          exited = arvados_cwl.main(
              ["--submit", "--no-wait", "--debug", "--submit-runner-ram=2048",
+              "--api=jobs",
               "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
              stubs.capture_stdout, sys.stderr, api_client=stubs.api)
  
  
          exited = arvados_cwl.main(
              ["--submit", "--no-wait", "--debug", "--output-name", output_name,
+              "--api=jobs",
               "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
              stubs.capture_stdout, sys.stderr, api_client=stubs.api)
  
      def test_submit_pipeline_name(self, stubs, tm):
          exited = arvados_cwl.main(
              ["--submit", "--no-wait", "--debug", "--name=hello job 123",
+              "--api=jobs",
               "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
              stubs.capture_stdout, sys.stderr, api_client=stubs.api)
          self.assertEqual(exited, 0)
  
          exited = arvados_cwl.main(
              ["--submit", "--no-wait", "--debug", "--output-tags", output_tags,
+              "--api=jobs",
               "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
              stubs.capture_stdout, sys.stderr, api_client=stubs.api)
          self.assertEqual(exited, 0)
          exited = arvados_cwl.main(
              ["--submit", "--no-wait", "--debug",
               "--project-uuid", project_uuid,
+              "--api=jobs",
               "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
              sys.stdout, sys.stderr, api_client=stubs.api)
          self.assertEqual(exited, 0)
          stubs.api.collections().create.assert_has_calls([
              mock.call(body=JsonDiffMatcher({
                  'manifest_text':
-                 '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
+                 '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
                  'replication_desired': None,
-                 'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
+                 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
              }), ensure_unique_name=False),
              mock.call(body=JsonDiffMatcher({
                  'manifest_text':
-                 '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
+                 '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
                  'replication_desired': None,
-                 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
-             }), ensure_unique_name=False)])
+                 'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
+             }), ensure_unique_name=False),
+             ])
  
          expect_container = copy.deepcopy(stubs.expect_container_spec)
          stubs.api.container_requests().create.assert_called_with(
  
      @stubs
      def test_submit_request_uuid(self, stubs):
 +        stubs.api._rootDesc["remoteHosts"]["zzzzz"] = "123"
          stubs.expect_container_request_uuid = "zzzzz-xvhdp-yyyyyyyyyyyyyyy"
  
          stubs.api.container_requests().update().execute.return_value = {
  
  
  class TestCreateTemplate(unittest.TestCase):
-     existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
+     existing_template_uuid = "zzzzz-p5p6p-validworkfloyml"
  
      def _adjust_script_params(self, expect_component):
          expect_component['script_parameters']['x'] = {
@@@ -1857,7 -1863,7 +1864,7 @@@ class TestTemplateInputs(unittest.TestC
      @stubs
      def test_inputs_empty(self, stubs):
          exited = arvados_cwl.main(
-             ["--create-template",
+             ["--debug", "--api=jobs", "--create-template",
               "tests/wf/inputs_test.cwl", "tests/order/empty_order.json"],
              stubs.capture_stdout, sys.stderr, api_client=stubs.api)
  
      @stubs
      def test_inputs(self, stubs):
          exited = arvados_cwl.main(
-             ["--create-template",
+             ["--api=jobs", "--create-template",
               "tests/wf/inputs_test.cwl", "tests/order/inputs_test_order.json"],
              stubs.capture_stdout, sys.stderr, api_client=stubs.api)