10111: Merge branch 'master' into 10111-collection-labels
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 1e109920fabaae4fa208615cbbaabd8f9161c3a1..46436b54dc730163faa7c431960cd9d76ebbda0f 100644 (file)
@@ -32,14 +32,14 @@ from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
 from ._version import __version__
 
 from cwltool.pack import pack
-from cwltool.process import shortname, UnsupportedRequirement, getListing
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
+from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
 from cwltool.draft2tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
@@ -80,6 +80,8 @@ class ArvCwlRunner(object):
         else:
             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
 
+        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+
         self.work_api = None
         expected_api = ["jobs", "containers"]
         for api in expected_api:
@@ -102,7 +104,8 @@ class ArvCwlRunner(object):
         kwargs["work_api"] = self.work_api
         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                 api_client=self.api,
-                                                keep_client=self.keep_client)
+                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
+                                                num_retries=self.num_retries)
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
             return ArvadosCommandTool(self, toolpath_object, **kwargs)
         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
@@ -203,12 +206,6 @@ class ArvCwlRunner(object):
         if isinstance(obj, dict):
             if obj.get("writable"):
                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
-            if obj.get("class") == "CommandLineTool":
-                if self.work_api == "containers":
-                    if obj.get("stdin"):
-                        raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers")
-                    if obj.get("stderr"):
-                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers")
             if obj.get("class") == "DockerRequirement":
                 if obj.get("dockerOutputDirectory"):
                     # TODO: can be supported by containers API, but not jobs API.
@@ -237,7 +234,6 @@ class ArvCwlRunner(object):
                                               keep_client=self.keep_client,
                                               num_retries=self.num_retries)
 
-        srccollections = {}
         for k,v in generatemapper.items():
             if k.startswith("_:"):
                 if v.type == "Directory":
@@ -251,20 +247,13 @@ class ArvCwlRunner(object):
                 raise Exception("Output source is not in keep or a literal")
             sp = k.split("/")
             srccollection = sp[0][5:]
-            if srccollection not in srccollections:
-                try:
-                    srccollections[srccollection] = arvados.collection.CollectionReader(
-                        srccollection,
-                        api_client=self.api,
-                        keep_client=self.keep_client,
-                        num_retries=self.num_retries)
-                except arvados.errors.ArgumentError as e:
-                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
-                    raise
-            reader = srccollections[srccollection]
             try:
+                reader = self.collection_cache.get(srccollection)
                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
+            except arvados.errors.ArgumentError as e:
+                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+                raise
             except IOError as e:
                 logger.warn("While preparing output collection: %s", e)
 
@@ -337,8 +326,7 @@ class ArvCwlRunner(object):
         self.project_uuid = kwargs.get("project_uuid")
         self.pipeline = None
         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
-                                                                 api_client=self.api,
-                                                                 keep_client=self.keep_client)
+                                                                 collection_cache=self.collection_cache)
         self.fs_access = make_fs_access(kwargs["basedir"])
 
         if not kwargs.get("name"):
@@ -521,7 +509,7 @@ class ArvCwlRunner(object):
             self.set_crunch_output()
 
         if kwargs.get("compute_checksum"):
-            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
+            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
         return (self.final_output, self.final_status)
@@ -551,7 +539,12 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                         type=float,
                         default=20)
-    parser.add_argument("--version", action="store_true", help="Print version and exit")
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--print-dot", action="store_true",
+                         help="Print workflow visualization in graphviz format and exit")
+    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
+    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
@@ -623,24 +616,29 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
 
+    parser.add_argument("--enable-dev", action="store_true",
+                        help="Enable loading and running development versions "
+                             "of CWL spec.", default=False)
+
     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
 
     return parser
 
 def add_arv_hints():
-    cache = {}
-    cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
     cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
+    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
-    cache["http://arvados.org/cwl"] = res.read()
+    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
     res.close()
-    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
-    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
-    for n in extnames.names:
-        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
-            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
-        document_loader.idx["http://arvados.org/cwl#"+n] = {}
+    cwltool.process.supportedProcessRequirements.extend([
+        "http://arvados.org/cwl#RunInSingleContainer",
+        "http://arvados.org/cwl#OutputDirType",
+        "http://arvados.org/cwl#RuntimeConstraints",
+        "http://arvados.org/cwl#PartitionRequirement",
+        "http://arvados.org/cwl#APIRequirement",
+        "http://commonwl.org/cwltool#LoadListingRequirement"
+    ])
 
 def main(args, stdout, stderr, api_client=None, keep_client=None):
     parser = arg_parser()
@@ -707,6 +705,9 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     arvargs.relax_path_checks = True
     arvargs.validate = None
 
+    make_fs_access = partial(CollectionFsAccess,
+                           collection_cache=runner.collection_cache)
+
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
@@ -714,12 +715,11 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
                              makeTool=runner.arv_make_tool,
                              versionfunc=versionstring,
                              job_order_object=job_order_object,
-                             make_fs_access=partial(CollectionFsAccess,
-                                                    api_client=api_client,
-                                                    keep_client=keep_client),
+                             make_fs_access=make_fs_access,
                              fetcher_constructor=partial(CollectionFetcher,
                                                          api_client=api_client,
-                                                         keep_client=keep_client,
+                                                         fs_access=make_fs_access(""),
                                                          num_retries=runner.num_retries),
                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
-                             logger_handler=arvados.log_handler)
+                             logger_handler=arvados.log_handler,
+                             custom_schema_callback=add_arv_hints)