21356: Remove all Python 2/3 compatibility imports
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 341929454ad71a87912e566dfdd86153f7f0881e..30d91b4094265d098b1344b2310470e184565794 100644 (file)
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 # Copyright (C) The Arvados Authors. All rights reserved.
 #
 # SPDX-License-Identifier: Apache-2.0
@@ -6,15 +6,13 @@
 # Implement cwl-runner interface for submitting and running work on Arvados, using
 # the Crunch containers API.
 
-from future.utils import viewitems
-from builtins import str
-
 import argparse
+import importlib.metadata
+import importlib.resources
 import logging
 import os
 import sys
 import re
-import pkg_resources  # part of setuptools
 
 from schema_salad.sourceline import SourceLine
 import schema_salad.validate as validate
@@ -22,19 +20,21 @@ import cwltool.main
 import cwltool.workflow
 import cwltool.process
 import cwltool.argparser
+from cwltool.errors import WorkflowException
 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
 from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing
 
 import arvados
 import arvados.config
+import arvados.logging
 from arvados.keep import KeepClient
 from arvados.errors import ApiError
 import arvados.commands._util as arv_cmd
-from arvados.api import OrderedJsonModel
 
 from .perf import Perf
 from ._version import __version__
 from .executor import ArvCwlExecutor
+from .fsaccess import workflow_uuid_pattern
 
 # These aren't used directly in this file but
 # other code expects to import them from here
@@ -55,18 +55,18 @@ arvados.log_handler.setFormatter(logging.Formatter(
 
 def versionstring():
     """Print version string of key packages for provenance and debugging."""
-
-    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
-    arvpkg = pkg_resources.require("arvados-python-client")
-    cwlpkg = pkg_resources.require("cwltool")
-
-    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
-                                    "arvados-python-client", arvpkg[0].version,
-                                    "cwltool", cwlpkg[0].version)
-
+    return "{} {}, arvados-python-client {}, cwltool {}".format(
+        sys.argv[0],
+        importlib.metadata.version('arvados-cwl-runner'),
+        importlib.metadata.version('arvados-python-client'),
+        importlib.metadata.version('cwltool'),
+    )
 
 def arg_parser():  # type: () -> argparse.ArgumentParser
-    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
+    parser = argparse.ArgumentParser(
+        description='Arvados executor for Common Workflow Language',
+        parents=[arv_cmd.retry_opt],
+    )
 
     parser.add_argument("--basedir",
                         help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
@@ -118,6 +118,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--create-workflow", action="store_true", help="Register an Arvados workflow that can be run from Workbench")
     exgroup.add_argument("--update-workflow", metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")
 
+    exgroup.add_argument("--print-keep-deps", action="store_true", help="To assist copying, print a list of Keep collections that this workflow depends on.")
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner, wait for completion.",
                         default=True, dest="wait")
@@ -151,6 +153,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool",
                         default=False)
 
+    parser.add_argument("--match-submitter-images", action="store_true",
+                        default=False, dest="match_local_docker",
+                        help="Where Arvados has more than one Docker image of the same name, use image from the Docker instance on the submitting node.")
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--submit-request-uuid",
                          default=None,
@@ -178,7 +184,9 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="Enable loading and running development versions "
                              "of the CWL standards.", default=False)
     parser.add_argument('--storage-classes', default="default",
-                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
+                        help="Specify comma separated list of storage classes to be used when saving final workflow output to Keep.")
+    parser.add_argument('--intermediate-storage-classes', default="default",
+                        help="Specify comma separated list of storage classes to be used when saving intermediate workflow output to Keep.")
 
     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
@@ -192,16 +200,45 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         action="store_false", default=True,
                         help=argparse.SUPPRESS)
 
+    parser.add_argument("--disable-git", dest="git_info",
+                        action="store_false", default=True,
+                        help=argparse.SUPPRESS)
+
+    parser.add_argument("--disable-color", dest="enable_color",
+                        action="store_false", default=True,
+                        help=argparse.SUPPRESS)
+
     parser.add_argument("--disable-js-validation",
                         action="store_true", default=False,
                         help=argparse.SUPPRESS)
 
+    parser.add_argument("--fast-parser", dest="fast_parser",
+                        action="store_true", default=False,
+                        help=argparse.SUPPRESS)
+
     parser.add_argument("--thread-count", type=int,
-                        default=1, help="Number of threads to use for job submit and output collection.")
+                        default=0, help="Number of threads to use for job submit and output collection.")
 
     parser.add_argument("--http-timeout", type=int,
                         default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
 
+    parser.add_argument("--defer-downloads", action="store_true", default=False,
+                        help="When submitting a workflow, defer downloading HTTP URLs to workflow launch instead of downloading to Keep before submit.")
+
+    parser.add_argument("--varying-url-params", type=str, default="",
+                        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
+
+    parser.add_argument("--prefer-cached-downloads", action="store_true", default=False,
+                        help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--enable-preemptible", dest="enable_preemptible", default=None, action="store_true", help="Use preemptible instances. Control individual steps with arv:UsePreemptible hint.")
+    exgroup.add_argument("--disable-preemptible", dest="enable_preemptible", default=None, action="store_false", help="Don't use preemptible instances.")
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--copy-deps", dest="copy_deps", default=None, action="store_true", help="Copy dependencies into the destination project.")
+    exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false", help="Leave dependencies where they are.")
+
     parser.add_argument(
         "--skip-schemas",
         action="store_true",
@@ -218,6 +255,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")
 
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--enable-usage-report", dest="enable_usage_report", default=None, action="store_true", help="Create usage_report.html with a summary of each step's resource usage.")
+    exgroup.add_argument("--disable-usage-report", dest="enable_usage_report", default=None, action="store_false", help="Disable usage report.")
+
     parser.add_argument("workflow", default=None, help="The workflow to execute")
     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
 
@@ -226,15 +267,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 def add_arv_hints():
     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
-    res10 = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.0.yml')
-    res11 = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.1.yml')
-    customschema10 = res10.read().decode('utf-8')
-    customschema11 = res11.read().decode('utf-8')
-    use_custom_schema("v1.0", "http://arvados.org/cwl", customschema10)
-    use_custom_schema("v1.1.0-dev1", "http://arvados.org/cwl", customschema11)
-    use_custom_schema("v1.1", "http://arvados.org/cwl", customschema11)
-    res10.close()
-    res11.close()
+    for s in ("v1.0", "v1.1", "v1.2"):  # CWL versions with a bundled arv-cwl-schema-<version>.yml
+        # Pass encoding by keyword: since Python 3.13 extra positional args are resource path names.
+        customschema = importlib.resources.read_text(__name__, f'arv-cwl-schema-{s}.yml', encoding='utf-8')
+        use_custom_schema(s, "http://arvados.org/cwl", customschema)
     cwltool.process.supportedProcessRequirements.extend([
         "http://arvados.org/cwl#RunInSingleContainer",
         "http://arvados.org/cwl#OutputDirType",
@@ -244,24 +280,31 @@ def add_arv_hints():
         "http://commonwl.org/cwltool#LoadListingRequirement",
         "http://arvados.org/cwl#IntermediateOutput",
         "http://arvados.org/cwl#ReuseRequirement",
-        "http://arvados.org/cwl#ClusterTarget"
+        "http://arvados.org/cwl#ClusterTarget",
+        "http://arvados.org/cwl#OutputStorageClass",
+        "http://arvados.org/cwl#ProcessProperties",
+        "http://commonwl.org/cwltool#CUDARequirement",
+        "http://arvados.org/cwl#UsePreemptible",
+        "http://arvados.org/cwl#OutputCollectionProperties",
+        "http://arvados.org/cwl#KeepCacheTypeRequirement",
+        "http://arvados.org/cwl#OutOfMemoryRetry",
     ])
 
 def exit_signal_handler(sigcode, frame):
     logger.error(str(u"Caught signal {}, exiting.").format(sigcode))
     sys.exit(-sigcode)
 
-def main(args, stdout, stderr, api_client=None, keep_client=None,
+def main(args=None,  # None: argparse reads sys.argv[1:] at call time, not import time
+         stdout=sys.stdout,
+         stderr=sys.stderr,
+         api_client=None,
+         keep_client=None,
          install_sig_handlers=True):
     parser = arg_parser()
 
     job_order_object = None
     arvargs = parser.parse_args(args)
 
-    if len(arvargs.storage_classes.strip().split(',')) > 1:
-        logger.error(str(u"Multiple storage classes are not supported currently."))
-        return 1
-
     arvargs.use_container = True
     arvargs.relax_path_checks = True
     arvargs.print_supported_versions = False
@@ -280,35 +323,74 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
             return 1
         arvargs.work_api = want_api
 
-    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
+    workflow_op = arvargs.create_workflow or arvargs.update_workflow or arvargs.print_keep_deps
+
+    if workflow_op and not arvargs.job_order:
         job_order_object = ({}, "")
 
     add_arv_hints()
 
-    for key, val in viewitems(cwltool.argparser.get_default_args()):
+    for key, val in cwltool.argparser.get_default_args().items():
         if not hasattr(arvargs, key):
             setattr(arvargs, key, val)
 
     try:
         if api_client is None:
             api_client = arvados.safeapi.ThreadSafeApiCache(
-                api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
-                keep_params={"num_retries": 4})
+                api_params={
+                    'num_retries': arvargs.retries,
+                    'timeout': arvargs.http_timeout,
+                },
+                keep_params={
+                    'num_retries': arvargs.retries,
+                },
+                version='v1',
+            )
             keep_client = api_client.keep
             # Make an API object now so errors are reported early.
             api_client.users().current().execute()
         if keep_client is None:
-            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
-        executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
+            block_cache = arvados.keep.KeepBlockCache(disk_cache=True)
+            keep_client = arvados.keep.KeepClient(
+                api_client=api_client,
+                block_cache=block_cache,
+                num_retries=arvargs.retries,
+            )
+        executor = ArvCwlExecutor(
+            api_client,
+            arvargs,
+            keep_client=keep_client,
+            num_retries=arvargs.retries,
+            stdout=stdout,
+        )
+    except WorkflowException as e:
+        logger.error(e, exc_info=(sys.exc_info()[1] if arvargs.debug else False))
+        return 1
     except Exception:
         logger.exception("Error creating the Arvados CWL Executor")
         return 1
 
     # Note that unless in debug mode, some stack traces related to user
     # workflow errors may be suppressed.
+
+    # Set most loggers to INFO (instead of the default level, WARNING).
+    logger.setLevel(logging.INFO)
+    logging.getLogger('arvados').setLevel(logging.INFO)
+    logging.getLogger('arvados.keep').setLevel(logging.WARNING)
+    # API retries are filtered to the INFO level and can be noisy, but as long as
+    # they succeed we don't need to see warnings about them.
+    googleapiclient_http_logger = logging.getLogger('googleapiclient.http')
+    googleapiclient_http_logger.addFilter(arvados.logging.GoogleHTTPClientFilter())
+    googleapiclient_http_logger.setLevel(logging.WARNING)
+
     if arvargs.debug:
         logger.setLevel(logging.DEBUG)
         logging.getLogger('arvados').setLevel(logging.DEBUG)
+        # In debug mode, show logs about retries, but we aren't
+        # debugging the Google client itself, so we don't need to see
+        # everything.
+        googleapiclient_http_logger.setLevel(logging.NOTSET)
+        logging.getLogger('googleapiclient').setLevel(logging.INFO)
 
     if arvargs.quiet:
         logger.setLevel(logging.WARN)
@@ -335,6 +417,13 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
         # unit tests.
         stdout = None
 
+    executor.loadingContext.default_docker_image = arvargs.submit_runner_image or "arvados/jobs:"+__version__
+    # Workflows given as arvwf:, a workflow UUID, or keep: skip client-side validation.
+    if arvargs.workflow.startswith(("arvwf:", "keep:")) or workflow_uuid_pattern.match(arvargs.workflow):
+        executor.loadingContext.do_validate = False
+        if arvargs.submit and not workflow_op:
+            executor.fast_submit = True
+
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
@@ -344,5 +433,5 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
                              logger_handler=arvados.log_handler,
                              custom_schema_callback=add_arv_hints,
                              loadingContext=executor.loadingContext,
-                             runtimeContext=executor.runtimeContext,
-                             input_required=not (arvargs.create_workflow or arvargs.update_workflow))
+                             runtimeContext=executor.toplevel_runtimeContext,
+                             input_required=not workflow_op)