Merge branch 'master' into 14075-uploadfiles
authorPeter Amstutz <pamstutz@veritasgenetics.com>
Fri, 24 Aug 2018 19:28:16 +0000 (15:28 -0400)
committerPeter Amstutz <pamstutz@veritasgenetics.com>
Fri, 24 Aug 2018 19:28:16 +0000 (15:28 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

1  2 
sdk/cwl/setup.py
sdk/cwl/tests/test_container.py
sdk/cwl/tests/test_job.py
sdk/cwl/tests/test_submit.py
sdk/python/arvados/commands/run.py

diff --combined sdk/cwl/setup.py
index bdbe778292ac33b6505cd0d185520028015c5f38,e452ce26440da32cdf3732cdc8f43f271600b432..2b7b31b9f3f4b4070cbd14d986ffe87259989200
@@@ -36,17 -36,19 +36,21 @@@ setup(name='arvados-cwl-runner'
            'cwltool==1.0.20180806194258',
            'schema-salad==2.7.20180719125426',
            'typing >= 3.6.4',
 -          'ruamel.yaml >=0.13.11, <0.16',
 +          # Need to limit ruamel.yaml version to 0.15.26 because of bug
 +          # https://bitbucket.org/ruamel/yaml/issues/227/regression-parsing-flow-mapping
 +          'ruamel.yaml >=0.13.11, <= 0.15.26',
            'arvados-python-client>=1.1.4.20180607143841',
            'setuptools',
-           'ciso8601 >=1.0.6, <2.0.0'
+           'ciso8601 >=1.0.6, <2.0.0',
+           'subprocess32>=3.5.1',
        ],
        data_files=[
            ('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
        ],
        test_suite='tests',
-       tests_require=['mock>=1.0'],
+       tests_require=[
+           'mock>=1.0',
+           'subprocess32>=3.5.1',
+       ],
        zip_safe=True
        )
index 0209d2eba9f681d5eb8e1ad51d24954b0f2b9f3f,3f8a32816ddccdad01c78eedfdce1ed0b2be5e64..69f3ae046e31ca3e02aa52be7967e43da2394d00
@@@ -3,8 -3,8 +3,9 @@@
  # SPDX-License-Identifier: Apache-2.0
  
  import arvados_cwl
+ import arvados_cwl.context
  from arvados_cwl.arvdocker import arv_docker_clear_cache
 +import arvados.config
  import logging
  import mock
  import unittest
@@@ -21,35 -21,30 +22,56 @@@ if not os.getenv('ARVADOS_DEBUG')
      logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
      logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
  
 +class CollectionMock(object):
 +    def __init__(self, vwdmock, *args, **kwargs):
 +        self.vwdmock = vwdmock
 +        self.count = 0
 +
 +    def open(self, *args, **kwargs):
 +        self.count += 1
 +        return self.vwdmock.open(*args, **kwargs)
 +
 +    def copy(self, *args, **kwargs):
 +        self.count += 1
 +        self.vwdmock.copy(*args, **kwargs)
 +
 +    def save_new(self, *args, **kwargs):
 +        pass
 +
 +    def __len__(self):
 +        return self.count
 +
 +    def portable_data_hash(self):
 +        if self.count == 0:
 +            return arvados.config.EMPTY_BLOCK_LOCATOR
 +        else:
 +            return "99999999999999999999999999999996+99"
 +
 +
  class TestContainer(unittest.TestCase):
  
+     def helper(self, runner, enable_reuse=True):
+         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+         loadingContext = arvados_cwl.context.ArvLoadingContext(
+             {"avsc_names": avsc_names,
+              "basedir": "",
+              "make_fs_access": make_fs_access,
+              "loader": Loader({}),
+              "metadata": {"cwlVersion": "v1.0"}})
+         runtimeContext = arvados_cwl.context.ArvRuntimeContext(
+             {"work_api": "containers",
+              "basedir": "",
+              "name": "test_run_"+str(enable_reuse),
+              "make_fs_access": make_fs_access,
+              "tmpdir": "/tmp",
+              "enable_reuse": enable_reuse,
+              "priority": 500})
+         return loadingContext, runtimeContext
      # The test passes no builder.resources
      # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
      @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
@@@ -67,8 -62,6 +89,6 @@@
              runner.api.collections().get().execute.return_value = {
                  "portable_data_hash": "99999999999999999999999999999993+99"}
  
-             document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
              tool = cmap({
                  "inputs": [],
                  "outputs": [],
                  "id": "#",
                  "class": "CommandLineTool"
              })
-             make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-             arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
-                                                      basedir="", make_fs_access=make_fs_access, loader=Loader({}),
-                                                      metadata={"cwlVersion": "v1.0"})
+             loadingContext, runtimeContext = self.helper(runner, enable_reuse)
+             arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
              arvtool.formatgraph = None
-             for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_"+str(enable_reuse),
-                                  make_fs_access=make_fs_access, tmpdir="/tmp"):
-                 j.run(enable_reuse=enable_reuse, priority=500)
+             for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+                 j.run(runtimeContext)
                  runner.api.container_requests().create.assert_called_with(
                      body=JsonDiffMatcher({
                          'environment': {
                                                 "capacity": 1073741824 }
                          },
                          'state': 'Committed',
+                         'output_name': 'Output for step test_run_'+str(enable_reuse),
                          'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                          'output_path': '/var/spool/cwl',
                          'output_ttl': 0,
          runner.intermediate_output_ttl = 3600
          runner.secret_store = cwltool.secrets.SecretStore()
  
-         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
          keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
          runner.api.collections().get().execute.return_value = {
              "portable_data_hash": "99999999999999999999999999999993+99"}
              "id": "#",
              "class": "CommandLineTool"
          })
-         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
-                                                  avsc_names=avsc_names, make_fs_access=make_fs_access,
-                                                  loader=Loader({}), metadata={"cwlVersion": "v1.0"})
+         loadingContext, runtimeContext = self.helper(runner)
+         runtimeContext.name = "test_resource_requirements"
+         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
          arvtool.formatgraph = None
-         for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_resource_requirements",
-                              make_fs_access=make_fs_access, tmpdir="/tmp"):
-             j.run(enable_reuse=True, priority=500)
+         for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+             j.run(runtimeContext)
  
          call_args, call_kwargs = runner.api.container_requests().create.call_args
  
                                     "capacity": 5242880000 }
              },
              'state': 'Committed',
+             'output_name': 'Output for step test_resource_requirements',
              'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
              'output_path': '/var/spool/cwl',
              'output_ttl': 7200,
          runner.intermediate_output_ttl = 0
          runner.secret_store = cwltool.secrets.SecretStore()
  
-         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
          keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
          runner.api.collections().get().execute.return_value = {
              "portable_data_hash": "99999999999999999999999999999993+99"}
          runner.fs_access.get_collection.side_effect = get_collection_mock
  
          vwdmock = mock.MagicMock()
 -        collection_mock.return_value = vwdmock
 -        vwdmock.portable_data_hash.return_value = "99999999999999999999999999999996+99"
 +        collection_mock.side_effect = lambda *args, **kwargs: CollectionMock(vwdmock, *args, **kwargs)
  
          tool = cmap({
              "inputs": [],
              "id": "#",
              "class": "CommandLineTool"
          })
-         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
-                                                  avsc_names=avsc_names, make_fs_access=make_fs_access,
-                                                  loader=Loader({}), metadata={"cwlVersion": "v1.0"})
+         loadingContext, runtimeContext = self.helper(runner)
+         runtimeContext.name = "test_initial_work_dir"
+         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
          arvtool.formatgraph = None
-         for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_initial_work_dir",
-                              make_fs_access=make_fs_access, tmpdir="/tmp"):
-             j.run(priority=500)
+         for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+             j.run(runtimeContext)
  
          call_args, call_kwargs = runner.api.container_requests().create.call_args
  
                  }
              },
              'state': 'Committed',
+             'output_name': 'Output for step test_initial_work_dir',
              'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
              'output_path': '/var/spool/cwl',
              'output_ttl': 0,
              "id": "#",
              "class": "CommandLineTool"
          })
-         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
-                                                  basedir="", make_fs_access=make_fs_access, loader=Loader({}),
-                                                  metadata={"cwlVersion": "v1.0"})
+         loadingContext, runtimeContext = self.helper(runner)
+         runtimeContext.name = "test_run_redirect"
+         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
          arvtool.formatgraph = None
-         for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_redirect",
-                              make_fs_access=make_fs_access, tmpdir="/tmp"):
-             j.run(priority=500)
+         for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+             j.run(runtimeContext)
              runner.api.container_requests().create.assert_called_with(
                  body=JsonDiffMatcher({
                      'environment': {
                          },
                      },
                      'state': 'Committed',
+                     "output_name": "Output for step test_run_redirect",
                      'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                      'output_path': '/var/spool/cwl',
                      'output_ttl': 0,
  
          col().open.return_value = []
  
-         arvjob = arvados_cwl.ArvadosContainer(runner)
-         arvjob.name = "testjob"
-         arvjob.builder = mock.MagicMock()
+         arvjob = arvados_cwl.ArvadosContainer(runner,
+                                               mock.MagicMock(),
+                                               {},
+                                               None,
+                                               [],
+                                               [],
+                                               "testjob")
          arvjob.output_callback = mock.MagicMock()
          arvjob.collect_outputs = mock.MagicMock()
          arvjob.successCodes = [0]
  
          keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
          runner.api.collections().get().execute.return_value = {
-             "portable_data_hash": "99999999999999999999999999999993+99"}
+             "portable_data_hash": "99999999999999999999999999999994+99",
+             "manifest_text": ". 99999999999999999999999999999994+99 0:0:file1 0:0:file2"}
  
          document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
  
              "id": "#",
              "class": "CommandLineTool"
          })
-         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                      collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
-                                                  basedir="", make_fs_access=make_fs_access, loader=Loader({}),
-                                                  metadata={"cwlVersion": "v1.0"})
+         loadingContext, runtimeContext = self.helper(runner)
+         runtimeContext.name = "test_run_mounts"
+         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
          arvtool.formatgraph = None
          job_order = {
              "p1": {
                  ]
              }
          }
-         for j in arvtool.job(job_order, mock.MagicMock(), basedir="", name="test_run_mounts",
-                              make_fs_access=make_fs_access, tmpdir="/tmp"):
-             j.run(priority=500)
+         for j in arvtool.job(job_order, mock.MagicMock(), runtimeContext):
+             j.run(runtimeContext)
              runner.api.container_requests().create.assert_called_with(
                  body=JsonDiffMatcher({
                      'environment': {
                                             "capacity": 1073741824 }
                      },
                      'state': 'Committed',
+                     'output_name': 'Output for step test_run_mounts',
                      'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                      'output_path': '/var/spool/cwl',
                      'output_ttl': 0,
                               ]
                           }
                       ]})
-         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                      collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
-                                                  basedir="", make_fs_access=make_fs_access, loader=Loader({}),
-                                                  metadata={"cwlVersion": "v1.0"})
+         loadingContext, runtimeContext = self.helper(runner)
+         runtimeContext.name = "test_secrets"
+         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
          arvtool.formatgraph = None
  
          job_order = {"pw": "blorp"}
          runner.secret_store.store(["pw"], job_order)
  
-         for j in arvtool.job(job_order, mock.MagicMock(), basedir="", name="test_secrets",
-                              make_fs_access=make_fs_access, tmpdir="/tmp"):
-             j.run(enable_reuse=True, priority=500)
+         for j in arvtool.job(job_order, mock.MagicMock(), runtimeContext):
+             j.run(runtimeContext)
              runner.api.container_requests().create.assert_called_with(
                  body=JsonDiffMatcher({
                      'environment': {
                                             "capacity": 1073741824 }
                      },
                      'state': 'Committed',
+                     'output_name': 'Output for step test_secrets',
                      'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                      'output_path': '/var/spool/cwl',
                      'output_ttl': 0,
                          }
                      }
                  }))
+     # The test passes no builder.resources
+     # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
+     @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+     def test_timelimit(self, keepdocker):
+         arv_docker_clear_cache()
+         runner = mock.MagicMock()
+         runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+         runner.ignore_docker_for_reuse = False
+         runner.intermediate_output_ttl = 0
+         runner.secret_store = cwltool.secrets.SecretStore()
+         keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+         runner.api.collections().get().execute.return_value = {
+             "portable_data_hash": "99999999999999999999999999999993+99"}
+         tool = cmap({
+             "inputs": [],
+             "outputs": [],
+             "baseCommand": "ls",
+             "arguments": [{"valueFrom": "$(runtime.outdir)"}],
+             "id": "#",
+             "class": "CommandLineTool",
+             "hints": [
+                 {
+                     "class": "http://commonwl.org/cwltool#TimeLimit",
+                     "timelimit": 42
+                 }
+             ]
+         })
+         loadingContext, runtimeContext = self.helper(runner)
+         runtimeContext.name = "test_timelimit"
+         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
+         arvtool.formatgraph = None
+         for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+             j.run(runtimeContext)
+         _, kwargs = runner.api.container_requests().create.call_args
+         self.assertEqual(42, kwargs['body']['scheduling_parameters'].get('max_run_time'))
index 1a3f6a9c8eb06527856050528ee8de1ea3cb1bee,4473b88ca0d785dbb2eaff961bf64fd21c25c280..20efe1513981585b3c699f73d0dbba6994f7c682
@@@ -19,7 -19,6 +19,7 @@@ from schema_salad.ref_resolver import L
  from schema_salad.sourceline import cmap
  from .mock_discovery import get_rootDesc
  from .matcher import JsonDiffMatcher, StripYAMLComments
 +from .test_container import CollectionMock
  
  if not os.getenv('ARVADOS_DEBUG'):
      logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
  
  class TestJob(unittest.TestCase):
  
+     def helper(self, runner, enable_reuse=True):
+         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+         loadingContext = arvados_cwl.context.ArvLoadingContext(
+             {"avsc_names": avsc_names,
+              "basedir": "",
+              "make_fs_access": make_fs_access,
+              "loader": Loader({}),
+              "metadata": {"cwlVersion": "v1.0"},
+              "makeTool": runner.arv_make_tool})
+         runtimeContext = arvados_cwl.context.ArvRuntimeContext(
+             {"work_api": "jobs",
+              "basedir": "",
+              "name": "test_run_job_"+str(enable_reuse),
+              "make_fs_access": make_fs_access,
+              "enable_reuse": enable_reuse,
+              "priority": 500})
+         return loadingContext, runtimeContext
      # The test passes no builder.resources
      # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
      @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
@@@ -36,7 -57,6 +58,6 @@@
              runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
              runner.ignore_docker_for_reuse = False
              runner.num_retries = 0
-             document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
  
              list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
              runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
                  "id": "#",
                  "class": "CommandLineTool"
              })
-             make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-             arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
-                                                      basedir="", make_fs_access=make_fs_access, loader=Loader({}),
-                                                      metadata={"cwlVersion": "v1.0"})
+             loadingContext, runtimeContext = self.helper(runner, enable_reuse)
+             arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
              arvtool.formatgraph = None
-             for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
-                 j.run(enable_reuse=enable_reuse)
+             for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+                 j.run(runtimeContext)
                  runner.api.jobs().create.assert_called_with(
                      body=JsonDiffMatcher({
                          'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                      runner.api.links().create.side_effect = ApiError(
                          mock.MagicMock(return_value={'status': 403}),
                          'Permission denied')
-                     j.run(enable_reuse=enable_reuse)
+                     j.run(runtimeContext)
                  else:
                      assert not runner.api.links().create.called
  
          list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
          runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
  
-         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
          tool = {
              "inputs": [],
              "outputs": [],
              "id": "#",
              "class": "CommandLineTool"
          }
-         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
-                                                  make_fs_access=make_fs_access, loader=Loader({}),
-                                                  metadata={"cwlVersion": "v1.0"})
+         loadingContext, runtimeContext = self.helper(runner)
+         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
          arvtool.formatgraph = None
-         for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
-             j.run(enable_reuse=True)
+         for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+             j.run(runtimeContext)
          runner.api.jobs().create.assert_called_with(
              body=JsonDiffMatcher({
                  'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                                                          {"items": []},
                                                          {"items": [{"manifest_text": "ABC"}]})
  
-         arvjob = arvados_cwl.ArvadosJob(runner)
-         arvjob.name = "testjob"
-         arvjob.builder = mock.MagicMock()
+         arvjob = arvados_cwl.ArvadosJob(runner,
+                                         mock.MagicMock(),
+                                         {},
+                                         None,
+                                         [],
+                                         [],
+                                         "testjob")
          arvjob.output_callback = mock.MagicMock()
          arvjob.collect_outputs = mock.MagicMock()
          arvjob.collect_outputs.return_value = {"out": "stuff"}
              {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
          )
  
-         arvjob = arvados_cwl.ArvadosJob(runner)
-         arvjob.name = "testjob"
-         arvjob.builder = mock.MagicMock()
+         arvjob = arvados_cwl.ArvadosJob(runner,
+                                         mock.MagicMock(),
+                                         {},
+                                         None,
+                                         [],
+                                         [],
+                                         "testjob")
          arvjob.output_callback = mock.MagicMock()
          arvjob.collect_outputs = mock.MagicMock()
          arvjob.collect_outputs.return_value = {"out": "stuff"}
  
  
  class TestWorkflow(unittest.TestCase):
+     def helper(self, runner, enable_reuse=True):
+         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+         document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=runner.api, fs_access=make_fs_access(""))
+         document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
+         document_loader.fetch_text = document_loader.fetcher.fetch_text
+         document_loader.check_exists = document_loader.fetcher.check_exists
+         loadingContext = arvados_cwl.context.ArvLoadingContext(
+             {"avsc_names": avsc_names,
+              "basedir": "",
+              "make_fs_access": make_fs_access,
+              "loader": document_loader,
+              "metadata": {"cwlVersion": "v1.0"},
+              "construct_tool_object": runner.arv_make_tool})
+         runtimeContext = arvados_cwl.context.ArvRuntimeContext(
+             {"work_api": "jobs",
+              "basedir": "",
+              "name": "test_run_wf_"+str(enable_reuse),
+              "make_fs_access": make_fs_access,
+              "enable_reuse": enable_reuse,
+              "priority": 500})
+         return loadingContext, runtimeContext
      # The test passes no builder.resources
      # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
      @mock.patch("arvados.collection.CollectionReader")
          runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
          runner.ignore_docker_for_reuse = False
          runner.num_retries = 0
-         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
  
-         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-         document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access(""))
-         document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
-         document_loader.fetch_text = document_loader.fetcher.fetch_text
-         document_loader.check_exists = document_loader.fetcher.check_exists
+         loadingContext, runtimeContext = self.helper(runner)
  
-         tool, metadata = document_loader.resolve_ref("tests/wf/scatter2.cwl")
+         tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
          metadata["cwlVersion"] = tool["cwlVersion"]
  
 -        mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
 +        mockc = mock.MagicMock()
 +        mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mockc, *args, **kwargs)
+         mockcollectionreader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "token.txt")
  
-         arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
-                                               basedir="", make_fs_access=make_fs_access, loader=document_loader,
-                                               makeTool=runner.arv_make_tool, metadata=metadata)
+         arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
          arvtool.formatgraph = None
-         it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
-         it.next().run()
-         it.next().run()
+         it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+         it.next().run(runtimeContext)
+         it.next().run(runtimeContext)
  
          with open("tests/wf/scatter2_subwf.cwl") as f:
              subwf = StripYAMLComments(f.read())
                          'HOME': '$(task.outdir)',
                          'TMPDIR': '$(task.tmpdir)'},
                                 'task.vwd': {
 -                                   'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
 -                                   'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
 +                                   'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
 +                                   'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
                                 },
                      'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
                      'task.stdout': 'cwl.output.json'}]},
                       ['docker_image_locator', 'in docker', 'arvados/jobs']],
              find_or_create=True)
  
 -        mockcollection().open().__enter__().write.assert_has_calls([mock.call(subwf)])
 -        mockcollection().open().__enter__().write.assert_has_calls([mock.call(
 +        mockc.open().__enter__().write.assert_has_calls([mock.call(subwf)])
 +        mockc.open().__enter__().write.assert_has_calls([mock.call(
  '''{
    "fileblub": {
      "basename": "token.txt",
      "class": "File",
-     "location": "/keep/99999999999999999999999999999999+118/token.txt"
+     "location": "/keep/99999999999999999999999999999999+118/token.txt",
+     "size": 0
    },
    "sleeptime": 5
  }''')])
          runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
          runner.ignore_docker_for_reuse = False
          runner.num_retries = 0
-         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
  
-         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-         document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access(""))
-         document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
-         document_loader.fetch_text = document_loader.fetcher.fetch_text
-         document_loader.check_exists = document_loader.fetcher.check_exists
+         loadingContext, runtimeContext = self.helper(runner)
  
-         tool, metadata = document_loader.resolve_ref("tests/wf/echo-wf.cwl")
+         tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
          metadata["cwlVersion"] = tool["cwlVersion"]
  
 -        mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
 +        mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mock.MagicMock(), *args, **kwargs)
  
-         arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
-                                               basedir="", make_fs_access=make_fs_access, loader=document_loader,
-                                               makeTool=runner.arv_make_tool, metadata=metadata)
+         arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
          arvtool.formatgraph = None
-         it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
-         it.next().run()
-         it.next().run()
+         it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+         it.next().run(runtimeContext)
+         it.next().run(runtimeContext)
  
          with open("tests/wf/echo-subwf.cwl") as f:
              subwf = StripYAMLComments(f.read())
                          'HOME': '$(task.outdir)',
                          'TMPDIR': '$(task.tmpdir)'},
                                 'task.vwd': {
 -                                   'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
 -                                   'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
 +                                   'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
 +                                   'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
                                 },
                      'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
                      'task.stdout': 'cwl.output.json'}]},
index 246d80e506146c3e689ee05bc023c136323294af,d980db575dd8d6e3db1ac3dbb0f7709cb14a894e..8875b7d954d916e332175b5dbdd67103833719b1
@@@ -132,7 -132,8 +132,8 @@@ def stubs(func)
                      "listing": [{
                          "basename": "renamed.txt",
                          "class": "File",
-                         "location": "keep:99999999999999999999999999999998+99/file1.txt"
+                         "location": "keep:99999999999999999999999999999998+99/file1.txt",
+                         "size": 0
                      }],
                      'class': 'Directory'
                  },
                                    {
                                        'basename': 'renamed.txt',
                                        'class': 'File', 'location':
-                                       'keep:99999999999999999999999999999998+99/file1.txt'
+                                       'keep:99999999999999999999999999999998+99/file1.txt',
+                                       'size': 0
                                    }
                                ]}},
                          'cwl:tool': '3fffdeaa75e018172e1b583425f4ebff+60/workflow.cwl#main',
                          'z': {'basename': 'anonymous', 'class': 'Directory', 'listing': [
                              {'basename': 'renamed.txt',
                               'class': 'File',
-                              'location': 'keep:99999999999999999999999999999998+99/file1.txt'
+                              'location': 'keep:99999999999999999999999999999998+99/file1.txt',
+                              'size': 0
                              }
                          ]}
                      },
@@@ -286,14 -289,14 +289,14 @@@ class TestSubmit(unittest.TestCase)
                  'manifest_text':
                  '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
                  'replication_desired': None,
 -                'name': 'submit_tool.cwl dependencies',
 -            }), ensure_unique_name=True),
 +                'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
 +            }), ensure_unique_name=False),
              mock.call(body=JsonDiffMatcher({
                  'manifest_text':
                  '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
                  'replication_desired': None,
 -                'name': 'submit_wf.cwl input',
 -            }), ensure_unique_name=True),
 +                'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
 +            }), ensure_unique_name=False),
              mock.call(body=JsonDiffMatcher({
                  'manifest_text':
                  '. 61df2ed9ee3eb7dd9b799e5ca35305fa+1217 0:1217:workflow.cwl\n',
          self.assertEqual(capture_stdout.getvalue(),
                           stubs.expect_pipeline_uuid + '\n')
  
+     @stubs
+     def test_error_when_multiple_storage_classes_specified(self, stubs):
+         storage_classes = "foo,bar"
+         exited = arvados_cwl.main(
+                 ["--debug", "--storage-classes", storage_classes,
+                  "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                 sys.stdin, sys.stderr, api_client=stubs.api)
+         self.assertEqual(exited, 1)
      @mock.patch("time.sleep")
      @stubs
      def test_submit_on_error(self, stubs, tm):
                  'manifest_text':
                  '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
                  'replication_desired': None,
 -                'name': 'submit_tool.cwl dependencies',
 -            }), ensure_unique_name=True),
 +                'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
 +            }), ensure_unique_name=False),
              mock.call(body=JsonDiffMatcher({
                  'manifest_text':
                  '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
                  'replication_desired': None,
 -                'name': 'submit_wf.cwl input',
 -            }), ensure_unique_name=True)])
 +                'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
 +            }), ensure_unique_name=False)])
  
          expect_container = copy.deepcopy(stubs.expect_container_spec)
          stubs.api.container_requests().create.assert_called_with(
          self.assertEqual(capture_stdout.getvalue(),
                           stubs.expect_container_request_uuid + '\n')
  
+     @stubs
+     def test_submit_storage_classes(self, stubs):
+         capture_stdout = cStringIO.StringIO()
+         try:
+             exited = arvados_cwl.main(
+                 ["--debug", "--submit", "--no-wait", "--api=containers", "--storage-classes=foo",
+                  "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                 capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+             self.assertEqual(exited, 0)
+         except:
+             logging.exception("")
+         expect_container = copy.deepcopy(stubs.expect_container_spec)
+         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
+                                        '--no-log-timestamps', '--disable-validate',
+                                        '--eval-timeout=20', '--thread-count=4',
+                                        '--enable-reuse', "--debug",
+                                        "--storage-classes=foo", '--on-error=continue',
+                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+         stubs.api.container_requests().create.assert_called_with(
+             body=JsonDiffMatcher(expect_container))
+         self.assertEqual(capture_stdout.getvalue(),
+                          stubs.expect_container_request_uuid + '\n')
+     @mock.patch("arvados_cwl.task_queue.TaskQueue")
+     @mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
+     @mock.patch("arvados_cwl.ArvCwlRunner.make_output_collection", return_value = (None, None))
+     @stubs
+     def test_storage_classes_correctly_propagate_to_make_output_collection(self, stubs, make_output, job, tq):
+         def set_final_output(job_order, output_callback, runtimeContext):
+             output_callback("zzzzz-4zz18-zzzzzzzzzzzzzzzz", "success")
+             return []
+         job.side_effect = set_final_output
+         try:
+             exited = arvados_cwl.main(
+                 ["--debug", "--local", "--storage-classes=foo",
+                  "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                 sys.stdin, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+             self.assertEqual(exited, 0)
+         except:
+             logging.exception("")
+         make_output.assert_called_with(u'Output of submit_wf.cwl', ['foo'], '', 'zzzzz-4zz18-zzzzzzzzzzzzzzzz')
+     @mock.patch("arvados_cwl.task_queue.TaskQueue")
+     @mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
+     @mock.patch("arvados_cwl.ArvCwlRunner.make_output_collection", return_value = (None, None))
+     @stubs
+     def test_default_storage_classes_correctly_propagate_to_make_output_collection(self, stubs, make_output, job, tq):
+         def set_final_output(job_order, output_callback, runtimeContext):
+             output_callback("zzzzz-4zz18-zzzzzzzzzzzzzzzz", "success")
+             return []
+         job.side_effect = set_final_output
+         try:
+             exited = arvados_cwl.main(
+                 ["--debug", "--local",
+                  "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                 sys.stdin, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+             self.assertEqual(exited, 0)
+         except:
+             logging.exception("")
+         make_output.assert_called_with(u'Output of submit_wf.cwl', ['default'], '', 'zzzzz-4zz18-zzzzzzzzzzzzzzzz')
  
      @stubs
      def test_submit_container_output_ttl(self, stubs):
      @stubs
      def test_submit_file_keepref(self, stubs, tm, collectionReader):
          capture_stdout = cStringIO.StringIO()
+         collectionReader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "blorp.txt")
          exited = arvados_cwl.main(
              ["--submit", "--no-wait", "--api=containers", "--debug",
               "tests/wf/submit_keepref_wf.cwl"],
                           stubs.expect_container_request_uuid + '\n')
  
  
+     @stubs
+     def test_submit_wf_runner_resources(self, stubs):
+         capture_stdout = cStringIO.StringIO()
+         try:
+             exited = arvados_cwl.main(
+                 ["--submit", "--no-wait", "--api=containers", "--debug",
+                  "tests/wf/submit_wf_runner_resources.cwl", "tests/submit_test_job.json"],
+                 capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+             self.assertEqual(exited, 0)
+         except:
+             logging.exception("")
+         expect_container = copy.deepcopy(stubs.expect_container_spec)
+         expect_container["runtime_constraints"] = {
+             "API": True,
+             "vcpus": 2,
+             "ram": 2000 * 2**20
+         }
+         expect_container["name"] = "submit_wf_runner_resources.cwl"
+         expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [
+             {
+                 "class": "http://arvados.org/cwl#WorkflowRunnerResources",
+                 "coresMin": 2,
+                 "ramMin": 2000
+             }
+         ]
+         expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][0]["$namespaces"] = {
+             "arv": "http://arvados.org/cwl#",
+         }
+         stubs.api.container_requests().create.assert_called_with(
+             body=JsonDiffMatcher(expect_container))
+         self.assertEqual(capture_stdout.getvalue(),
+                          stubs.expect_container_request_uuid + '\n')
      @mock.patch("arvados.commands.keepdocker.find_one_image_hash")
      @mock.patch("cwltool.docker.DockerCommandLineJob.get_image")
      @mock.patch("arvados.api")
index 5063d75f2a04b22545e7d2589d189139bd6378dc,c4748fa995759ef0cc934b699a14523f8a3181f8..96f5bdd44a12ae42c25fbe64f68b342cb0356fcf
@@@ -1,19 -1,6 +1,19 @@@
  # Copyright (C) The Arvados Authors. All rights reserved.
 +# Copyright (C) 2018 Genome Research Ltd.
  #
  # SPDX-License-Identifier: Apache-2.0
 +#
 +# Licensed under the Apache License, Version 2.0 (the "License");
 +# you may not use this file except in compliance with the License.
 +# You may obtain a copy of the License at
 +#
 +#    http://www.apache.org/licenses/LICENSE-2.0
 +#
 +# Unless required by applicable law or agreed to in writing, software
 +# distributed under the License is distributed on an "AS IS" BASIS,
 +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +# See the License for the specific language governing permissions and
 +# limitations under the License.
  
  from __future__ import print_function
  from __future__ import absolute_import
@@@ -35,7 -22,6 +35,7 @@@ import sy
  import errno
  import arvados.commands._util as arv_cmd
  import arvados.collection
 +import arvados.config as config
  
  from arvados._version import __version__
  
@@@ -150,20 -136,21 +150,21 @@@ def statfile(prefix, fn, fnPattern="$(f
  
      return prefix+fn
  
- def write_file(collection, pathprefix, fn):
+ def write_file(collection, pathprefix, fn, flush=False):
      with open(os.path.join(pathprefix, fn)) as src:
          dst = collection.open(fn, "w")
          r = src.read(1024*128)
          while r:
              dst.write(r)
              r = src.read(1024*128)
-         dst.close(flush=False)
+         dst.close(flush=flush)
  
  def uploadfiles(files, api, dry_run=False, num_retries=0,
                  project=None,
                  fnPattern="$(file %s/%s)",
                  name=None,
-                 collection=None):
+                 collection=None,
+                 packed=True):
      # Find the smallest path prefix that includes all the files that need to be uploaded.
      # This starts at the root and iteratively removes common parent directory prefixes
      # until all file paths no longer have a common parent.
                  continue
              prev = localpath
              if os.path.isfile(localpath):
-                 write_file(collection, pathprefix, f.fn)
+                 write_file(collection, pathprefix, f.fn, not packed)
              elif os.path.isdir(localpath):
                  for root, dirs, iterfiles in os.walk(localpath):
                      root = root[len(pathprefix):]
                      for src in iterfiles:
-                         write_file(collection, pathprefix, os.path.join(root, src))
+                         write_file(collection, pathprefix, os.path.join(root, src), not packed)
  
 -        filters=[["portable_data_hash", "=", collection.portable_data_hash()]]
 -        if name:
 -            filters.append(["name", "like", name+"%"])
 -        if project:
 -            filters.append(["owner_uuid", "=", project])
 +        pdh = None
 +        if len(collection) > 0:
 +            # non-empty collection
 +            filters = [["portable_data_hash", "=", collection.portable_data_hash()]]
 +            name_pdh = "%s (%s)" % (name, collection.portable_data_hash())
 +            if name:
 +                filters.append(["name", "=", name_pdh])
 +            if project:
 +                filters.append(["owner_uuid", "=", project])
  
 -        exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries)
 +            # do the list / create in a loop with up to 2 tries as we are using `ensure_unique_name=False`
 +            # and there is a potential race with other workflows that may have created the collection
 +            # between when we list it and find it does not exist and when we attempt to create it.
 +            tries = 2
 +            while pdh is None and tries > 0:
 +                exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries)
  
 -        if exists["items"]:
 -            item = exists["items"][0]
 -            pdh = item["portable_data_hash"]
 -            logger.info("Using collection %s (%s)", pdh, item["uuid"])
 -        elif len(collection) > 0:
 -            collection.save_new(name=name, owner_uuid=project, ensure_unique_name=True)
 +                if exists["items"]:
 +                    item = exists["items"][0]
 +                    pdh = item["portable_data_hash"]
 +                    logger.info("Using collection %s (%s)", pdh, item["uuid"])
 +                else:
 +                    try:
 +                        collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=False)
 +                        pdh = collection.portable_data_hash()
 +                        logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
 +                    except arvados.errors.ApiError as ae:
 +                        tries -= 1
 +            if pdh is None:
 +                # Something weird going on here, probably a collection
 +                # with a conflicting name but wrong PDH.  We won't
 +                # be able to reuse it, but we still need to save our
 +                # collection, so save it with a unique name.
 +                logger.info("Name conflict on '%s', existing collection has an unexpected portable data hash", name_pdh)
 +                collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=True)
 +                pdh = collection.portable_data_hash()
 +                logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
 +        else:
 +            # empty collection
              pdh = collection.portable_data_hash()
 -            logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
 +            assert (pdh == config.EMPTY_BLOCK_LOCATOR), "Empty collection portable_data_hash did not have expected locator, was %s" % pdh
 +            logger.info("Using empty collection %s", pdh)
  
      for c in files:
          c.keepref = "%s/%s" % (pdh, c.fn)