jsonldPredicate:
_id: "@type"
_type: "@vocab"
- clusterID:
+ cluster_id:
type: string?
doc: The cluster on which to run the container
- ownerUUID:
+ project_uuid:
type: string?
doc: The project that will own the container requests and intermediate collections
from .pathmapper import ArvPathMapper
from .context import ClusterTarget
from functools import partial
+from schema_salad.sourceline import SourceLine
+from cwltool.errors import WorkflowException
+
+def check_cluster_target(self, builder, runtimeContext):
+ cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
+ if cluster_target_req and runtimeContext.cluster_target_id != id(cluster_target_req):
+ with SourceLine(cluster_target_req, None, WorkflowException, runtimeContext.debug):
+ runtimeContext.cluster_target_id = id(cluster_target_req)
+ runtimeContext.submit_runner_cluster = builder.do_eval(cluster_target_req.get("cluster_id")) or runtimeContext.submit_runner_cluster
+ runtimeContext.project_uuid = builder.do_eval(cluster_target_req.get("project_uuid")) or runtimeContext.project_uuid
+ if runtimeContext.submit_runner_cluster and runtimeContext.submit_runner_cluster not in self.arvrunner.api._rootDesc["remoteHosts"]:
+ raise WorkflowException("Unknown or invalid cluster id '%s' known clusters are %s" % (runtimeContext.submit_runner_cluster,
+ ", ".join(self.arvrunner.api._rootDesc["remoteHosts"].keys())))
class ArvadosCommandTool(CommandLineTool):
"""Wrap cwltool CommandLineTool to override selected methods."""
runtimeContext = runtimeContext.copy()
- cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
- if cluster_target_req and runtimeContext.cluster_target_id != id(cluster_target_req):
- runtimeContext.cluster_target_id = id(cluster_target_req)
- runtimeContext.submit_runner_cluster = builder.do_eval(cluster_target_req.get("clusterID")) or runtimeContext.submit_runner_cluster
- runtimeContext.project_uuid = builder.do_eval(cluster_target_req.get("ownerUUID")) or runtimeContext.project_uuid
+ check_cluster_target(self, builder, runtimeContext)
if runtimeContext.work_api == "containers":
dockerReq, is_req = self.get_requirement("DockerRequirement")
def job(self, joborder, output_callback, runtimeContext):
- cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
- if cluster_target_req and runtimeContext.cluster_target_id != id(cluster_target_req):
- runtimeContext.cluster_target_id = id(cluster_target_req)
- runtimeContext.submit_runner_cluster = builder.do_eval(cluster_target_req.get("clusterID")) or runtimeContext.submit_runner_cluster
- runtimeContext.project_uuid = builder.do_eval(cluster_target_req.get("ownerUUID")) or runtimeContext.project_uuid
+ check_cluster_target(self, self._init_job(joborder, runtimeContext), runtimeContext)
req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
if not req:
elif self.work_api == "jobs":
table = self.poll_api.jobs()
- try:
- proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
- except Exception as e:
- logger.warn("Error checking states on API server: %s", e)
- remain_wait = self.poll_interval
- continue
+ pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
- for p in proc_states["items"]:
- self.on_message({
- "object_uuid": p["uuid"],
- "event_type": "update",
- "properties": {
- "new_attributes": p
- }
- })
+ while keys:
+ page = keys[:pageSize]
+ keys = keys[pageSize:]
+ try:
+ proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
+ except Exception as e:
+ logger.warn("Error checking states on API server: %s", e)
+ remain_wait = self.poll_interval
+ continue
+
+ for p in proc_states["items"]:
+ self.on_message({
+ "object_uuid": p["uuid"],
+ "event_type": "update",
+ "properties": {
+ "new_attributes": p
+ }
+ })
finish_poll = time.time()
remain_wait = self.poll_interval - (finish_poll - begin_poll)
except:
self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
- if runnerjob:
- jobiter = iter((runnerjob,))
- else:
- if runtimeContext.cwl_runner_job is not None:
- self.uuid = runtimeContext.cwl_runner_job.get('uuid')
- jobiter = tool.job(job_order,
- self.output_callback,
- runtimeContext)
-
try:
self.workflow_eval_lock.acquire()
+ if runnerjob:
+ jobiter = iter((runnerjob,))
+ else:
+ if runtimeContext.cwl_runner_job is not None:
+ self.uuid = runtimeContext.cwl_runner_job.get('uuid')
+ jobiter = tool.job(job_order,
+ self.output_callback,
+ runtimeContext)
+
# Holds the lock while this code runs and releases it when
# it is safe to do so in self.workflow_eval_lock.wait(),
# at which point on_message can update job state and
if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
logger.error("Interrupted, workflow will be cancelled")
else:
- logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+ logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
defaultTrashLifetime: Rails.application.config.default_trash_lifetime,
blobSignatureTtl: Rails.application.config.blob_signature_ttl,
maxRequestSize: Rails.application.config.max_request_size,
+ maxItemsPerResponse: Rails.application.config.max_items_per_response,
dockerImageFormats: Rails.application.config.docker_image_formats,
crunchLogBytesPerEvent: Rails.application.config.crunch_log_bytes_per_event,
crunchLogSecondsBetweenEvents: Rails.application.config.crunch_log_seconds_between_events,