Merge branch '18842-arv-mount-disk-config' refs #18842
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 4bfe272789062018d47e33e1b46394a725f45e97..550ecba1c100c95df9fc5358564d6bcd4fe9bacc 100644 (file)
@@ -22,6 +22,7 @@ import cwltool.main
 import cwltool.workflow
 import cwltool.process
 import cwltool.argparser
+from cwltool.errors import WorkflowException
 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
 from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing
 
@@ -35,6 +36,7 @@ from arvados.api import OrderedJsonModel
 from .perf import Perf
 from ._version import __version__
 from .executor import ArvCwlExecutor
+from .fsaccess import workflow_uuid_pattern
 
 # These aren't used directly in this file but
 # other code expects to import them from here
@@ -151,6 +153,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="When invoked with --submit --wait, always submit a runner to manage the workflow, even when only running a single CommandLineTool",
                         default=False)
 
+    parser.add_argument("--match-submitter-images", action="store_true",
+                        default=False, dest="match_local_docker",
+                        help="Where Arvados has more than one Docker image of the same name, use image from the Docker instance on the submitting node.")
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--submit-request-uuid",
                          default=None,
@@ -178,7 +184,9 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="Enable loading and running development versions "
                              "of the CWL standards.", default=False)
     parser.add_argument('--storage-classes', default="default",
-                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
+                        help="Specify comma separated list of storage classes to be used when saving final workflow output to Keep.")
+    parser.add_argument('--intermediate-storage-classes', default="default",
+                        help="Specify comma separated list of storage classes to be used when saving intermediate workflow output to Keep.")
 
     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
@@ -192,6 +200,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         action="store_false", default=True,
                         help=argparse.SUPPRESS)
 
+    parser.add_argument("--disable-git", dest="git_info",
+                        action="store_false", default=True,
+                        help=argparse.SUPPRESS)
+
     parser.add_argument("--disable-color", dest="enable_color",
                         action="store_false", default=True,
                         help=argparse.SUPPRESS)
@@ -201,11 +213,28 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help=argparse.SUPPRESS)
 
     parser.add_argument("--thread-count", type=int,
-                        default=1, help="Number of threads to use for job submit and output collection.")
+                        default=0, help="Number of threads to use for job submit and output collection.")
 
     parser.add_argument("--http-timeout", type=int,
                         default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
 
+    parser.add_argument("--defer-downloads", action="store_true", default=False,
+                        help="When submitting a workflow, defer downloading HTTP URLs to workflow launch instead of downloading to Keep before submit.")
+
+    parser.add_argument("--varying-url-params", type=str, default="",
+                        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
+
+    parser.add_argument("--prefer-cached-downloads", action="store_true", default=False,
+                        help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--enable-preemptible", dest="enable_preemptible", default=None, action="store_true", help="Use preemptible instances. Control individual steps with arv:UsePreemptible hint.")
+    exgroup.add_argument("--disable-preemptible", dest="enable_preemptible", default=None, action="store_false", help="Don't use preemptible instances.")
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--copy-deps", dest="copy_deps", default=None, action="store_true", help="Copy dependencies into the destination project.")
+    exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false", help="Leave dependencies where they are.")
+
     parser.add_argument(
         "--skip-schemas",
         action="store_true",
@@ -245,24 +274,29 @@ def add_arv_hints():
         "http://commonwl.org/cwltool#LoadListingRequirement",
         "http://arvados.org/cwl#IntermediateOutput",
         "http://arvados.org/cwl#ReuseRequirement",
-        "http://arvados.org/cwl#ClusterTarget"
+        "http://arvados.org/cwl#ClusterTarget",
+        "http://arvados.org/cwl#OutputStorageClass",
+        "http://arvados.org/cwl#ProcessProperties",
+        "http://commonwl.org/cwltool#CUDARequirement",
+        "http://arvados.org/cwl#UsePreemptible",
+        "http://arvados.org/cwl#OutputCollectionProperties",
     ])
 
 def exit_signal_handler(sigcode, frame):
     """Log the caught signal number at error level, then call sys.exit()."""
     # NOTE(review): the (sigcode, frame) parameters match what signal.signal()
     # passes to a handler; the registration itself is not visible in this
     # hunk -- confirm against the install_sig_handlers path in main().
     # `frame` is accepted but never used.
     logger.error(str(u"Caught signal {}, exiting.").format(sigcode))
     # The status handed to sys.exit() is the negated signal number.
     sys.exit(-sigcode)
 
-def main(args, stdout, stderr, api_client=None, keep_client=None,
+def main(args=sys.argv[1:],
+         stdout=sys.stdout,
+         stderr=sys.stderr,
+         api_client=None,
+         keep_client=None,
          install_sig_handlers=True):
     parser = arg_parser()
 
     job_order_object = None
     arvargs = parser.parse_args(args)
 
-    if len(arvargs.storage_classes.strip().split(',')) > 1:
-        logger.error(str(u"Multiple storage classes are not supported currently."))
-        return 1
-
     arvargs.use_container = True
     arvargs.relax_path_checks = True
     arvargs.print_supported_versions = False
@@ -299,8 +333,12 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
             # Make an API object now so errors are reported early.
             api_client.users().current().execute()
         if keep_client is None:
-            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
-        executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
+            block_cache = arvados.keep.KeepBlockCache(disk_cache=True)
+            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4, block_cache=block_cache)
+        executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4, stdout=stdout)
+    except WorkflowException as e:
+        logger.error(e, exc_info=(sys.exc_info()[1] if arvargs.debug else False))
+        return 1
     except Exception:
         logger.exception("Error creating the Arvados CWL Executor")
         return 1
@@ -336,6 +374,10 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
         # unit tests.
         stdout = None
 
+    if arvargs.submit and (arvargs.workflow.startswith("arvwf:") or workflow_uuid_pattern.match(arvargs.workflow)):
+        executor.loadingContext.do_validate = False
+        executor.fast_submit = True
+
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
@@ -345,5 +387,5 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
                              logger_handler=arvados.log_handler,
                              custom_schema_callback=add_arv_hints,
                              loadingContext=executor.loadingContext,
-                             runtimeContext=executor.runtimeContext,
+                             runtimeContext=executor.toplevel_runtimeContext,
                              input_required=not (arvargs.create_workflow or arvargs.update_workflow))