14198: Add paging on container list, check for valid cluster id
author Peter Amstutz <pamstutz@veritasgenetics.com>
Thu, 1 Nov 2018 18:42:12 +0000 (14:42 -0400)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Wed, 21 Nov 2018 18:05:25 +0000 (13:05 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

sdk/cwl/arvados_cwl/arv-cwl-schema.yml
sdk/cwl/arvados_cwl/arvtool.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/executor.py
services/api/app/controllers/arvados/v1/schema_controller.rb

index 94eaf9560cb1a91abf5781a4a80c73d3cea665f4..902b1ffba299240438c60c8a0a866db598b2a101 100644 (file)
@@ -247,9 +247,9 @@ $graph:
       jsonldPredicate:
         _id: "@type"
         _type: "@vocab"
-    clusterID:
+    cluster_id:
       type: string?
       doc: The cluster to run the container
-    ownerUUID:
+    project_uuid:
       type: string?
       doc: The project that will own the container requests and intermediate collections
index 83307d33101d8742955cecb0fb697fa51a540f4a..130b42b5cc4ae1ded368a4cf4cfdc748532887d1 100644 (file)
@@ -8,6 +8,19 @@ from .arvcontainer import ArvadosContainer
 from .pathmapper import ArvPathMapper
 from .context import ClusterTarget
 from functools import partial
+from schema_salad.sourceline import SourceLine
+from cwltool.errors import WorkflowException
+
+def check_cluster_target(self, builder, runtimeContext):
+    cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
+    if cluster_target_req and runtimeContext.cluster_target_id != id(cluster_target_req):
+        with SourceLine(cluster_target_req, None, WorkflowException, runtimeContext.debug):
+            runtimeContext.cluster_target_id = id(cluster_target_req)
+            runtimeContext.submit_runner_cluster = builder.do_eval(cluster_target_req.get("cluster_id")) or runtimeContext.submit_runner_cluster
+            runtimeContext.project_uuid = builder.do_eval(cluster_target_req.get("project_uuid")) or runtimeContext.project_uuid
+            if runtimeContext.submit_runner_cluster and runtimeContext.submit_runner_cluster not in self.arvrunner.api._rootDesc["remoteHosts"]:
+                raise WorkflowException("Unknown or invalid cluster id '%s' known clusters are %s" % (runtimeContext.submit_runner_cluster,
+                                                                                                      ", ".join(self.arvrunner.api._rootDesc["remoteHosts"].keys())))
 
 class ArvadosCommandTool(CommandLineTool):
     """Wrap cwltool CommandLineTool to override selected methods."""
@@ -45,11 +58,7 @@ class ArvadosCommandTool(CommandLineTool):
 
         runtimeContext = runtimeContext.copy()
 
-        cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
-        if cluster_target_req and runtimeContext.cluster_target_id != id(cluster_target_req):
-            runtimeContext.cluster_target_id = id(cluster_target_req)
-            runtimeContext.submit_runner_cluster = builder.do_eval(cluster_target_req.get("clusterID")) or runtimeContext.submit_runner_cluster
-            runtimeContext.project_uuid = builder.do_eval(cluster_target_req.get("ownerUUID")) or runtimeContext.project_uuid
+        check_cluster_target(self, builder, runtimeContext)
 
         if runtimeContext.work_api == "containers":
             dockerReq, is_req = self.get_requirement("DockerRequirement")
index 2f114f4ffb1c72176aab8e43d0d93659a71feb9f..b689e94efa879cda4b455e9e2e16b6706ef374fc 100644 (file)
@@ -132,11 +132,7 @@ class ArvadosWorkflow(Workflow):
 
     def job(self, joborder, output_callback, runtimeContext):
 
-        cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
-        if cluster_target_req and runtimeContext.cluster_target_id != id(cluster_target_req):
-            runtimeContext.cluster_target_id = id(cluster_target_req)
-            runtimeContext.submit_runner_cluster = builder.do_eval(cluster_target_req.get("clusterID")) or runtimeContext.submit_runner_cluster
-            runtimeContext.project_uuid = builder.do_eval(cluster_target_req.get("ownerUUID")) or runtimeContext.project_uuid
+        check_cluster_target(self, self._init_job(joborder, runtimeContext), runtimeContext)
 
         req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
         if not req:
index bf81853be4202c9dc66c86ef165e1d2bdaec9724..8c2023e187061e04737905deeefd1f4b1408aa69 100644 (file)
@@ -326,21 +326,26 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                 elif self.work_api == "jobs":
                     table = self.poll_api.jobs()
 
-                try:
-                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
-                except Exception as e:
-                    logger.warn("Error checking states on API server: %s", e)
-                    remain_wait = self.poll_interval
-                    continue
+                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
 
-                for p in proc_states["items"]:
-                    self.on_message({
-                        "object_uuid": p["uuid"],
-                        "event_type": "update",
-                        "properties": {
-                            "new_attributes": p
-                        }
-                    })
+                while keys:
+                    page = keys[:pageSize]
+                    keys = keys[pageSize:]
+                    try:
+                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
+                    except Exception as e:
+                        logger.warn("Error checking states on API server: %s", e)
+                        remain_wait = self.poll_interval
+                        continue
+
+                    for p in proc_states["items"]:
+                        self.on_message({
+                            "object_uuid": p["uuid"],
+                            "event_type": "update",
+                            "properties": {
+                                "new_attributes": p
+                            }
+                        })
                 finish_poll = time.time()
                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
         except:
@@ -631,17 +636,17 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 
         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
 
-        if runnerjob:
-            jobiter = iter((runnerjob,))
-        else:
-            if runtimeContext.cwl_runner_job is not None:
-                self.uuid = runtimeContext.cwl_runner_job.get('uuid')
-            jobiter = tool.job(job_order,
-                               self.output_callback,
-                               runtimeContext)
-
         try:
             self.workflow_eval_lock.acquire()
+            if runnerjob:
+                jobiter = iter((runnerjob,))
+            else:
+                if runtimeContext.cwl_runner_job is not None:
+                    self.uuid = runtimeContext.cwl_runner_job.get('uuid')
+                jobiter = tool.job(job_order,
+                                   self.output_callback,
+                                   runtimeContext)
+
             # Holds the lock while this code runs and releases it when
             # it is safe to do so in self.workflow_eval_lock.wait(),
             # at which point on_message can update job state and
@@ -681,7 +686,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                 logger.error("Interrupted, workflow will be cancelled")
             else:
-                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+                logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
index 2d0bc114fbb4549da0a8696111bfead0a9ea564a..771ef2b1fba0c4009630b1e02a1df3d3b33b8247 100644 (file)
@@ -50,6 +50,7 @@ class Arvados::V1::SchemaController < ApplicationController
         defaultTrashLifetime: Rails.application.config.default_trash_lifetime,
         blobSignatureTtl: Rails.application.config.blob_signature_ttl,
         maxRequestSize: Rails.application.config.max_request_size,
+        maxItemsPerResponse: Rails.application.config.max_items_per_response,
         dockerImageFormats: Rails.application.config.docker_image_formats,
         crunchLogBytesPerEvent: Rails.application.config.crunch_log_bytes_per_event,
         crunchLogSecondsBetweenEvents: Rails.application.config.crunch_log_seconds_between_events,