import re
from functools import partial
import pkg_resources # part of setuptools
+import Queue
+import time
from cwltool.errors import WorkflowException
import cwltool.main
"""
- def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
+ def __init__(self, api_client, work_api=None, keep_client=None,
+ output_name=None, output_tags=None, num_retries=4,
+ parallel_submit_count=4):
self.api = api_client
self.processes = {}
- self.lock = threading.Lock()
- self.cond = threading.Condition(self.lock)
+ self.in_flight = 0
+ self.workflow_eval_lock = threading.Condition(threading.RLock())
self.final_output = None
self.final_status = None
self.uploaded = {}
self.intermediate_output_ttl = 0
self.intermediate_output_collections = []
self.trash_intermediate = False
+ self.runnable_queue = Queue.Queue()
+ self.runnable_queue_threads = []
+ self.parallel_submit_count = parallel_submit_count
+ self.poll_interval = 12
if keep_client is not None:
self.keep_client = keep_client
return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
def output_callback(self, out, processStatus):
- if processStatus == "success":
- logger.info("Overall process status is %s", processStatus)
- if self.pipeline:
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Complete"}).execute(num_retries=self.num_retries)
- else:
- logger.warn("Overall process status is %s", processStatus)
- if self.pipeline:
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Failed"}).execute(num_retries=self.num_retries)
- self.final_status = processStatus
- self.final_output = out
+ with self.workflow_eval_lock:
+ if processStatus == "success":
+ logger.info("Overall process status is %s", processStatus)
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Complete"}).execute(num_retries=self.num_retries)
+ else:
+ logger.warn("Overall process status is %s", processStatus)
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ self.final_status = processStatus
+ self.final_output = out
+
+ def runnable_queue_func(self):
+     # Worker thread body: run queued tasks until a None sentinel
+     # signals shutdown.
+     while True:
+         task = self.runnable_queue.get()
+         if task is None:
+             return
+         task()
+
+ def start_run(self, runnable, kwargs):
+ with self.workflow_eval_lock:
+ self.in_flight += 1
+ self.runnable_queue.put(partial(runnable.run, **kwargs))
+
+ def process_submitted(self, container):
+ with self.workflow_eval_lock:
+ self.processes[container.uuid] = container
+ self.in_flight -= 1
+
+ def process_done(self, uuid):
+ with self.workflow_eval_lock:
+ if uuid in self.processes:
+ del self.processes[uuid]
+
+ def wrapped_callback(self, cb, obj, st):
+ with self.workflow_eval_lock:
+ cb(obj, st)
+
+ def get_wrapped_callback(self, cb):
+ return partial(self.wrapped_callback, cb)
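
Taken together, start_run, runnable_queue_func, and the sentinel shutdown near the end of this diff form a small producer/consumer pool: callables are enqueued on runnable_queue and executed by parallel_submit_count worker threads. A minimal standalone sketch of the same pattern; the names here are illustrative, not part of the patch:

    import Queue
    import threading
    from functools import partial

    def worker(q):
        # Run queued callables until a None sentinel arrives.
        while True:
            task = q.get()
            if task is None:
                return
            task()

    q = Queue.Queue()
    threads = [threading.Thread(target=worker, args=(q,)) for _ in xrange(4)]
    for t in threads:
        t.start()

    q.put(partial(sum, [1, 2, 3]))  # enqueue work, e.g. partial(runnable.run, **kwargs)

    for _ in threads:
        q.put(None)                 # one sentinel per worker thread
    for t in threads:
        t.join()
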
def on_message(self, event):
if "object_uuid" in event:
if event["object_uuid"] in self.processes and event["event_type"] == "update":
if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
uuid = event["object_uuid"]
- with self.lock:
+ with self.workflow_eval_lock:
j = self.processes[uuid]
logger.info("%s %s is Running", self.label(j), uuid)
j.running = True
j.update_pipeline_component(event["properties"]["new_attributes"])
elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
uuid = event["object_uuid"]
- try:
- self.cond.acquire()
+ with self.workflow_eval_lock:
j = self.processes[uuid]
logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
- with Perf(metrics, "done %s" % j.name):
- j.done(event["properties"]["new_attributes"])
- self.cond.notify()
- finally:
- self.cond.release()
+ def done_cb():
+     # Record the final state off the event thread, then wake the
+     # main loop waiting on workflow_eval_lock.
+     j.done(event["properties"]["new_attributes"])
+     with self.workflow_eval_lock:
+         self.workflow_eval_lock.notify()
+ self.runnable_queue.put(done_cb)
+
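done_cb is pushed onto runnable_queue rather than called inline because j.done() can make API calls; running it on the event thread while holding workflow_eval_lock would stall message handling. The notify() then wakes the main loop blocked in workflow_eval_lock.wait(). A minimal sketch of that wait/notify handshake, with illustrative names:

    import threading

    lock = threading.Condition(threading.RLock())
    pending = [object()]

    def finish():
        # Worker thread: do the slow work, then wake the waiting loop.
        with lock:
            pending.pop()
            lock.notify()

    with lock:
        threading.Thread(target=finish).start()
        while pending:
            lock.wait(3)  # wait() releases the lock so finish() can acquire it
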
def label(self, obj):
return "[%s %s]" % (self.work_api[0:-1], obj.name)
"""
try:
+ remain_wait = self.poll_interval
while True:
- self.stop_polling.wait(15)
+ if remain_wait > 0:
+ self.stop_polling.wait(remain_wait)
if self.stop_polling.is_set():
break
- with self.lock:
- keys = self.processes.keys()
+ with self.workflow_eval_lock:
+ keys = list(self.processes.keys())
if not keys:
+ remain_wait = self.poll_interval
continue
+ begin_poll = time.time()
if self.work_api == "containers":
table = self.poll_api.container_requests()
elif self.work_api == "jobs":
    table = self.poll_api.jobs()
try:
    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
except Exception as e:
logger.warn("Error checking states on API server: %s", e)
+ remain_wait = self.poll_interval
continue
for p in proc_states["items"]:
    self.on_message({
        "object_uuid": p["uuid"],
        "event_type": "update",
        "properties": {
            "new_attributes": p
        }
    })
+ finish_poll = time.time()
+ remain_wait = self.poll_interval - (finish_poll - begin_poll)
except:
- logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
- self.cond.acquire()
- self.processes.clear()
- self.cond.notify()
- self.cond.release()
+ logger.exception("Fatal error in state polling thread.")
+ with self.workflow_eval_lock:
+ self.processes.clear()
+ self.workflow_eval_lock.notify()
finally:
self.stop_polling.set()
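
Subtracting the elapsed poll time keeps the loop on a steady poll_interval cadence rather than interval-plus-poll-duration. A small worked example of the arithmetic, with made-up timings:

    poll_interval = 12
    # Suppose the API round trip took 5 seconds:
    remain_wait = poll_interval - 5   # wait 7s, so polls start ~12s apart
    # If it took 15 seconds, remain_wait is -3: since remain_wait <= 0,
    # the next iteration skips the wait and polls again immediately.
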
if not obj.get("dockerOutputDirectory").startswith('/'):
raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
"Option 'dockerOutputDirectory' must be an absolute path.")
+ if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
+ raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
for v in obj.itervalues():
self.check_features(v)
elif isinstance(obj, list):
make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
collection_cache=self.collection_cache)
self.fs_access = make_fs_access(kwargs["basedir"])
-
+ self.secret_store = kwargs.get("secret_store")
self.trash_intermediate = kwargs["trash_intermediate"]
if self.trash_intermediate and self.work_api != "containers":
# Reload tool object which may have been updated by
# upload_workflow_deps
+ # Don't validate this time because it will just print redundant errors.
tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
makeTool=self.arv_make_tool,
loader=tool.doc_loader,
avsc_names=tool.doc_schema,
- metadata=tool.metadata)
+ metadata=tool.metadata,
+ do_validate=False)
# Upload local file references in the job order.
job_order = upload_job_order(self, "%s input" % kwargs["name"],
submit_runner_image=kwargs.get("submit_runner_image"),
intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
merged_map=merged_map,
- priority=kwargs.get("priority"))
+ priority=kwargs.get("priority"),
+ secret_store=self.secret_store)
elif self.work_api == "jobs":
runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
self.output_name,
self.polling_thread = threading.Thread(target=self.poll_states)
self.polling_thread.start()
+ for r in xrange(0, self.parallel_submit_count):
+ t = threading.Thread(target=self.runnable_queue_func)
+ self.runnable_queue_threads.append(t)
+ t.start()
+
if runnerjob:
jobiter = iter((runnerjob,))
else:
**kwargs)
try:
- self.cond.acquire()
- # Will continue to hold the lock for the duration of this code
- # except when in cond.wait(), at which point on_message can update
- # job state and process output callbacks.
+ self.workflow_eval_lock.acquire()
+ # Hold the lock while this code runs; it is released only
+ # inside self.workflow_eval_lock.wait(), at which point
+ # on_message can update job state and process output
+ # callbacks.
loopperf = Perf(metrics, "jobiter")
loopperf.__enter__()
if runnable:
with Perf(metrics, "run"):
- runnable.run(**kwargs)
+ self.start_run(runnable, kwargs)
else:
- if self.processes:
- self.cond.wait(1)
+ # Count submissions still in flight (queued but not yet in
+ # self.processes) so a queued-but-unsubmitted job isn't
+ # mistaken for a deadlock.
+ if (self.in_flight + len(self.processes)) > 0:
+     self.workflow_eval_lock.wait(3)
else:
logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
break
loopperf.__exit__()
while self.processes:
- self.cond.wait(1)
+ self.workflow_eval_lock.wait(3)
except UnsupportedRequirement:
raise
self.api.container_requests().update(uuid=runnerjob.uuid,
body={"priority": "0"}).execute(num_retries=self.num_retries)
finally:
- self.cond.release()
+ self.workflow_eval_lock.release()
+ try:
+     # Drain any tasks still queued so the workers can exit promptly.
+     # get_nowait() raises Queue.Empty once the queue is drained;
+     # a blocking get() here would never raise it.
+     while True:
+         self.runnable_queue.get_nowait()
+ except Queue.Empty:
+     pass
self.stop_polling.set()
self.polling_thread.join()
+ # Give each worker a None sentinel so it exits its loop, then join.
+ for t in self.runnable_queue_threads:
+     self.runnable_queue.put(None)
+ for t in self.runnable_queue_threads:
+     t.join()
if self.final_status == "UnsupportedRequirement":
raise UnsupportedRequirement("Check log for details.")
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--print-dot", action="store_true",
help="Print workflow visualization in graphviz format and exit")
- exgroup.add_argument("--version", action="store_true", help="Print version and exit")
+ exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
exgroup = parser.add_mutually_exclusive_group()
help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
default=DEFAULT_PRIORITY)
+ parser.add_argument("--disable-validate", dest="do_validate",
+ action="store_false", default=True,
+ help=argparse.SUPPRESS)
+
+ parser.add_argument("--disable-js-validation",
+ action="store_true", default=False,
+ help=argparse.SUPPRESS)
+
+ parser.add_argument("--parallel-submit-count", type=int,
+ default=4, help="Submit requests in parallel (default 4)")
+
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--trash-intermediate", action="store_true",
default=False, dest="trash_intermediate",
job_order_object = None
arvargs = parser.parse_args(args)
- if arvargs.version:
- print versionstring()
- return
-
if arvargs.update_workflow:
if arvargs.update_workflow.find('-7fd4e-') == 5:
want_api = 'containers'
try:
if api_client is None:
- api_client=arvados.api('v1', model=OrderedJsonModel())
+ api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
+ keep_client = api_client.keep
if keep_client is None:
keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
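
Constructing the client as a ThreadSafeApiCache matters once several submit threads share it: the object returned by plain arvados.api() is built on an httplib2 connection that is generally not safe to share across threads, whereas ThreadSafeApiCache keeps one API object per thread behind a single facade and exposes a shared KeepClient as .keep. A standalone sketch of the same construction, assuming OrderedJsonModel is imported from arvados.api as elsewhere in this codebase:

    import arvados.safeapi
    from arvados.api import OrderedJsonModel

    api_client = arvados.safeapi.ThreadSafeApiCache(
        api_params={"model": OrderedJsonModel()},
        keep_params={"num_retries": 4})
    keep_client = api_client.keep  # shared, thread-safe KeepClient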