10576: Working on fetch & url join for keep references.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 8fab9463d6ef2d4fc434a25b001963dbf8aa55b3..c7c186390bc18d9cbe24b58a03079940352d63e3 100644 (file)
@@ -21,13 +21,15 @@ import schema_salad
 
 import arvados
 import arvados.config
+from arvados.keep import KeepClient
+from arvados.errors import ApiError
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from. runner import Runner, upload_instance
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess
+from .fsaccess import CollectionFsAccess, CollectionFetcher
 from .perf import Perf
 from .pathmapper import FinalOutputPathMapper
 from ._version import __version__
@@ -49,7 +51,7 @@ class ArvCwlRunner(object):
 
     """
 
-    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None):
+    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
         self.api = api_client
         self.processes = {}
         self.lock = threading.Lock()
@@ -57,7 +59,7 @@ class ArvCwlRunner(object):
         self.final_output = None
         self.final_status = None
         self.uploaded = {}
-        self.num_retries = 4
+        self.num_retries = num_retries
         self.uuid = None
         self.stop_polling = threading.Event()
         self.poll_api = None
@@ -72,7 +74,9 @@ class ArvCwlRunner(object):
         else:
             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
 
-        for api in ["jobs", "containers"]:
+        self.work_api = None
+        expected_api = ["jobs", "containers"]
+        for api in expected_api:
             try:
                 methods = self.api._rootDesc.get('resources')[api]['methods']
                 if ('httpMethod' in methods['create'] and
@@ -81,11 +85,12 @@ class ArvCwlRunner(object):
                     break
             except KeyError:
                 pass
+
         if not self.work_api:
             if work_api is None:
                 raise Exception("No supported APIs")
             else:
-                raise Exception("Unsupported API '%s'" % work_api)
+                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
 
     def arv_make_tool(self, toolpath_object, **kwargs):
         kwargs["work_api"] = self.work_api
@@ -150,7 +155,7 @@ class ArvCwlRunner(object):
                     continue
 
                 if self.work_api == "containers":
-                    table = self.poll_api.containers()
+                    table = self.poll_api.container_requests()
                 elif self.work_api == "jobs":
                     table = self.poll_api.jobs()
 
@@ -277,6 +282,12 @@ class ArvCwlRunner(object):
         if self.work_api == "containers":
             try:
                 current = self.api.containers().current().execute(num_retries=self.num_retries)
+            except ApiError as e:
+                # Status code 404 just means we're not running in a container.
+                if e.resp.status != 404:
+                    logger.info("Getting current container: %s", e)
+                return
+            try:
                 self.api.containers().update(uuid=current['uuid'],
                                              body={
                                                  'output': self.final_output_collection.portable_data_hash(),
@@ -308,14 +319,16 @@ class ArvCwlRunner(object):
             if self.work_api == "jobs":
                 tmpl = RunnerTemplate(self, tool, job_order,
                                       kwargs.get("enable_reuse"),
-                                      uuid=existing_uuid)
+                                      uuid=existing_uuid,
+                                      submit_runner_ram=kwargs.get("submit_runner_ram"))
                 tmpl.save()
                 # cwltool.main will write our return value to stdout.
                 return tmpl.uuid
             else:
                 return upload_workflow(self, tool, job_order,
                                        self.project_uuid,
-                                       uuid=existing_uuid)
+                                       uuid=existing_uuid,
+                                       submit_runner_ram=kwargs.get("submit_runner_ram"))
 
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
@@ -346,9 +359,11 @@ class ArvCwlRunner(object):
                                          self.output_callback,
                                          **kwargs).next()
                 else:
-                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)
+                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
+                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))
             else:
-                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)
+                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
+                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))
 
         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
             # Create pipeline for local run
@@ -524,6 +539,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="Compute checksum of contents while collecting outputs",
                         dest="compute_checksum")
 
+    parser.add_argument("--submit-runner-ram", type=int,
+                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
+                        default=1024)
+
     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
 
@@ -568,7 +587,11 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     try:
         if api_client is None:
             api_client=arvados.api('v1', model=OrderedJsonModel())
-        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name, output_tags=arvargs.output_tags)
+        if keep_client is None:
+            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
+        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
+                              num_retries=4, output_name=arvargs.output_name,
+                              output_tags=arvargs.output_tags)
     except Exception as e:
         logger.error(e)
         return 1
@@ -595,4 +618,9 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
                              makeTool=runner.arv_make_tool,
                              versionfunc=versionstring,
                              job_order_object=job_order_object,
-                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))
+                             make_fs_access=partial(CollectionFsAccess,
+                                                    api_client=api_client,
+                                                    keep_client=keep_client),
+                             fetcher_constructor=partial(CollectionFetcher,
+                                                         api_client=api_client,
+                                                         keep_client=keep_client))