17417: Merge branch 'main' into 17417-add-arm64
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 2b2acd5688ff3475c24af8242ae0a7c5ef3ffcd9..71ef742e314633bab08b0f493f766227b19b7849 100644 (file)
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 # Copyright (C) The Arvados Authors. All rights reserved.
 #
 # SPDX-License-Identifier: Apache-2.0
@@ -16,51 +16,15 @@ import sys
 import re
 import pkg_resources  # part of setuptools
 
-### begin monkey patch ###
-# Monkey patch solution for bug #16169
-#
-# There is a bug in upstream cwltool where the version updater needs
-# to replace the document fragments in the loader index with the
-# updated ones, but actually it only does it for the root document.
-# Normally we just fix the bug in upstream but that's challenging
-# because current cwltool dropped support for Python 2.7 and we're
-# still supporting py2 in Arvados 2.0 (although py2 support will most
-# likely be dropped in Arvados 2.1).  Making a bugfix fork comes with
-# its own complications (it would need to be added to PyPi) so monkey
-# patching is the least disruptive fix (and is relatively safe because
-# our cwltool dependency is pinned to a specific version).  This
-# should be removed as soon as a bugfix goes into upstream cwltool and
-# we upgrade to it.
-#
-import cwltool.load_tool
-from cwltool.utils import visit_class
-from six.moves import urllib
-original_resolve_and_validate_document = cwltool.load_tool.resolve_and_validate_document
-def wrapped_resolve_and_validate_document(
-        loadingContext,            # type: LoadingContext
-        workflowobj,               # type: Union[CommentedMap, CommentedSeq]
-        uri,                       # type: Text
-        preprocess_only=False,     # type: bool
-        skip_schemas=None,         # type: Optional[bool]
-        ):
-    loadingContext, uri = original_resolve_and_validate_document(loadingContext, workflowobj, uri, preprocess_only, skip_schemas)
-    if loadingContext.do_update in (True, None):
-        fileuri = urllib.parse.urldefrag(uri)[0]
-        def update_index(pr):
-            loadingContext.loader.idx[pr["id"]] = pr
-        visit_class(loadingContext.loader.idx[fileuri], ("CommandLineTool", "Workflow", "ExpressionTool"), update_index)
-    return loadingContext, uri
-cwltool.load_tool.resolve_and_validate_document = wrapped_resolve_and_validate_document
-### end monkey patch ###
-
 from schema_salad.sourceline import SourceLine
 import schema_salad.validate as validate
 import cwltool.main
 import cwltool.workflow
 import cwltool.process
 import cwltool.argparser
+from cwltool.errors import WorkflowException
 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
+from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing
 
 import arvados
 import arvados.config
@@ -213,9 +177,11 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 
     parser.add_argument("--enable-dev", action="store_true",
                         help="Enable loading and running development versions "
-                             "of CWL spec.", default=False)
+                             "of the CWL standards.", default=False)
     parser.add_argument('--storage-classes', default="default",
-                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
+                        help="Specify comma separated list of storage classes to be used when saving final workflow output to Keep.")
+    parser.add_argument('--intermediate-storage-classes', default="default",
+                        help="Specify comma separated list of storage classes to be used when saving intermediate workflow output to Keep.")
 
     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
@@ -229,16 +195,28 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         action="store_false", default=True,
                         help=argparse.SUPPRESS)
 
+    parser.add_argument("--disable-color", dest="enable_color",
+                        action="store_false", default=True,
+                        help=argparse.SUPPRESS)
+
     parser.add_argument("--disable-js-validation",
                         action="store_true", default=False,
                         help=argparse.SUPPRESS)
 
     parser.add_argument("--thread-count", type=int,
-                        default=1, help="Number of threads to use for job submit and output collection.")
+                        default=0, help="Number of threads to use for job submit and output collection.")
 
     parser.add_argument("--http-timeout", type=int,
                         default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
 
+    parser.add_argument(
+        "--skip-schemas",
+        action="store_true",
+        help="Skip loading of schemas",
+        default=False,
+        dest="skip_schemas",
+    )
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
@@ -255,15 +233,12 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 def add_arv_hints():
     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
-    res10 = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.0.yml')
-    res11 = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.1.yml')
-    customschema10 = res10.read()
-    customschema11 = res11.read()
-    use_custom_schema("v1.0", "http://arvados.org/cwl", customschema10)
-    use_custom_schema("v1.1.0-dev1", "http://arvados.org/cwl", customschema11)
-    use_custom_schema("v1.1", "http://arvados.org/cwl", customschema11)
-    res10.close()
-    res11.close()
+    supported_versions = ["v1.0", "v1.1", "v1.2"]
+    for s in supported_versions:
+        res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-%s.yml' % s)
+        customschema = res.read().decode('utf-8')
+        use_custom_schema(s, "http://arvados.org/cwl", customschema)
+        res.close()
     cwltool.process.supportedProcessRequirements.extend([
         "http://arvados.org/cwl#RunInSingleContainer",
         "http://arvados.org/cwl#OutputDirType",
@@ -273,7 +248,9 @@ def add_arv_hints():
         "http://commonwl.org/cwltool#LoadListingRequirement",
         "http://arvados.org/cwl#IntermediateOutput",
         "http://arvados.org/cwl#ReuseRequirement",
-        "http://arvados.org/cwl#ClusterTarget"
+        "http://arvados.org/cwl#ClusterTarget",
+        "http://arvados.org/cwl#OutputStorageClass",
+        "http://arvados.org/cwl#ProcessProperties"
     ])
 
 def exit_signal_handler(sigcode, frame):
@@ -287,10 +264,6 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
     job_order_object = None
     arvargs = parser.parse_args(args)
 
-    if len(arvargs.storage_classes.strip().split(',')) > 1:
-        logger.error(str(u"Multiple storage classes are not supported currently."))
-        return 1
-
     arvargs.use_container = True
     arvargs.relax_path_checks = True
     arvargs.print_supported_versions = False
@@ -328,7 +301,10 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
             api_client.users().current().execute()
         if keep_client is None:
             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
-        executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
+        executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4, stdout=stdout)
+    except WorkflowException as e:
+        logger.error(e, exc_info=(sys.exc_info()[1] if arvargs.debug else False))
+        return 1
     except Exception:
         logger.exception("Error creating the Arvados CWL Executor")
         return 1