20933: Use acrContainerImage where available
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 08a05d571cb8e41bb48265489fcec9f13b1e6100..fd3b7a5d16b6e62909f6b1391ead5198aafe01bf 100644 (file)
@@ -28,14 +28,15 @@ from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing
 
 import arvados
 import arvados.config
+import arvados.logging
 from arvados.keep import KeepClient
 from arvados.errors import ApiError
 import arvados.commands._util as arv_cmd
-from arvados.api import OrderedJsonModel
 
 from .perf import Perf
 from ._version import __version__
 from .executor import ArvCwlExecutor
+from .fsaccess import workflow_uuid_pattern
 
 # These aren't used directly in this file but
 # other code expects to import them from here
@@ -67,7 +68,10 @@ def versionstring():
 
 
 def arg_parser():  # type: () -> argparse.ArgumentParser
-    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
+    parser = argparse.ArgumentParser(
+        description='Arvados executor for Common Workflow Language',
+        parents=[arv_cmd.retry_opt],
+    )
 
     parser.add_argument("--basedir",
                         help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
@@ -119,6 +123,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--create-workflow", action="store_true", help="Register an Arvados workflow that can be run from Workbench")
     exgroup.add_argument("--update-workflow", metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")
 
+    exgroup.add_argument("--print-keep-deps", action="store_true", help="To assist copying, print a list of Keep collections that this workflow depends on.")
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner, wait for completion.",
                         default=True, dest="wait")
@@ -199,6 +205,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         action="store_false", default=True,
                         help=argparse.SUPPRESS)
 
+    parser.add_argument("--disable-git", dest="git_info",
+                        action="store_false", default=True,
+                        help=argparse.SUPPRESS)
+
     parser.add_argument("--disable-color", dest="enable_color",
                         action="store_false", default=True,
                         help=argparse.SUPPRESS)
@@ -207,12 +217,25 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         action="store_true", default=False,
                         help=argparse.SUPPRESS)
 
+    parser.add_argument("--fast-parser", dest="fast_parser",
+                        action="store_true", default=False,
+                        help=argparse.SUPPRESS)
+
     parser.add_argument("--thread-count", type=int,
                         default=0, help="Number of threads to use for job submit and output collection.")
 
     parser.add_argument("--http-timeout", type=int,
                         default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
 
+    parser.add_argument("--defer-downloads", action="store_true", default=False,
+                        help="When submitting a workflow, defer downloading HTTP URLs to workflow launch instead of downloading to Keep before submit.")
+
+    parser.add_argument("--varying-url-params", type=str, default="",
+                        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
+
+    parser.add_argument("--prefer-cached-downloads", action="store_true", default=False,
+                        help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--enable-preemptible", dest="enable_preemptible", default=None, action="store_true", help="Use preemptible instances. Control individual steps with arv:UsePreemptible hint.")
     exgroup.add_argument("--disable-preemptible", dest="enable_preemptible", default=None, action="store_false", help="Don't use preemptible instances.")
@@ -266,6 +289,8 @@ def add_arv_hints():
         "http://commonwl.org/cwltool#CUDARequirement",
         "http://arvados.org/cwl#UsePreemptible",
         "http://arvados.org/cwl#OutputCollectionProperties",
+        "http://arvados.org/cwl#KeepCacheTypeRequirement",
+        "http://arvados.org/cwl#OutOfMemoryRetry",
     ])
 
 def exit_signal_handler(sigcode, frame):
@@ -301,7 +326,9 @@ def main(args=sys.argv[1:],
             return 1
         arvargs.work_api = want_api
 
-    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
+    workflow_op = arvargs.create_workflow or arvargs.update_workflow or arvargs.print_keep_deps
+
+    if workflow_op and not arvargs.job_order:
         job_order_object = ({}, "")
 
     add_arv_hints()
@@ -313,14 +340,32 @@ def main(args=sys.argv[1:],
     try:
         if api_client is None:
             api_client = arvados.safeapi.ThreadSafeApiCache(
-                api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
-                keep_params={"num_retries": 4})
+                api_params={
+                    'num_retries': arvargs.retries,
+                    'timeout': arvargs.http_timeout,
+                },
+                keep_params={
+                    'num_retries': arvargs.retries,
+                },
+                version='v1',
+            )
             keep_client = api_client.keep
             # Make an API object now so errors are reported early.
             api_client.users().current().execute()
         if keep_client is None:
-            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
-        executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4, stdout=stdout)
+            block_cache = arvados.keep.KeepBlockCache(disk_cache=True)
+            keep_client = arvados.keep.KeepClient(
+                api_client=api_client,
+                block_cache=block_cache,
+                num_retries=arvargs.retries,
+            )
+        executor = ArvCwlExecutor(
+            api_client,
+            arvargs,
+            keep_client=keep_client,
+            num_retries=arvargs.retries,
+            stdout=stdout,
+        )
     except WorkflowException as e:
         logger.error(e, exc_info=(sys.exc_info()[1] if arvargs.debug else False))
         return 1
@@ -330,9 +375,25 @@ def main(args=sys.argv[1:],
 
     # Note that unless in debug mode, some stack traces related to user
     # workflow errors may be suppressed.
+
+    # Set the logging on most modules INFO (instead of default which is WARNING)
+    logger.setLevel(logging.INFO)
+    logging.getLogger('arvados').setLevel(logging.INFO)
+    logging.getLogger('arvados.keep').setLevel(logging.WARNING)
+    # API retries are filtered to the INFO level and can be noisy, but as long as
+    # they succeed we don't need to see warnings about it.
+    googleapiclient_http_logger = logging.getLogger('googleapiclient.http')
+    googleapiclient_http_logger.addFilter(arvados.logging.GoogleHTTPClientFilter())
+    googleapiclient_http_logger.setLevel(logging.WARNING)
+
     if arvargs.debug:
         logger.setLevel(logging.DEBUG)
         logging.getLogger('arvados').setLevel(logging.DEBUG)
+        # In debug mode show logs about retries, but we arn't
+        # debugging the google client so we don't need to see
+        # everything.
+        googleapiclient_http_logger.setLevel(logging.NOTSET)
+        logging.getLogger('googleapiclient').setLevel(logging.INFO)
 
     if arvargs.quiet:
         logger.setLevel(logging.WARN)
@@ -359,6 +420,13 @@ def main(args=sys.argv[1:],
         # unit tests.
         stdout = None
 
+    executor.loadingContext.default_docker_image = arvargs.submit_runner_image or "arvados/jobs:"+__version__
+
+    if arvargs.workflow.startswith("arvwf:") or workflow_uuid_pattern.match(arvargs.workflow) or arvargs.workflow.startswith("keep:"):
+        executor.loadingContext.do_validate = False
+        if arvargs.submit and not workflow_op:
+            executor.fast_submit = True
+
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
@@ -369,4 +437,4 @@ def main(args=sys.argv[1:],
                              custom_schema_callback=add_arv_hints,
                              loadingContext=executor.loadingContext,
                              runtimeContext=executor.toplevel_runtimeContext,
-                             input_required=not (arvargs.create_workflow or arvargs.update_workflow))
+                             input_required=not workflow_op)