Merge branch 'patch-1' of https://github.com/mr-c/arvados into mr-c-patch-1
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 605d3300520e78ec47eabf9d7e3af12ed11fa8e7..6f2255b3f8b18f104a00fc2d4982171c51dbb0da 100644 (file)
@@ -4,7 +4,10 @@
 # SPDX-License-Identifier: Apache-2.0
 
 # Implement cwl-runner interface for submitting and running work on Arvados, using
-# either the Crunch jobs API or Crunch containers API.
+# the Crunch containers API.
+
+from future.utils import viewitems
+from builtins import str
 
 import argparse
 import logging
@@ -20,7 +23,7 @@ import cwltool.workflow
 import cwltool.process
 import cwltool.argparser
 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
+from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing
 
 import arvados
 import arvados.config
@@ -33,10 +36,9 @@ from .perf import Perf
 from ._version import __version__
 from .executor import ArvCwlExecutor
 
-# These arn't used directly in this file but
+# These aren't used directly in this file but
 # other code expects to import them from here
 from .arvcontainer import ArvadosContainer
-from .arvjob import ArvadosJob
 from .arvtool import ArvadosCommandTool
 from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
 from .util import get_current_container
@@ -66,9 +68,9 @@ def versionstring():
 def arg_parser():  # type: () -> argparse.ArgumentParser
     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
 
-    parser.add_argument("--basedir", type=str,
+    parser.add_argument("--basedir",
                         help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
-    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
+    parser.add_argument("--outdir", default=os.path.abspath('.'),
                         help="Output directory, default current directory")
 
     parser.add_argument("--eval-timeout",
@@ -94,32 +96,32 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
-                        help="Enable job or container reuse (default)")
+                        help="Enable container reuse (default)")
     exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
-                        help="Disable job or container reuse")
+                        help="Disable container reuse")
 
-    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
-    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
-    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
+    parser.add_argument("--project-uuid", metavar="UUID", help="Project that will own the workflow containers, if not provided, will go to home project.")
+    parser.add_argument("--output-name", help="Name to use for collection that stores the final output.", default=None)
+    parser.add_argument("--output-tags", help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
-                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
+                        help="Ignore Docker image version when deciding whether to reuse past containers.",
                         default=False)
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                         default=True, dest="submit")
-    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
+    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits containers to Arvados).",
                         default=True, dest="submit")
     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                          dest="create_workflow")
-    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
-    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
+    exgroup.add_argument("--create-workflow", action="store_true", help="Register an Arvados workflow that can be run from Workbench")
+    exgroup.add_argument("--update-workflow", metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")
 
     exgroup = parser.add_mutually_exclusive_group()
-    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
+    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner, wait for completion.",
                         default=True, dest="wait")
-    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
+    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner and exit.",
                         default=True, dest="wait")
 
     exgroup = parser.add_mutually_exclusive_group()
@@ -128,10 +130,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                         default=True, dest="log_timestamps")
 
-    parser.add_argument("--api", type=str,
+    parser.add_argument("--api",
                         default=None, dest="work_api",
-                        choices=("jobs", "containers"),
-                        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
+                        choices=("containers",),
+                        help="Select work submission API.  Only supports 'containers'")
 
     parser.add_argument("--compute-checksum", action="store_true", default=False,
                         help="Compute checksum of contents while collecting outputs",
@@ -141,34 +143,41 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
                         default=None)
 
-    parser.add_argument("--submit-runner-image", type=str,
+    parser.add_argument("--submit-runner-image",
                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                         default=None)
 
     parser.add_argument("--always-submit-runner", action="store_true",
-                        help="Always submit a runner to manage the workflow, even when running only a single CommandLineTool",
+                        help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool",
                         default=False)
 
     exgroup = parser.add_mutually_exclusive_group()
-    exgroup.add_argument("--submit-request-uuid", type=str,
+    exgroup.add_argument("--submit-request-uuid",
+                         default=None,
+                         help="Update and commit to supplied container request instead of creating a new one.",
+                         metavar="UUID")
+    exgroup.add_argument("--submit-runner-cluster",
+                         help="Submit workflow runner to a remote cluster",
+                         default=None,
+                         metavar="CLUSTER_ID")
+
+    parser.add_argument("--collection-cache-size", type=int,
                         default=None,
-                        help="Update and commit to supplied container request instead of creating a new one (containers API only).")
-    exgroup.add_argument("--submit-runner-cluster", type=str,
-                        help="Submit toplevel runner to a remote cluster (containers API only)",
-                        default=None)
+                        help="Collection cache size (in MiB, default 256).")
 
-    parser.add_argument("--name", type=str,
+    parser.add_argument("--name",
                         help="Name to use for workflow execution instance.",
                         default=None)
 
-    parser.add_argument("--on-error", type=str,
-                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
-                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))
+    parser.add_argument("--on-error",
+                        help="Desired workflow behavior when a step fails.  One of 'stop' (do not submit any more steps) or "
+                        "'continue' (may submit other steps that are not downstream from the error). Default is 'continue'.",
+                        default="continue", choices=("stop", "continue"))
 
     parser.add_argument("--enable-dev", action="store_true",
                         help="Enable loading and running development versions "
-                             "of CWL spec.", default=False)
-    parser.add_argument('--storage-classes', default="default", type=str,
+                             "of the CWL standards.", default=False)
+    parser.add_argument('--storage-classes', default="default",
                         help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
 
     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
@@ -176,7 +185,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=0)
 
     parser.add_argument("--priority", type=int,
-                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
+                        help="Workflow priority (range 1..1000, higher has precedence over lower)",
                         default=DEFAULT_PRIORITY)
 
     parser.add_argument("--disable-validate", dest="do_validate",
@@ -188,7 +197,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help=argparse.SUPPRESS)
 
     parser.add_argument("--thread-count", type=int,
-                        default=4, help="Number of threads to use for job submit and output collection.")
+                        default=1, help="Number of threads to use for job submit and output collection.")
 
     parser.add_argument("--http-timeout", type=int,
                         default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
@@ -201,7 +210,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")
 
-    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
+    parser.add_argument("workflow", default=None, help="The workflow to execute")
     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
 
     return parser
@@ -209,9 +218,15 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 def add_arv_hints():
     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
-    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
-    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
-    res.close()
+    res10 = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.0.yml')
+    res11 = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.1.yml')
+    customschema10 = res10.read().decode('utf-8')
+    customschema11 = res11.read().decode('utf-8')
+    use_custom_schema("v1.0", "http://arvados.org/cwl", customschema10)
+    use_custom_schema("v1.1.0-dev1", "http://arvados.org/cwl", customschema11)
+    use_custom_schema("v1.1", "http://arvados.org/cwl", customschema11)
+    res10.close()
+    res11.close()
     cwltool.process.supportedProcessRequirements.extend([
         "http://arvados.org/cwl#RunInSingleContainer",
         "http://arvados.org/cwl#OutputDirType",
@@ -225,7 +240,7 @@ def add_arv_hints():
     ])
 
 def exit_signal_handler(sigcode, frame):
-    logger.error("Caught signal {}, exiting.".format(sigcode))
+    logger.error(str(u"Caught signal {}, exiting.").format(sigcode))
     sys.exit(-sigcode)
 
 def main(args, stdout, stderr, api_client=None, keep_client=None,
@@ -236,7 +251,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
     arvargs = parser.parse_args(args)
 
     if len(arvargs.storage_classes.strip().split(',')) > 1:
-        logger.error("Multiple storage classes are not supported currently.")
+        logger.error(str(u"Multiple storage classes are not supported currently."))
         return 1
 
     arvargs.use_container = True
@@ -249,12 +264,10 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
     if arvargs.update_workflow:
         if arvargs.update_workflow.find('-7fd4e-') == 5:
             want_api = 'containers'
-        elif arvargs.update_workflow.find('-p5p6p-') == 5:
-            want_api = 'jobs'
         else:
             want_api = None
         if want_api and arvargs.work_api and want_api != arvargs.work_api:
-            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
+            logger.error(str(u'--update-workflow arg {!r} uses {!r} API, but --api={!r} specified').format(
                 arvargs.update_workflow, want_api, arvargs.work_api))
             return 1
         arvargs.work_api = want_api
@@ -264,7 +277,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
 
     add_arv_hints()
 
-    for key, val in cwltool.argparser.get_default_args().items():
+    for key, val in viewitems(cwltool.argparser.get_default_args()):
         if not hasattr(arvargs, key):
             setattr(arvargs, key, val)
 
@@ -279,10 +292,12 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
         if keep_client is None:
             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
         executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
-    except Exception as e:
-        logger.error(e)
+    except Exception:
+        logger.exception("Error creating the Arvados CWL Executor")
         return 1
 
+    # Note that unless in debug mode, some stack traces related to user
+    # workflow errors may be suppressed.
     if arvargs.debug:
         logger.setLevel(logging.DEBUG)
         logging.getLogger('arvados').setLevel(logging.DEBUG)
@@ -303,6 +318,15 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
     else:
         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
 
+    if stdout is sys.stdout:
+        # cwltool.main has code to work around encoding issues with
+        # sys.stdout and unix pipes (they default to ASCII encoding,
+        # we want utf-8), so when stdout is sys.stdout set it to None
+        # to take advantage of that.  Don't override it for all cases
+        # since we still want to be able to capture stdout for the
+        # unit tests.
+        stdout = None
+
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
@@ -312,4 +336,5 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
                              logger_handler=arvados.log_handler,
                              custom_schema_callback=add_arv_hints,
                              loadingContext=executor.loadingContext,
-                             runtimeContext=executor.runtimeContext)
+                             runtimeContext=executor.runtimeContext,
+                             input_required=not (arvargs.create_workflow or arvargs.update_workflow))