Merge branch 'master' of git.curoverse.com:arvados into 13330-cwl-intermediate-collec...
author: Fuad Muhic <fmuhic@capeannenterprises.com>
Thu, 5 Jul 2018 15:36:24 +0000 (17:36 +0200)
committer: Fuad Muhic <fmuhic@capeannenterprises.com>
Thu, 5 Jul 2018 15:36:24 +0000 (17:36 +0200)
Arvados-DCO-1.1-Signed-off-by: Fuad Muhic <fmuhic@capeannenterprises.com>

1  2 
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/tests/test_container.py
sdk/cwl/tests/test_job.py

index a7f160c6c307ae0432e968c8a27eb9aef20a345f,4ebcefb1365029a9f70665cbddedc71ffa8172bc..03f5a5eb678201f9610cf5fd76e085260695b742
@@@ -14,13 -14,13 +14,14 @@@ import uui
  import ruamel.yaml as yaml
  
  from cwltool.errors import WorkflowException
- from cwltool.process import get_feature, UnsupportedRequirement, shortname
+ from cwltool.process import UnsupportedRequirement, shortname
  from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
  from cwltool.utils import aslist
+ from cwltool.job import JobBase
  
  import arvados.collection
  
 +from arvados.errors import ApiError
  from .arvdocker import arv_docker_get_image
  from . import done
  from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
@@@ -31,10 -31,18 +32,18 @@@ from .perf import Per
  logger = logging.getLogger('arvados.cwl-runner')
  metrics = logging.getLogger('arvados.cwl-runner.metrics')
  
- class ArvadosContainer(object):
+ class ArvadosContainer(JobBase):
      """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
  
-     def __init__(self, runner):
+     def __init__(self, runner,
+                  builder,   # type: Builder
+                  joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
+                  make_path_mapper,  # type: Callable[..., PathMapper]
+                  requirements,      # type: List[Dict[Text, Text]]
+                  hints,     # type: List[Dict[Text, Text]]
+                  name       # type: Text
+     ):
+         super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
          self.arvrunner = runner
          self.running = False
          self.uuid = None
@@@ -42,7 -50,7 +51,7 @@@
      def update_pipeline_component(self, r):
          pass
  
-     def run(self, dry_run=False, pull_image=True, **kwargs):
+     def run(self, runtimeContext):
          # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
          # which calls makeJobRunner() to get a new ArvadosContainer
          # object.  The fields that define execution such as
@@@ -55,7 -63,7 +64,7 @@@
              "name": self.name,
              "output_path": self.outdir,
              "cwd": self.outdir,
-             "priority": kwargs.get("priority"),
+             "priority": runtimeContext.priority,
              "state": "Committed",
              "properties": {},
          }
  
                  keepemptydirs(vwd)
  
 -                with Perf(metrics, "generatefiles.save_new %s" % self.name):
 -                    vwd.save_new()
 +                info = self._get_intermediate_collection_info()
 +                vwd.save_new(name=info["name"],
 +                             ensure_unique_name=True,
 +                             trash_at=info["trash_at"],
 +                             properties=info["properties"])
  
                  prev = None
                  for f, p in sorteditems:
              mounts["stdout"] = {"kind": "file",
                                  "path": "%s/%s" % (self.outdir, self.stdout)}
  
-         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+         (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
          if not docker_req:
              docker_req = {"dockerImageId": "arvados/jobs"}
  
          container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                       docker_req,
-                                                                      pull_image,
+                                                                      runtimeContext.pull_image,
                                                                       self.arvrunner.project_uuid)
  
-         api_req, _ = get_feature(self, "http://arvados.org/cwl#APIRequirement")
+         api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
          if api_req:
              runtime_constraints["API"] = True
  
-         runtime_req, _ = get_feature(self, "http://arvados.org/cwl#RuntimeConstraints")
+         runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
          if runtime_req:
              if "keep_cache" in runtime_req:
                  runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"] * 2**20
                          "writable": True
                      }
  
-         partition_req, _ = get_feature(self, "http://arvados.org/cwl#PartitionRequirement")
+         partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
          if partition_req:
              scheduling_parameters["partitions"] = aslist(partition_req["partition"])
  
-         intermediate_output_req, _ = get_feature(self, "http://arvados.org/cwl#IntermediateOutput")
+         intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
          if intermediate_output_req:
              self.output_ttl = intermediate_output_req["outputTTL"]
          else:
          if self.output_ttl < 0:
              raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
  
+         if self.timelimit is not None:
+             scheduling_parameters["max_run_time"] = self.timelimit
          container_request["output_ttl"] = self.output_ttl
          container_request["mounts"] = mounts
          container_request["secret_mounts"] = secret_mounts
          container_request["runtime_constraints"] = runtime_constraints
          container_request["scheduling_parameters"] = scheduling_parameters
  
-         enable_reuse = kwargs.get("enable_reuse", True)
+         enable_reuse = runtimeContext.enable_reuse
          if enable_reuse:
-             reuse_req, _ = get_feature(self, "http://arvados.org/cwl#ReuseRequirement")
+             reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
              if reuse_req:
                  enable_reuse = reuse_req["enableReuse"]
          container_request["use_existing"] = enable_reuse
  
-         if kwargs.get("runnerjob", "").startswith("arvwf:"):
-             wfuuid = kwargs["runnerjob"][6:kwargs["runnerjob"].index("#")]
+         if runtimeContext.runnerjob.startswith("arvwf:"):
+             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
              wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
              if container_request["name"] == "main":
                  container_request["name"] = wfrecord["name"]
          self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
  
          try:
-             if kwargs.get("submit_request_uuid"):
+             if runtimeContext.submit_request_uuid:
                  response = self.arvrunner.api.container_requests().update(
-                     uuid=kwargs["submit_request_uuid"],
+                     uuid=runtimeContext.submit_request_uuid,
                      body=container_request
                  ).execute(num_retries=self.arvrunner.num_retries)
              else:
                                                             api_client=self.arvrunner.api,
                                                             keep_client=self.arvrunner.keep_client,
                                                             num_retries=self.arvrunner.num_retries)
-                 done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self))
+                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
  
              if record["output_uuid"]:
                  if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
          finally:
              self.output_callback(outputs, processStatus)
  
 +    def _get_intermediate_collection_info(self):
 +            trash_time = None
 +            if self.arvrunner.intermediate_output_ttl > 0:
 +                trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)
 +
 +            current_container_uuid = None
 +            try:
 +                current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
 +                current_container_uuid = current_container['uuid']
 +            except ApiError as e:
 +                # Status code 404 just means we're not running in a container.
 +                if e.resp.status != 404:
 +                    logger.info("Getting current container: %s", e)
 +            props = {"type": "Intermediate",
 +                          "container": current_container_uuid}
 +
 +            return {"name" : "Intermediate collection",
 +                    "trash_at" : trash_time,
 +                    "properties" : props}
 +
  
  class RunnerContainer(Runner):
      """Submit and manage a container that runs arvados-cwl-runner."""
  
-     def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+     def arvados_job_spec(self, runtimeContext):
          """Create an Arvados container request for this workflow.
  
          The returned dict can be used to create a container passed as
          if self.output_tags:
              command.append("--output-tags=" + self.output_tags)
  
-         if kwargs.get("debug"):
+         if runtimeContext.debug:
              command.append("--debug")
  
+         if runtimeContext.storage_classes != "default":
+             command.append("--storage-classes=" + runtimeContext.storage_classes)
          if self.on_error:
              command.append("--on-error=" + self.on_error)
  
          return container_req
  
  
-     def run(self, **kwargs):
-         kwargs["keepprefix"] = "keep:"
-         job_spec = self.arvados_job_spec(**kwargs)
+     def run(self, runtimeContext):
+         runtimeContext.keepprefix = "keep:"
+         job_spec = self.arvados_job_spec(runtimeContext)
          if self.arvrunner.project_uuid:
              job_spec["owner_uuid"] = self.arvrunner.project_uuid
  
-         if kwargs.get("submit_request_uuid"):
+         if runtimeContext.submit_request_uuid:
              response = self.arvrunner.api.container_requests().update(
-                 uuid=kwargs["submit_request_uuid"],
+                 uuid=runtimeContext.submit_request_uuid,
                  body=job_spec
              ).execute(num_retries=self.arvrunner.num_retries)
          else:
index 332c6a7b9fb3383c13ce23e606a4243fe286d7a5,70c2173db9fa2f7ff5054ff4be7252bd64156b67..2d112c87a7c5affa2b6d490e526ae1e2d2aea28b
@@@ -6,15 -6,15 +6,16 @@@ import loggin
  import re
  import copy
  import json
 +import datetime
  import time
  
- from cwltool.process import get_feature, shortname, UnsupportedRequirement
+ from cwltool.process import shortname, UnsupportedRequirement
  from cwltool.errors import WorkflowException
  from cwltool.command_line_tool import revmap_file, CommandLineTool
  from cwltool.load_tool import fetch_document
  from cwltool.builder import Builder
  from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
+ from cwltool.job import JobBase
  
  from schema_salad.sourceline import SourceLine
  
@@@ -37,15 -37,23 +38,23 @@@ crunchrunner_re = re.compile(r"^.*crunc
  
  crunchrunner_git_commit = 'a3f2cb186e437bfce0031b024b2157b73ed2717d'
  
- class ArvadosJob(object):
+ class ArvadosJob(JobBase):
      """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
  
-     def __init__(self, runner):
+     def __init__(self, runner,
+                  builder,   # type: Builder
+                  joborder,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
+                  make_path_mapper,  # type: Callable[..., PathMapper]
+                  requirements,      # type: List[Dict[Text, Text]]
+                  hints,     # type: List[Dict[Text, Text]]
+                  name       # type: Text
+     ):
+         super(ArvadosJob, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
          self.arvrunner = runner
          self.running = False
          self.uuid = None
  
-     def run(self, dry_run=False, pull_image=True, **kwargs):
+     def run(self, runtimeContext):
          script_parameters = {
              "command": self.command_line
          }
  
                  if vwd:
                      with Perf(metrics, "generatefiles.save_new %s" % self.name):
 -                        vwd.save_new()
 +                        info = self._get_intermediate_collection_info()
 +                        vwd.save_new(name=info["name"],
 +                                     ensure_unique_name=True,
 +                                     trash_at=info["trash_at"],
 +                                     properties=info["properties"])
  
                  for f, p in generatemapper.items():
                      if p.type == "File":
              script_parameters["task.permanentFailCodes"] = self.permanentFailCodes
  
          with Perf(metrics, "arv_docker_get_image %s" % self.name):
-             (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
-             if docker_req and kwargs.get("use_container") is not False:
+             (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
+             if docker_req and runtimeContext.use_container is not False:
                  if docker_req.get("dockerOutputDirectory"):
                      raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                          "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
-                 runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
+                 runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api,
+                                                                            docker_req,
+                                                                            runtimeContext.pull_image,
+                                                                            self.arvrunner.project_uuid)
              else:
                  runtime_constraints["docker_image"] = "arvados/jobs"
  
              runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
              runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
  
-         runtime_req, _ = get_feature(self, "http://arvados.org/cwl#RuntimeConstraints")
+         runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
          if runtime_req:
              if "keep_cache" in runtime_req:
                  runtime_constraints["keep_cache_mb_per_task"] = runtime_req["keep_cache"]
          if not self.arvrunner.ignore_docker_for_reuse:
              filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
  
-         enable_reuse = kwargs.get("enable_reuse", True)
+         enable_reuse = runtimeContext.enable_reuse
          if enable_reuse:
-             reuse_req, _ = get_feature(self, "http://arvados.org/cwl#ReuseRequirement")
+             reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
              if reuse_req:
                  enable_reuse = reuse_req["enableReuse"]
  
                                  dirs[g.group(1)] = g.group(2)
  
                      if processStatus == "permanentFail":
-                         done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self))
+                         done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
  
                      with Perf(metrics, "output collection %s" % self.name):
                          outputs = done.done(self, record, dirs["tmpdir"],
          finally:
              self.output_callback(outputs, processStatus)
  
 +    def _get_intermediate_collection_info(self):
 +            trash_time = None
 +            if self.arvrunner.intermediate_output_ttl > 0:
 +                trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)
 +
 +            current_container_uuid = None
 +            try:
 +                current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
 +                current_container_uuid = current_container['uuid']
 +            except ApiError as e:
 +                # Status code 404 just means we're not running in a container.
 +                if e.resp.status != 404:
 +                    logger.info("Getting current container: %s", e)
 +            props = {"type": "Intermediate",
 +                          "container": current_container_uuid}
 +
 +            return {"name" : "Intermediate collection",
 +                    "trash_at" : trash_time,
 +                    "properties" : props}
 +
  
  class RunnerJob(Runner):
      """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
  
-     def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+     def arvados_job_spec(self, debug=False):
          """Create an Arvados job specification for this workflow.
  
          The returned dict can be used to create a job (i.e., passed as
          if self.on_error:
              self.job_order["arv:on_error"] = self.on_error
  
-         if kwargs.get("debug"):
+         if debug:
              self.job_order["arv:debug"] = True
  
          return {
              }
          }
  
-     def run(self, **kwargs):
-         job_spec = self.arvados_job_spec(**kwargs)
+     def run(self, runtimeContext):
+         job_spec = self.arvados_job_spec(runtimeContext.debug)
  
          job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
  
              body=instance_spec).execute(num_retries=self.arvrunner.num_retries)
          logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
  
-         if kwargs.get("wait") is False:
+         if runtimeContext.wait is False:
              self.uuid = self.arvrunner.pipeline["uuid"]
              return
  
index 76676944358da571e4a5f61c9ab02995fbe49e21,05a358e0d57a44f26cf6a4ec26c9d1bd35200163..9a76f81da54bbd1131ade82be8adda9e5d21d2a5
@@@ -7,14 -7,12 +7,14 @@@ import loggin
  import uuid
  import os
  import urllib
 +import datetime
  
  import arvados.commands.run
  import arvados.collection
  
  from schema_salad.sourceline import SourceLine
  
 +from arvados.errors import ApiError
  from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
  from cwltool.workflow import WorkflowException
  
@@@ -44,7 -42,7 +44,7 @@@ class ArvPathMapper(PathMapper)
      pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')
  
      def __init__(self, arvrunner, referenced_files, input_basedir,
-                  collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
+                  collection_pattern, file_pattern, name=None, single_collection=False):
          self.arvrunner = arvrunner
          self.input_basedir = input_basedir
          self.collection_pattern = collection_pattern
                  for l in srcobj.get("listing", []):
                      self.addentry(l, c, ".", remap)
  
 -                check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
 -                if not check["items"]:
 -                    c.save_new(owner_uuid=self.arvrunner.project_uuid)
 +                info = self._get_intermediate_collection_info()
 +
 +                c.save_new(name=info["name"],
 +                           owner_uuid=self.arvrunner.project_uuid,
 +                           ensure_unique_name=True,
 +                           trash_at=info["trash_at"],
 +                           properties=info["properties"])
  
                  ab = self.collection_pattern % c.portable_data_hash()
                  self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
                                                    num_retries=self.arvrunner.num_retries                                                  )
                  self.addentry(srcobj, c, ".", remap)
  
 -                check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
 -                if not check["items"]:
 -                    c.save_new(owner_uuid=self.arvrunner.project_uuid)
 +                info = self._get_intermediate_collection_info()
 +
 +                c.save_new(name=info["name"],
 +                           owner_uuid=self.arvrunner.project_uuid,
 +                           ensure_unique_name=True,
 +                           trash_at=info["trash_at"],
 +                           properties=info["properties"])
  
                  ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                  self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
          else:
              return None
  
 +    def _get_intermediate_collection_info(self):
 +            trash_time = None
 +            if self.arvrunner.intermediate_output_ttl > 0:
 +                trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)
 +
 +            current_container_uuid = None
 +            try:
 +                current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
 +                current_container_uuid = current_container['uuid']
 +            except ApiError as e:
 +                # Status code 404 just means we're not running in a container.
 +                if e.resp.status != 404:
 +                    logger.info("Getting current container: %s", e)
 +            props = {"type": "Intermediate",
 +                          "container": current_container_uuid}
 +
 +            return {"name" : "Intermediate collection",
 +                    "trash_at" : trash_time,
 +                    "properties" : props}
 +
 +
  class StagingPathMapper(PathMapper):
      _follow_dirs = True
  
index a8a91efd3ad01432a281a9b14a107ebc2e7a88d2,dd484690c14ebb11caeae501da978f81b0e24abd..bd5b1a1b613850995ecdc36cb741d81c8b13766e
@@@ -3,12 -3,12 +3,13 @@@
  # SPDX-License-Identifier: Apache-2.0
  
  import arvados_cwl
+ import arvados_cwl.context
  from arvados_cwl.arvdocker import arv_docker_clear_cache
  import logging
  import mock
  import unittest
  import os
 +import datetime
  import functools
  import cwltool.process
  import cwltool.secrets
@@@ -21,15 -21,31 +22,37 @@@ if not os.getenv('ARVADOS_DEBUG')
      logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
      logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
  
 +class MockDateTime(datetime.datetime):
 +    @classmethod
 +    def now(cls):
 +        return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)
 +
 +datetime.datetime = MockDateTime
  
  class TestContainer(unittest.TestCase):
  
+     def helper(self, runner, enable_reuse=True):
+         """Build the (loadingContext, runtimeContext) pair used by these tests.
+ 
+         Both contexts share a CollectionFsAccess factory backed by a
+         CollectionCache over runner.api.  The loading context carries the
+         CWL v1.0 schema names and an empty Loader.  The runtime context
+         targets the "containers" work API with tmpdir "/tmp", priority 500,
+         the given enable_reuse flag, and a name derived from enable_reuse.
+         """
+         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+         loadingContext = arvados_cwl.context.ArvLoadingContext(
+             {"avsc_names": avsc_names,
+              "basedir": "",
+              "make_fs_access": make_fs_access,
+              "loader": Loader({}),
+              "metadata": {"cwlVersion": "v1.0"}})
+         runtimeContext = arvados_cwl.context.ArvRuntimeContext(
+             {"work_api": "containers",
+              "basedir": "",
+              "name": "test_run_"+str(enable_reuse),
+              "make_fs_access": make_fs_access,
+              "tmpdir": "/tmp",
+              "enable_reuse": enable_reuse,
+              "priority": 500})
+         return loadingContext, runtimeContext
      # The test passes no builder.resources
      # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
      @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
@@@ -47,8 -63,6 +70,6 @@@
              runner.api.collections().get().execute.return_value = {
                  "portable_data_hash": "99999999999999999999999999999993+99"}
  
-             document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
              tool = cmap({
                  "inputs": [],
                  "outputs": [],
                  "id": "#",
                  "class": "CommandLineTool"
              })
-             make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-             arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
-                                                      basedir="", make_fs_access=make_fs_access, loader=Loader({}),
-                                                      metadata={"cwlVersion": "v1.0"})
+             loadingContext, runtimeContext = self.helper(runner, enable_reuse)
+             arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
              arvtool.formatgraph = None
-             for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_"+str(enable_reuse),
-                                  make_fs_access=make_fs_access, tmpdir="/tmp"):
-                 j.run(enable_reuse=enable_reuse, priority=500)
+             for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+                 j.run(runtimeContext)
                  runner.api.container_requests().create.assert_called_with(
                      body=JsonDiffMatcher({
                          'environment': {
          runner.intermediate_output_ttl = 3600
          runner.secret_store = cwltool.secrets.SecretStore()
  
-         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
          keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
          runner.api.collections().get().execute.return_value = {
              "portable_data_hash": "99999999999999999999999999999993+99"}
              "id": "#",
              "class": "CommandLineTool"
          })
-         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
-                                                  avsc_names=avsc_names, make_fs_access=make_fs_access,
-                                                  loader=Loader({}), metadata={"cwlVersion": "v1.0"})
+         loadingContext, runtimeContext = self.helper(runner)
+         runtimeContext.name = "test_resource_requirements"
+         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
          arvtool.formatgraph = None
-         for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_resource_requirements",
-                              make_fs_access=make_fs_access, tmpdir="/tmp"):
-             j.run(enable_reuse=True, priority=500)
+         for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+             j.run(runtimeContext)
  
          call_args, call_kwargs = runner.api.container_requests().create.call_args
  
          runner.intermediate_output_ttl = 0
          runner.secret_store = cwltool.secrets.SecretStore()
  
-         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
          keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
          runner.api.collections().get().execute.return_value = {
              "portable_data_hash": "99999999999999999999999999999993+99"}
              "id": "#",
              "class": "CommandLineTool"
          })
-         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
-                                                  avsc_names=avsc_names, make_fs_access=make_fs_access,
-                                                  loader=Loader({}), metadata={"cwlVersion": "v1.0"})
+         loadingContext, runtimeContext = self.helper(runner)
+         runtimeContext.name = "test_initial_work_dir"
+         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
          arvtool.formatgraph = None
-         for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_initial_work_dir",
-                              make_fs_access=make_fs_access, tmpdir="/tmp"):
-             j.run(priority=500)
+         for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+             j.run(runtimeContext)
  
          call_args, call_kwargs = runner.api.container_requests().create.call_args
  
              "id": "#",
              "class": "CommandLineTool"
          })
-         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
-                                                  basedir="", make_fs_access=make_fs_access, loader=Loader({}),
-                                                  metadata={"cwlVersion": "v1.0"})
+         loadingContext, runtimeContext = self.helper(runner)
+         runtimeContext.name = "test_run_redirect"
+         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
          arvtool.formatgraph = None
-         for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_redirect",
-                              make_fs_access=make_fs_access, tmpdir="/tmp"):
-             j.run(priority=500)
+         for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+             j.run(runtimeContext)
              runner.api.container_requests().create.assert_called_with(
                  body=JsonDiffMatcher({
                      'environment': {
  
          col().open.return_value = []
  
-         arvjob = arvados_cwl.ArvadosContainer(runner)
-         arvjob.name = "testjob"
-         arvjob.builder = mock.MagicMock()
+         arvjob = arvados_cwl.ArvadosContainer(runner,
+                                               mock.MagicMock(),
+                                               {},
+                                               None,
+                                               [],
+                                               [],
+                                               "testjob")
          arvjob.output_callback = mock.MagicMock()
          arvjob.collect_outputs = mock.MagicMock()
          arvjob.successCodes = [0]
              "id": "#",
              "class": "CommandLineTool"
          })
-         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                      collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
-                                                  basedir="", make_fs_access=make_fs_access, loader=Loader({}),
-                                                  metadata={"cwlVersion": "v1.0"})
+         loadingContext, runtimeContext = self.helper(runner)
+         runtimeContext.name = "test_run_mounts"
+         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
          arvtool.formatgraph = None
          job_order = {
              "p1": {
                  ]
              }
          }
-         for j in arvtool.job(job_order, mock.MagicMock(), basedir="", name="test_run_mounts",
-                              make_fs_access=make_fs_access, tmpdir="/tmp"):
-             j.run(priority=500)
+         for j in arvtool.job(job_order, mock.MagicMock(), runtimeContext):
+             j.run(runtimeContext)
              runner.api.container_requests().create.assert_called_with(
                  body=JsonDiffMatcher({
                      'environment': {
                               ]
                           }
                       ]})
-         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                      collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
-                                                  basedir="", make_fs_access=make_fs_access, loader=Loader({}),
-                                                  metadata={"cwlVersion": "v1.0"})
+         loadingContext, runtimeContext = self.helper(runner)
+         runtimeContext.name = "test_secrets"
+         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
          arvtool.formatgraph = None
  
          job_order = {"pw": "blorp"}
          runner.secret_store.store(["pw"], job_order)
  
-         for j in arvtool.job(job_order, mock.MagicMock(), basedir="", name="test_secrets",
-                              make_fs_access=make_fs_access, tmpdir="/tmp"):
-             j.run(enable_reuse=True, priority=500)
+         for j in arvtool.job(job_order, mock.MagicMock(), runtimeContext):
+             j.run(runtimeContext)
              runner.api.container_requests().create.assert_called_with(
                  body=JsonDiffMatcher({
                      'environment': {
                      }
                  }))
  
+     # The test passes no builder.resources
+     # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
+     @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+     def test_timelimit(self, keepdocker):
+         arv_docker_clear_cache()
+         runner = mock.MagicMock()
+         runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+         runner.ignore_docker_for_reuse = False
+         runner.intermediate_output_ttl = 0
+         runner.secret_store = cwltool.secrets.SecretStore()
+         keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+         runner.api.collections().get().execute.return_value = {
+             "portable_data_hash": "99999999999999999999999999999993+99"}
+         tool = cmap({
+             "inputs": [],
+             "outputs": [],
+             "baseCommand": "ls",
+             "arguments": [{"valueFrom": "$(runtime.outdir)"}],
+             "id": "#",
+             "class": "CommandLineTool",
+             "hints": [
+                 {
+                     "class": "http://commonwl.org/cwltool#TimeLimit",
+                     "timelimit": 42
+                 }
+             ]
+         })
+         loadingContext, runtimeContext = self.helper(runner)
+         runtimeContext.name = "test_timelimit"
+         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
+         arvtool.formatgraph = None
+         for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+             j.run(runtimeContext)
+         _, kwargs = runner.api.container_requests().create.call_args
+         self.assertEqual(42, kwargs['body']['scheduling_parameters'].get('max_run_time'))
++
++
 +    def test_get_intermediate_collection_info(self):
 +        arvrunner = mock.MagicMock()
 +        arvrunner.intermediate_output_ttl = 60
 +        arvrunner.api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
 +
 +        container = arvados_cwl.ArvadosContainer(arvrunner)
 +
 +        info = container._get_intermediate_collection_info()
 +
 +        self.assertEqual(info["name"], "Intermediate collection")
 +        self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 1))
 +        self.assertEqual(info["properties"], {"type" : "Intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
index 1841fd3f89ea7a89aa1ae246f4f0dd5d305c9532,c110bc5d53cd4634656d93fab2937954be973d07..b9f1c396172b6d32e0b22423878ed2b0c78696cf
@@@ -10,7 -10,6 +10,7 @@@ import o
  import unittest
  import copy
  import StringIO
 +import datetime
  
  import arvados
  import arvados_cwl
@@@ -25,15 -24,30 +25,37 @@@ if not os.getenv('ARVADOS_DEBUG')
      logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
      logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
  
 +class MockDateTime(datetime.datetime):
 +    @classmethod
 +    def now(cls):
 +        return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)
 +
 +datetime.datetime = MockDateTime
 +
  class TestJob(unittest.TestCase):
  
+     def helper(self, runner, enable_reuse=True):
+         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+         loadingContext = arvados_cwl.context.ArvLoadingContext(
+             {"avsc_names": avsc_names,
+              "basedir": "",
+              "make_fs_access": make_fs_access,
+              "loader": Loader({}),
+              "metadata": {"cwlVersion": "v1.0"},
+              "makeTool": runner.arv_make_tool})
+         runtimeContext = arvados_cwl.context.ArvRuntimeContext(
+             {"work_api": "jobs",
+              "basedir": "",
+              "name": "test_run_job_"+str(enable_reuse),
+              "make_fs_access": make_fs_access,
+              "enable_reuse": enable_reuse,
+              "priority": 500})
+         return loadingContext, runtimeContext
      # The test passes no builder.resources
      # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
      @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
@@@ -43,7 -57,6 +65,6 @@@
              runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
              runner.ignore_docker_for_reuse = False
              runner.num_retries = 0
-             document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
  
              list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
              runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
                  "id": "#",
                  "class": "CommandLineTool"
              })
-             make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-             arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
-                                                      basedir="", make_fs_access=make_fs_access, loader=Loader({}),
-                                                      metadata={"cwlVersion": "v1.0"})
+             loadingContext, runtimeContext = self.helper(runner, enable_reuse)
+             arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
              arvtool.formatgraph = None
-             for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
-                 j.run(enable_reuse=enable_reuse)
+             for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+                 j.run(runtimeContext)
                  runner.api.jobs().create.assert_called_with(
                      body=JsonDiffMatcher({
                          'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                      runner.api.links().create.side_effect = ApiError(
                          mock.MagicMock(return_value={'status': 403}),
                          'Permission denied')
-                     j.run(enable_reuse=enable_reuse)
+                     j.run(runtimeContext)
                  else:
                      assert not runner.api.links().create.called
  
          list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
          runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
  
-         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
          tool = {
              "inputs": [],
              "outputs": [],
              "id": "#",
              "class": "CommandLineTool"
          }
-         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
-                                                  make_fs_access=make_fs_access, loader=Loader({}),
-                                                  metadata={"cwlVersion": "v1.0"})
+         loadingContext, runtimeContext = self.helper(runner)
+         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
          arvtool.formatgraph = None
-         for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
-             j.run(enable_reuse=True)
+         for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+             j.run(runtimeContext)
          runner.api.jobs().create.assert_called_with(
              body=JsonDiffMatcher({
                  'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                                                          {"items": []},
                                                          {"items": [{"manifest_text": "ABC"}]})
  
-         arvjob = arvados_cwl.ArvadosJob(runner)
-         arvjob.name = "testjob"
-         arvjob.builder = mock.MagicMock()
+         arvjob = arvados_cwl.ArvadosJob(runner,
+                                         mock.MagicMock(),
+                                         {},
+                                         None,
+                                         [],
+                                         [],
+                                         "testjob")
          arvjob.output_callback = mock.MagicMock()
          arvjob.collect_outputs = mock.MagicMock()
          arvjob.collect_outputs.return_value = {"out": "stuff"}
              {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
          )
  
-         arvjob = arvados_cwl.ArvadosJob(runner)
-         arvjob.name = "testjob"
-         arvjob.builder = mock.MagicMock()
+         arvjob = arvados_cwl.ArvadosJob(runner,
+                                         mock.MagicMock(),
+                                         {},
+                                         None,
+                                         [],
+                                         [],
+                                         "testjob")
          arvjob.output_callback = mock.MagicMock()
          arvjob.collect_outputs = mock.MagicMock()
          arvjob.collect_outputs.return_value = {"out": "stuff"}
  
          arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
  
 +    def test_get_intermediate_collection_info(self):
 +        arvrunner = mock.MagicMock()
 +        arvrunner.intermediate_output_ttl = 60
 +        arvrunner.api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
 +
 +        job = arvados_cwl.ArvadosJob(arvrunner)
 +
 +        info = job._get_intermediate_collection_info()
 +
 +        self.assertEqual(info["name"], "Intermediate collection")
 +        self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 1))
 +        self.assertEqual(info["properties"], {"type" : "Intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
 +
 +
  
  class TestWorkflow(unittest.TestCase):
+     def helper(self, runner, enable_reuse=True):
+         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+         document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=runner.api, fs_access=make_fs_access(""))
+         document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
+         document_loader.fetch_text = document_loader.fetcher.fetch_text
+         document_loader.check_exists = document_loader.fetcher.check_exists
+         loadingContext = arvados_cwl.context.ArvLoadingContext(
+             {"avsc_names": avsc_names,
+              "basedir": "",
+              "make_fs_access": make_fs_access,
+              "loader": document_loader,
+              "metadata": {"cwlVersion": "v1.0"},
+              "construct_tool_object": runner.arv_make_tool})
+         runtimeContext = arvados_cwl.context.ArvRuntimeContext(
+             {"work_api": "jobs",
+              "basedir": "",
+              "name": "test_run_wf_"+str(enable_reuse),
+              "make_fs_access": make_fs_access,
+              "enable_reuse": enable_reuse,
+              "priority": 500})
+         return loadingContext, runtimeContext
      # The test passes no builder.resources
      # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
      @mock.patch("arvados.collection.CollectionReader")
          runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
          runner.ignore_docker_for_reuse = False
          runner.num_retries = 0
-         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
  
-         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-         document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access(""))
-         document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
-         document_loader.fetch_text = document_loader.fetcher.fetch_text
-         document_loader.check_exists = document_loader.fetcher.check_exists
+         loadingContext, runtimeContext = self.helper(runner)
  
-         tool, metadata = document_loader.resolve_ref("tests/wf/scatter2.cwl")
+         tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
          metadata["cwlVersion"] = tool["cwlVersion"]
  
          mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
  
-         arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
-                                               basedir="", make_fs_access=make_fs_access, loader=document_loader,
-                                               makeTool=runner.arv_make_tool, metadata=metadata)
+         arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
          arvtool.formatgraph = None
-         it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
-         it.next().run()
-         it.next().run()
+         it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+         it.next().run(runtimeContext)
+         it.next().run(runtimeContext)
  
          with open("tests/wf/scatter2_subwf.cwl") as f:
              subwf = StripYAMLComments(f.read())
          runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
          runner.ignore_docker_for_reuse = False
          runner.num_retries = 0
-         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
  
-         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-         document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access(""))
-         document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
-         document_loader.fetch_text = document_loader.fetcher.fetch_text
-         document_loader.check_exists = document_loader.fetcher.check_exists
+         loadingContext, runtimeContext = self.helper(runner)
  
-         tool, metadata = document_loader.resolve_ref("tests/wf/echo-wf.cwl")
+         tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
          metadata["cwlVersion"] = tool["cwlVersion"]
  
          mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
  
-         arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
-                                               basedir="", make_fs_access=make_fs_access, loader=document_loader,
-                                               makeTool=runner.arv_make_tool, metadata=metadata)
+         arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
          arvtool.formatgraph = None
-         it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
-         it.next().run()
-         it.next().run()
+         it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+         it.next().run(runtimeContext)
+         it.next().run(runtimeContext)
  
          with open("tests/wf/echo-subwf.cwl") as f:
              subwf = StripYAMLComments(f.read())