Merge branch '13933-dispatch-batch-size'
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index d6ffe7d1612c487c38d132adbb2f8215153f2d46..8c3f0eadee8755ba63027893ce3425df09a082ad 100644 (file)
@@ -45,12 +45,14 @@ from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver,
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
 from .task_queue import TaskQueue
+from .context import ArvLoadingContext, ArvRuntimeContext
 from ._version import __version__
 
 from cwltool.pack import pack
 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
 from cwltool.command_line_tool import compute_checksums
+
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -69,9 +71,19 @@ class ArvCwlRunner(object):
 
     """
 
-    def __init__(self, api_client, work_api=None, keep_client=None,
-                 output_name=None, output_tags=None, default_storage_classes="default",
-                 num_retries=4, thread_count=4):
+    def __init__(self, api_client,
+                 arvargs=None,
+                 keep_client=None,
+                 num_retries=4,
+                 thread_count=4):
+
+        if arvargs is None:
+            arvargs = argparse.Namespace()
+            arvargs.work_api = None
+            arvargs.output_name = None
+            arvargs.output_tags = None
+            arvargs.thread_count = 1
+
         self.api = api_client
         self.processes = {}
         self.workflow_eval_lock = threading.Condition(threading.RLock())
@@ -83,15 +95,15 @@ class ArvCwlRunner(object):
         self.poll_api = None
         self.pipeline = None
         self.final_output_collection = None
-        self.output_name = output_name
-        self.output_tags = output_tags
+        self.output_name = arvargs.output_name
+        self.output_tags = arvargs.output_tags
         self.project_uuid = None
         self.intermediate_output_ttl = 0
         self.intermediate_output_collections = []
         self.trash_intermediate = False
-        self.thread_count = thread_count
+        self.thread_count = arvargs.thread_count
         self.poll_interval = 12
-        self.default_storage_classes = default_storage_classes
+        self.loadingContext = None
 
         if keep_client is not None:
             self.keep_client = keep_client
@@ -111,25 +123,46 @@ class ArvCwlRunner(object):
             try:
                 methods = self.api._rootDesc.get('resources')[api]['methods']
                 if ('httpMethod' in methods['create'] and
-                    (work_api == api or work_api is None)):
+                    (arvargs.work_api == api or arvargs.work_api is None)):
                     self.work_api = api
                     break
             except KeyError:
                 pass
 
         if not self.work_api:
-            if work_api is None:
+            if arvargs.work_api is None:
                 raise Exception("No supported APIs")
             else:
                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
 
+        if self.work_api == "jobs":
+            logger.warning("""
+*******************************
+Using the deprecated 'jobs' API.
+
+To get rid of this warning:
+
+Users: read about migrating at
+http://doc.arvados.org/user/cwl/cwl-style.html#migrate
+and use the option --api=containers
+
+Admins: configure the cluster to disable the 'jobs' API as described at:
+http://doc.arvados.org/install/install-api-server.html#disable_api_methods
+*******************************""")
+
+        self.loadingContext = ArvLoadingContext(vars(arvargs))
+        self.loadingContext.fetcher_constructor = self.fetcher_constructor
+        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
+        self.loadingContext.construct_tool_object = self.arv_make_tool
+
+
     def arv_make_tool(self, toolpath_object, loadingContext):
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
             return ArvadosCommandTool(self, toolpath_object, loadingContext)
         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
             return ArvadosWorkflow(self, toolpath_object, loadingContext)
         else:
-            return cwltool.workflow.defaultMakeTool(toolpath_object, loadingContext)
+            return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
 
     def output_callback(self, out, processStatus):
         with self.workflow_eval_lock:
@@ -139,7 +172,7 @@ class ArvCwlRunner(object):
                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                          body={"state": "Complete"}).execute(num_retries=self.num_retries)
             else:
-                logger.warn("Overall process status is %s", processStatus)
+                logger.error("Overall process status is %s", processStatus)
                 if self.pipeline:
                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                          body={"state": "Failed"}).execute(num_retries=self.num_retries)
@@ -371,7 +404,7 @@ class ArvCwlRunner(object):
                                        'progress':1.0
                                    }).execute(num_retries=self.num_retries)
 
-    def arv_executor(self, tool, job_order, runtimeContext):
+    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
         self.debug = runtimeContext.debug
 
         tool.visit(self.check_features)
@@ -404,12 +437,14 @@ class ArvCwlRunner(object):
         # Reload tool object which may have been updated by
         # upload_workflow_deps
         # Don't validate this time because it will just print redundant errors.
+        loadingContext = self.loadingContext.copy()
+        loadingContext.loader = tool.doc_loader
+        loadingContext.avsc_names = tool.doc_schema
+        loadingContext.metadata = tool.metadata
+        loadingContext.do_validate = False
+
         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
-                                  construct_tool_object=self.arv_make_tool,
-                                  loader=tool.doc_loader,
-                                  avsc_names=tool.doc_schema,
-                                  metadata=tool.metadata,
-                                  do_validate=False)
+                                  loadingContext)
 
         # Upload local file references in the job order.
         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
@@ -443,6 +478,7 @@ class ArvCwlRunner(object):
         runtimeContext = runtimeContext.copy()
         runtimeContext.use_container = True
         runtimeContext.tmpdir_prefix = "tmp"
+        runtimeContext.work_api = self.work_api
 
         if self.work_api == "containers":
             if self.ignore_docker_for_reuse:
@@ -480,7 +516,6 @@ class ArvCwlRunner(object):
                                                 submit_runner_image=runtimeContext.submit_runner_image,
                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                 merged_map=merged_map,
-                                                default_storage_classes=self.default_storage_classes,
                                                 priority=runtimeContext.priority,
                                                 secret_store=self.secret_store)
             elif self.work_api == "jobs":
@@ -592,7 +627,7 @@ class ArvCwlRunner(object):
             if self.output_tags is None:
                 self.output_tags = ""
 
-            storage_classes = kwargs.get("storage_classes").strip().split(",")
+            storage_classes = runtimeContext.storage_classes.strip().split(",")
             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
             self.set_crunch_output()
 
@@ -694,7 +729,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 
     parser.add_argument("--submit-runner-ram", type=int,
                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
-                        default=1024)
+                        default=None)
 
     parser.add_argument("--submit-runner-image", type=str,
                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
@@ -782,6 +817,10 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
         logger.error("Multiple storage classes are not supported currently.")
         return 1
 
+    arvargs.use_container = True
+    arvargs.relax_path_checks = True
+    arvargs.print_supported_versions = False
+
     if install_sig_handlers:
         arv_cmd.install_signal_handlers()
 
@@ -807,12 +846,11 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
         if api_client is None:
             api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
             keep_client = api_client.keep
+            # Make an API object now so errors are reported early.
+            api_client.users().current().execute()
         if keep_client is None:
             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
-        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
-                              num_retries=4, output_name=arvargs.output_name,
-                              output_tags=arvargs.output_tags, default_storage_classes=parser.get_default("storage_classes"),
-                              thread_count=arvargs.thread_count)
+        runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
     except Exception as e:
         logger.error(e)
         return 1
@@ -837,20 +875,11 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
     else:
         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
 
-    for key, val in six.iteritems(cwltool.argparser.get_default_args()):
+    for key, val in cwltool.argparser.get_default_args().items():
         if not hasattr(arvargs, key):
             setattr(arvargs, key, val)
 
-    arvargs.use_container = True
-    arvargs.relax_path_checks = True
-    arvargs.print_supported_versions = False
-
-    loadingContext = LoadingContext(vars(arvargs))
-    loadingContext.fetcher_constructor = runner.fetcher_constructor
-    loadingContext.resolver = partial(collectionResolver, api_client, num_retries=runner.num_retries)
-    loadingContext.construct_tool_object = runner.arv_make_tool
-
-    runtimeContext = RuntimeContext(vars(arvargs))
+    runtimeContext = ArvRuntimeContext(vars(arvargs))
     runtimeContext.make_fs_access = partial(CollectionFsAccess,
                              collection_cache=runner.collection_cache)
 
@@ -861,4 +890,6 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
                              versionfunc=versionstring,
                              job_order_object=job_order_object,
                              logger_handler=arvados.log_handler,
-                             custom_schema_callback=add_arv_hints)
+                             custom_schema_callback=add_arv_hints,
+                             loadingContext=runner.loadingContext,
+                             runtimeContext=runtimeContext)