Merge branch 'wtsi/python-api-timeout' refs #13542
author Peter Amstutz <pamstutz@veritasgenetics.com>
Wed, 5 Sep 2018 18:53:17 +0000 (14:53 -0400)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Wed, 5 Sep 2018 18:53:17 +0000 (14:53 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

sdk/cwl/arvados_cwl/__init__.py
sdk/python/arvados/api.py

index 8c3f0eadee8755ba63027893ce3425df09a082ad,ffccf4e971cbd2e64783d755f36cda1395f4a217..da24dc48465426ced81d1d37311b271bf45132a4
@@@ -28,7 -28,6 +28,7 @@@ import cwltool.workflo
  import cwltool.process
  from schema_salad.sourceline import SourceLine
  import schema_salad.validate as validate
 +import cwltool.argparser
  
  import arvados
  import arvados.config
@@@ -45,14 -44,12 +45,14 @@@ from .fsaccess import CollectionFsAcces
  from .perf import Perf
  from .pathmapper import NoFollowPathMapper
  from .task_queue import TaskQueue
 +from .context import ArvLoadingContext, ArvRuntimeContext
  from ._version import __version__
  
  from cwltool.pack import pack
  from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
  from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
  from cwltool.command_line_tool import compute_checksums
 +
  from arvados.api import OrderedJsonModel
  
  logger = logging.getLogger('arvados.cwl-runner')
@@@ -71,19 -68,9 +71,19 @@@ class ArvCwlRunner(object)
  
      """
  
 -    def __init__(self, api_client, work_api=None, keep_client=None,
 -                 output_name=None, output_tags=None, num_retries=4,
 +    def __init__(self, api_client,
 +                 arvargs=None,
 +                 keep_client=None,
 +                 num_retries=4,
                   thread_count=4):
 +
 +        if arvargs is None:
 +            arvargs = argparse.Namespace()
 +            arvargs.work_api = None
 +            arvargs.output_name = None
 +            arvargs.output_tags = None
 +            arvargs.thread_count = 1
 +
          self.api = api_client
          self.processes = {}
          self.workflow_eval_lock = threading.Condition(threading.RLock())
          self.poll_api = None
          self.pipeline = None
          self.final_output_collection = None
 -        self.output_name = output_name
 -        self.output_tags = output_tags
 +        self.output_name = arvargs.output_name
 +        self.output_tags = arvargs.output_tags
          self.project_uuid = None
          self.intermediate_output_ttl = 0
          self.intermediate_output_collections = []
          self.trash_intermediate = False
 -        self.thread_count = thread_count
 +        self.thread_count = arvargs.thread_count
          self.poll_interval = 12
 +        self.loadingContext = None
  
          if keep_client is not None:
              self.keep_client = keep_client
              try:
                  methods = self.api._rootDesc.get('resources')[api]['methods']
                  if ('httpMethod' in methods['create'] and
 -                    (work_api == api or work_api is None)):
 +                    (arvargs.work_api == api or arvargs.work_api is None)):
                      self.work_api = api
                      break
              except KeyError:
                  pass
  
          if not self.work_api:
 -            if work_api is None:
 +            if arvargs.work_api is None:
                  raise Exception("No supported APIs")
              else:
                  raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
  
 -    def arv_make_tool(self, toolpath_object, **kwargs):
 -        kwargs["work_api"] = self.work_api
 -        kwargs["fetcher_constructor"] = self.fetcher_constructor
 -        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
 +        if self.work_api == "jobs":
 +            logger.warn("""
 +*******************************
 +Using the deprecated 'jobs' API.
 +
 +To get rid of this warning:
 +
 +Users: read about migrating at
 +http://doc.arvados.org/user/cwl/cwl-style.html#migrate
 +and use the option --api=containers
 +
 +Admins: configure the cluster to disable the 'jobs' API as described at:
 +http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 +*******************************""")
 +
 +        self.loadingContext = ArvLoadingContext(vars(arvargs))
 +        self.loadingContext.fetcher_constructor = self.fetcher_constructor
 +        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
 +        self.loadingContext.construct_tool_object = self.arv_make_tool
 +
 +
 +    def arv_make_tool(self, toolpath_object, loadingContext):
          if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
 -            return ArvadosCommandTool(self, toolpath_object, **kwargs)
 +            return ArvadosCommandTool(self, toolpath_object, loadingContext)
          elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
 -            return ArvadosWorkflow(self, toolpath_object, **kwargs)
 +            return ArvadosWorkflow(self, toolpath_object, loadingContext)
          else:
 -            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 +            return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
  
      def output_callback(self, out, processStatus):
          with self.workflow_eval_lock:
                      self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                           body={"state": "Complete"}).execute(num_retries=self.num_retries)
              else:
 -                logger.warn("Overall process status is %s", processStatus)
 +                logger.error("Overall process status is %s", processStatus)
                  if self.pipeline:
                      self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                           body={"state": "Failed"}).execute(num_retries=self.num_retries)
              self.workflow_eval_lock.notifyAll()
  
  
 -    def start_run(self, runnable, kwargs):
 -        self.task_queue.add(partial(runnable.run, **kwargs))
 +    def start_run(self, runnable, runtimeContext):
 +        self.task_queue.add(partial(runnable.run, runtimeContext))
  
      def process_submitted(self, container):
          with self.workflow_eval_lock:
                  with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                      self.check_features(v)
  
 -    def make_output_collection(self, name, tagsString, outputObj):
 +    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
          outputObj = copy.deepcopy(outputObj)
  
          files = []
          with final.open("cwl.output.json", "w") as f:
              json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
  
 -        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
 +        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
  
          logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                      final.api_response()["name"],
                                         'progress':1.0
                                     }).execute(num_retries=self.num_retries)
  
 -    def arv_executor(self, tool, job_order, **kwargs):
 -        self.debug = kwargs.get("debug")
 +    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
 +        self.debug = runtimeContext.debug
  
          tool.visit(self.check_features)
  
 -        self.project_uuid = kwargs.get("project_uuid")
 +        self.project_uuid = runtimeContext.project_uuid
          self.pipeline = None
 -        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
 -                                                                 collection_cache=self.collection_cache)
 -        self.fs_access = make_fs_access(kwargs["basedir"])
 -        self.secret_store = kwargs.get("secret_store")
 +        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
 +        self.secret_store = runtimeContext.secret_store
  
 -        self.trash_intermediate = kwargs["trash_intermediate"]
 +        self.trash_intermediate = runtimeContext.trash_intermediate
          if self.trash_intermediate and self.work_api != "containers":
              raise Exception("--trash-intermediate is only supported with --api=containers.")
  
 -        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
 +        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
          if self.intermediate_output_ttl and self.work_api != "containers":
              raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
          if self.intermediate_output_ttl < 0:
              raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
  
 -        if kwargs.get("submit_request_uuid") and self.work_api != "containers":
 +        if runtimeContext.submit_request_uuid and self.work_api != "containers":
              raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
  
 -        if not kwargs.get("name"):
 -            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
 +        if not runtimeContext.name:
 +            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
  
          # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
          # Also uploads docker images.
          # Reload tool object which may have been updated by
          # upload_workflow_deps
          # Don't validate this time because it will just print redundant errors.
 +        loadingContext = self.loadingContext.copy()
 +        loadingContext.loader = tool.doc_loader
 +        loadingContext.avsc_names = tool.doc_schema
 +        loadingContext.metadata = tool.metadata
 +        loadingContext.do_validate = False
 +
          tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
 -                                  makeTool=self.arv_make_tool,
 -                                  loader=tool.doc_loader,
 -                                  avsc_names=tool.doc_schema,
 -                                  metadata=tool.metadata,
 -                                  do_validate=False)
 +                                  loadingContext)
  
          # Upload local file references in the job order.
 -        job_order = upload_job_order(self, "%s input" % kwargs["name"],
 +        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                       tool, job_order)
  
 -        existing_uuid = kwargs.get("update_workflow")
 -        if existing_uuid or kwargs.get("create_workflow"):
 +        existing_uuid = runtimeContext.update_workflow
 +        if existing_uuid or runtimeContext.create_workflow:
              # Create a pipeline template or workflow record and exit.
              if self.work_api == "jobs":
                  tmpl = RunnerTemplate(self, tool, job_order,
 -                                      kwargs.get("enable_reuse"),
 +                                      runtimeContext.enable_reuse,
                                        uuid=existing_uuid,
 -                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
 -                                      name=kwargs["name"],
 +                                      submit_runner_ram=runtimeContext.submit_runner_ram,
 +                                      name=runtimeContext.name,
                                        merged_map=merged_map)
                  tmpl.save()
                  # cwltool.main will write our return value to stdout.
                  return (upload_workflow(self, tool, job_order,
                                          self.project_uuid,
                                          uuid=existing_uuid,
 -                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
 -                                        name=kwargs["name"],
 +                                        submit_runner_ram=runtimeContext.submit_runner_ram,
 +                                        name=runtimeContext.name,
                                          merged_map=merged_map),
                          "success")
  
 -        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 -        self.eval_timeout = kwargs.get("eval_timeout")
 +        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
 +        self.eval_timeout = runtimeContext.eval_timeout
  
 -        kwargs["make_fs_access"] = make_fs_access
 -        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
 -        kwargs["use_container"] = True
 -        kwargs["tmpdir_prefix"] = "tmp"
 -        kwargs["compute_checksum"] = kwargs.get("compute_checksum")
 +        runtimeContext = runtimeContext.copy()
 +        runtimeContext.use_container = True
 +        runtimeContext.tmpdir_prefix = "tmp"
 +        runtimeContext.work_api = self.work_api
  
          if self.work_api == "containers":
              if self.ignore_docker_for_reuse:
                  raise Exception("--ignore-docker-for-reuse not supported with containers API.")
 -            kwargs["outdir"] = "/var/spool/cwl"
 -            kwargs["docker_outdir"] = "/var/spool/cwl"
 -            kwargs["tmpdir"] = "/tmp"
 -            kwargs["docker_tmpdir"] = "/tmp"
 +            runtimeContext.outdir = "/var/spool/cwl"
 +            runtimeContext.docker_outdir = "/var/spool/cwl"
 +            runtimeContext.tmpdir = "/tmp"
 +            runtimeContext.docker_tmpdir = "/tmp"
          elif self.work_api == "jobs":
 -            if kwargs["priority"] != DEFAULT_PRIORITY:
 +            if runtimeContext.priority != DEFAULT_PRIORITY:
                  raise Exception("--priority not implemented for jobs API.")
 -            kwargs["outdir"] = "$(task.outdir)"
 -            kwargs["docker_outdir"] = "$(task.outdir)"
 -            kwargs["tmpdir"] = "$(task.tmpdir)"
 +            runtimeContext.outdir = "$(task.outdir)"
 +            runtimeContext.docker_outdir = "$(task.outdir)"
 +            runtimeContext.tmpdir = "$(task.tmpdir)"
  
 -        if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
 +        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
              raise Exception("--priority must be in the range 1..1000.")
  
          runnerjob = None
 -        if kwargs.get("submit"):
 +        if runtimeContext.submit:
              # Submit a runner job to run the workflow for us.
              if self.work_api == "containers":
 -                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
 -                    kwargs["runnerjob"] = tool.tool["id"]
 +                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
 +                    runtimeContext.runnerjob = tool.tool["id"]
                      runnerjob = tool.job(job_order,
                                           self.output_callback,
 -                                         **kwargs).next()
 +                                         runtimeContext).next()
                  else:
 -                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
 +                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
                                                  self.output_name,
                                                  self.output_tags,
 -                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
 -                                                name=kwargs.get("name"),
 -                                                on_error=kwargs.get("on_error"),
 -                                                submit_runner_image=kwargs.get("submit_runner_image"),
 -                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
 +                                                submit_runner_ram=runtimeContext.submit_runner_ram,
 +                                                name=runtimeContext.name,
 +                                                on_error=runtimeContext.on_error,
 +                                                submit_runner_image=runtimeContext.submit_runner_image,
 +                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                  merged_map=merged_map,
 -                                                priority=kwargs.get("priority"),
 +                                                priority=runtimeContext.priority,
                                                  secret_store=self.secret_store)
              elif self.work_api == "jobs":
 -                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
 +                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
                                        self.output_name,
                                        self.output_tags,
 -                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
 -                                      name=kwargs.get("name"),
 -                                      on_error=kwargs.get("on_error"),
 -                                      submit_runner_image=kwargs.get("submit_runner_image"),
 +                                      submit_runner_ram=runtimeContext.submit_runner_ram,
 +                                      name=runtimeContext.name,
 +                                      on_error=runtimeContext.on_error,
 +                                      submit_runner_image=runtimeContext.submit_runner_image,
                                        merged_map=merged_map)
 -        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
 +        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
              # Create pipeline for local run
              self.pipeline = self.api.pipeline_instances().create(
                  body={
                      "owner_uuid": self.project_uuid,
 -                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
 +                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                      "components": {},
                      "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
              logger.info("Pipeline instance %s", self.pipeline["uuid"])
  
 -        if runnerjob and not kwargs.get("wait"):
 -            submitargs = kwargs.copy()
 -            submitargs['submit'] = False
 -            runnerjob.run(**submitargs)
 +        if runnerjob and not runtimeContext.wait:
 +            submitargs = runtimeContext.copy()
 +            submitargs.submit = False
 +            runnerjob.run(submitargs)
              return (runnerjob.uuid, "success")
  
-         self.poll_api = arvados.api('v1')
+         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
          self.polling_thread = threading.Thread(target=self.poll_states)
          self.polling_thread.start()
  
          if runnerjob:
              jobiter = iter((runnerjob,))
          else:
 -            if "cwl_runner_job" in kwargs:
 -                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
 +            if runtimeContext.cwl_runner_job is not None:
 +                self.uuid = runtimeContext.cwl_runner_job.get('uuid')
              jobiter = tool.job(job_order,
                                 self.output_callback,
 -                               **kwargs)
 +                               runtimeContext)
  
          try:
              self.workflow_eval_lock.acquire()
  
                  if runnable:
                      with Perf(metrics, "run"):
 -                        self.start_run(runnable, kwargs)
 +                        self.start_run(runnable, runtimeContext)
                  else:
                      if (self.task_queue.in_flight + len(self.processes)) > 0:
                          self.workflow_eval_lock.wait(3)
          if self.final_output is None:
              raise WorkflowException("Workflow did not return a result.")
  
 -        if kwargs.get("submit") and isinstance(runnerjob, Runner):
 +        if runtimeContext.submit and isinstance(runnerjob, Runner):
              logger.info("Final output collection %s", runnerjob.final_output)
          else:
              if self.output_name is None:
                  self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
              if self.output_tags is None:
                  self.output_tags = ""
 -            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
 +
 +            storage_classes = runtimeContext.storage_classes.strip().split(",")
 +            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
              self.set_crunch_output()
  
 -        if kwargs.get("compute_checksum"):
 +        if runtimeContext.compute_checksum:
              adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
              adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
  
@@@ -729,7 -696,7 +729,7 @@@ def arg_parser():  # type: () -> argpar
  
      parser.add_argument("--submit-runner-ram", type=int,
                          help="RAM (in MiB) required for the workflow runner job (default 1024)",
 -                        default=1024)
 +                        default=None)
  
      parser.add_argument("--submit-runner-image", type=str,
                          help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
      parser.add_argument("--enable-dev", action="store_true",
                          help="Enable loading and running development versions "
                               "of CWL spec.", default=False)
 +    parser.add_argument('--storage-classes', default="default", type=str,
 +                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
  
      parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                          help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
      parser.add_argument("--thread-count", type=int,
                          default=4, help="Number of threads to use for job submit and output collection.")
  
+     parser.add_argument("--http-timeout", type=int,
+                         default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 (5 minutes).")
      exgroup = parser.add_mutually_exclusive_group()
      exgroup.add_argument("--trash-intermediate", action="store_true",
                          default=False, dest="trash_intermediate",
@@@ -813,14 -781,6 +816,14 @@@ def main(args, stdout, stderr, api_clie
      job_order_object = None
      arvargs = parser.parse_args(args)
  
 +    if len(arvargs.storage_classes.strip().split(',')) > 1:
 +        logger.error("Multiple storage classes are not supported currently.")
 +        return 1
 +
 +    arvargs.use_container = True
 +    arvargs.relax_path_checks = True
 +    arvargs.print_supported_versions = False
 +
      if install_sig_handlers:
          arv_cmd.install_signal_handlers()
  
  
      try:
          if api_client is None:
-             api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
+             api_client = arvados.safeapi.ThreadSafeApiCache(
+                 api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
+                 keep_params={"num_retries": 4})
              keep_client = api_client.keep
 +            # Make an API request now so errors are reported early.
 +            api_client.users().current().execute()
          if keep_client is None:
              keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
 -        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
 -                              num_retries=4, output_name=arvargs.output_name,
 -                              output_tags=arvargs.output_tags,
 -                              thread_count=arvargs.thread_count)
 +        runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
      except Exception as e:
          logger.error(e)
          return 1
      else:
          arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
  
 -    arvargs.conformance_test = None
 -    arvargs.use_container = True
 -    arvargs.relax_path_checks = True
 -    arvargs.print_supported_versions = False
 +    for key, val in cwltool.argparser.get_default_args().items():
 +        if not hasattr(arvargs, key):
 +            setattr(arvargs, key, val)
  
 -    make_fs_access = partial(CollectionFsAccess,
 -                           collection_cache=runner.collection_cache)
 +    runtimeContext = ArvRuntimeContext(vars(arvargs))
 +    runtimeContext.make_fs_access = partial(CollectionFsAccess,
 +                             collection_cache=runner.collection_cache)
  
      return cwltool.main.main(args=arvargs,
                               stdout=stdout,
                               stderr=stderr,
                               executor=runner.arv_executor,
 -                             makeTool=runner.arv_make_tool,
                               versionfunc=versionstring,
                               job_order_object=job_order_object,
 -                             make_fs_access=make_fs_access,
 -                             fetcher_constructor=partial(CollectionFetcher,
 -                                                         api_client=api_client,
 -                                                         fs_access=make_fs_access(""),
 -                                                         num_retries=runner.num_retries),
 -                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                               logger_handler=arvados.log_handler,
 -                             custom_schema_callback=add_arv_hints)
 +                             custom_schema_callback=add_arv_hints,
 +                             loadingContext=runner.loadingContext,
 +                             runtimeContext=runtimeContext)
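
A minimal sketch of driving the refactored ArvCwlRunner constructor, mirroring what main() does after this change: the parsed argparse namespace is passed wholesale instead of individual keyword arguments, and the new --http-timeout value is handed to the API client. The hand-built Namespace and the environment-based credentials are illustrative assumptions, not part of the commit.

import argparse

import arvados.safeapi
from arvados.api import OrderedJsonModel
from arvados_cwl import ArvCwlRunner  # sdk/cwl package shown in this diff

# Assumes ARVADOS_API_HOST / ARVADOS_API_TOKEN are set in the environment.
arvargs = argparse.Namespace(
    work_api="containers",   # or None to auto-detect a supported API
    output_name=None,
    output_tags=None,
    thread_count=4,
    http_timeout=5 * 60,     # new --http-timeout option, in seconds
)

api_client = arvados.safeapi.ThreadSafeApiCache(
    api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
    keep_params={"num_retries": 4})

runner = ArvCwlRunner(api_client, arvargs,
                      keep_client=api_client.keep, num_retries=4)
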
index b652db77d18a73214740672da6588f0fbaab3de3,7e9ad1ee81ad401feae2d0205e4516accbdbd96d..b18ce25fd218201ab75f4b3c14f9c7b66f84f373
@@@ -96,10 -96,6 +96,10 @@@ def _intercept_http_request(self, uri, 
                            delay, exc_info=True)
              for conn in self.connections.values():
                  conn.close()
 +        except httplib2.SSLHandshakeError as e:
 +            # Intercept and re-raise with a better error message.
 +            raise httplib2.SSLHandshakeError("Could not connect to %s\n%s\nPossible causes: remote SSL/TLS certificate expired, or was issued by an untrusted certificate authority." % (uri, e))
 +
          time.sleep(delay)
          delay = delay * self._retry_delay_backoff
  
@@@ -153,7 -149,7 +153,7 @@@ def http_cache(data_type)
      return cache.SafeHTTPCache(path, max_age=60*60*24*2)
  
  def api(version=None, cache=True, host=None, token=None, insecure=False,
-         request_id=None, **kwargs):
+         request_id=None, timeout=5*60, **kwargs):
      """Return an apiclient Resources object for an Arvados instance.
  
      :version:
      :insecure:
        If True, ignore SSL certificate validation errors.
  
+     :timeout:
+       Timeout (in seconds) for HTTP requests to the API server. Default is 300 (5 minutes).
      :request_id:
        Default X-Request-Id header value for outgoing requests that
        don't already provide one. If None or omitted, generate a random
              http_kwargs['disable_ssl_certificate_validation'] = True
          kwargs['http'] = httplib2.Http(**http_kwargs)
  
+     if kwargs['http'].timeout is None:
+         kwargs['http'].timeout = timeout
      kwargs['http'] = _patch_http_request(kwargs['http'], token)
  
      svc = apiclient_discovery.build('arvados', version, cache_discovery=False, **kwargs)
@@@ -258,12 -260,9 +264,12 @@@ def api_from_config(version=None, apico
      if apiconfig is None:
          apiconfig = config.settings()
  
 +    errors = []
      for x in ['ARVADOS_API_HOST', 'ARVADOS_API_TOKEN']:
          if x not in apiconfig:
 -            raise ValueError("%s is not set. Aborting." % x)
 +            errors.append(x)
 +    if errors:
 +        raise ValueError(" and ".join(errors)+" not set.\nPlease set in %s or export as environment variables." % config.default_config_file)
      host = apiconfig.get('ARVADOS_API_HOST')
      token = apiconfig.get('ARVADOS_API_TOKEN')
      insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE', apiconfig)
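
A brief usage sketch of the new timeout keyword on arvados.api(), assuming ARVADOS_API_HOST and ARVADOS_API_TOKEN are configured; per the hunk above, the value is only applied when the underlying httplib2.Http object does not already carry a timeout of its own.

import httplib2
import arvados

# Default client: the new 5 minute (300 second) request timeout applies.
api = arvados.api('v1')

# Shorter timeout for latency-sensitive callers (seconds).
api_fast = arvados.api('v1', timeout=60)

# A caller-supplied http object with an explicit timeout is left alone,
# since api() only fills in http.timeout when it is still None.
api_custom = arvados.api('v1', http=httplib2.Http(timeout=120))

print(api.users().current().execute()["uuid"])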