#
# SPDX-License-Identifier: Apache-2.0
+from __future__ import division
+from builtins import next
+from builtins import object
+from builtins import str
+from future.utils import viewvalues
+
import argparse
import logging
import os
def __init__(self, runtime_status_update_func):
# Keeps a reference to the callback that emit() calls with the status kind,
# a "<logger name>: <message>" summary and, for multi-line messages, a detail
# string.
super(RuntimeStatusLoggingHandler, self).__init__()
self.runtime_status_update = runtime_status_update_func
# Flag checked and toggled by emit(); it starts out False, meaning no status
# update is currently in progress.
+ self.updatingRuntimeStatus = False
def emit(self, record):
# Passes a log record on to self.runtime_status_update when a status kind
# ('error' or 'warning') has been selected for it; records for which kind
# stays None are not reported.
# NOTE(review): the condition that selects 'error' is not visible in this
# excerpt -- confirm against the full file.
kind = None
kind = 'error'
elif record.levelno >= logging.WARNING:
kind = 'warning'
- if kind is not None:
- log_msg = record.getMessage()
- if '\n' in log_msg:
- # If the logged message is multi-line, use its first line as status
- # and the rest as detail.
- status, detail = log_msg.split('\n', 1)
- self.runtime_status_update(
- kind,
- "%s: %s" % (record.name, status),
- detail
- )
- else:
- self.runtime_status_update(
- kind,
- "%s: %s" % (record.name, record.getMessage())
- )
# The updatingRuntimeStatus flag is set for the duration of the callback and
# cleared in the finally clause, so a record that reaches emit() while the
# callback is still running is skipped instead of starting another update.
+ if kind is not None and self.updatingRuntimeStatus is not True:
+ self.updatingRuntimeStatus = True
+ try:
+ log_msg = record.getMessage()
+ if '\n' in log_msg:
+ # If the logged message is multi-line, use its first line as status
+ # and the rest as detail.
+ status, detail = log_msg.split('\n', 1)
+ self.runtime_status_update(
+ kind,
+ "%s: %s" % (record.name, status),
+ detail
+ )
+ else:
+ self.runtime_status_update(
+ kind,
+ "%s: %s" % (record.name, record.getMessage())
+ )
+ finally:
+ self.updatingRuntimeStatus = False
+
class ArvCwlExecutor(object):
"""Execute a CWL tool or workflow, submit work (using either jobs or
raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
if self.work_api == "jobs":
- logger.warn("""
+ logger.warning("""
*******************************
Using the deprecated 'jobs' API.
self.loadingContext.fetcher_constructor = self.fetcher_constructor
self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
self.loadingContext.construct_tool_object = self.arv_make_tool
+ self.loadingContext.do_update = False
# Add a custom logging handler to the root logger for runtime status reporting
# if running inside a container
if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
root_logger = logging.getLogger('')
+
+ # Remove existing RuntimeStatusLoggingHandlers if they exist
+ handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
+ root_logger.handlers = handlers
+
handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
root_logger.addHandler(handler)
if self.stop_polling.is_set():
break
with self.workflow_eval_lock:
- keys = list(self.processes.keys())
+ keys = list(self.processes)
if not keys:
remain_wait = self.poll_interval
continue
keys = keys[pageSize:]
try:
proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
- except Exception as e:
- logger.warn("Error checking states on API server: %s", e)
+ except Exception:
+ logger.exception("Error checking states on API server: %s")
remain_wait = self.poll_interval
continue
for i in self.intermediate_output_collections:
try:
self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
- except:
- logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
- if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
+ except Exception:
+ logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+ except (KeyboardInterrupt, SystemExit):
break
def check_features(self, obj):
"Option 'dockerOutputDirectory' must be an absolute path.")
if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
- for v in obj.itervalues():
+ for v in viewvalues(obj):
self.check_features(v)
elif isinstance(obj, list):
for i,v in enumerate(obj):
num_retries=self.num_retries)
for k,v in generatemapper.items():
- if k.startswith("_:"):
- if v.type == "Directory":
+ if v.type == "Directory" and v.resolved.startswith("_:"):
continue
- if v.type == "CreateFile":
- with final.open(v.target, "wb") as f:
- f.write(v.resolved.encode("utf-8"))
+ if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
+ with final.open(v.target, "wb") as f:
+ f.write(v.resolved.encode("utf-8"))
continue
- if not k.startswith("keep:"):
+ if not v.resolved.startswith("keep:"):
raise Exception("Output source is not in keep or a literal")
- sp = k.split("/")
+ sp = v.resolved.split("/")
srccollection = sp[0][5:]
try:
reader = self.collection_cache.get(srccollection)
logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
raise
except IOError as e:
- logger.warn("While preparing output collection: %s", e)
+ logger.error("While preparing output collection: %s", e)
+ raise
def rewrite(fileobj):
fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
adjustFileObjs(outputObj, rewrite)
with final.open("cwl.output.json", "w") as f:
- json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
+ res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
+ f.write(res)
final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
body={
'is_trashed': True
}).execute(num_retries=self.num_retries)
- except Exception as e:
- logger.info("Setting container output: %s", e)
+ except Exception:
+ logger.exception("Setting container output")
+ return
elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
body={
uuid=existing_uuid,
submit_runner_ram=runtimeContext.submit_runner_ram,
name=runtimeContext.name,
- merged_map=merged_map)
+ merged_map=merged_map,
+ loadingContext=loadingContext)
tmpl.save()
# cwltool.main will write our return value to stdout.
return (tmpl.uuid, "success")
visited.add(m.group(1))
estimated_size[0] += int(m.group(2))
visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
- runtimeContext.collection_cache_size = max(((estimated_size[0]*192) / (1024*1024))+1, 256)
+ runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
if self.work_api == "containers":
if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
runtimeContext.runnerjob = tool.tool["id"]
- runnerjob = tool.job(job_order,
- self.output_callback,
- runtimeContext).next()
else:
- runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
+ tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
self.output_name,
self.output_tags,
submit_runner_ram=runtimeContext.submit_runner_ram,
collection_cache_size=runtimeContext.collection_cache_size,
collection_cache_is_default=self.should_estimate_cache_size)
elif self.work_api == "jobs":
- runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
+ tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
self.output_name,
self.output_tags,
submit_runner_ram=runtimeContext.submit_runner_ram,
"state": "RunningOnClient"}).execute(num_retries=self.num_retries)
logger.info("Pipeline instance %s", self.pipeline["uuid"])
- if runnerjob and not runtimeContext.wait:
- submitargs = runtimeContext.copy()
- submitargs.submit = False
- runnerjob.run(submitargs)
+ if runtimeContext.cwl_runner_job is not None:
+ self.uuid = runtimeContext.cwl_runner_job.get('uuid')
+
+ jobiter = tool.job(job_order,
+ self.output_callback,
+ runtimeContext)
+
+ if runtimeContext.submit and not runtimeContext.wait:
+ runnerjob = next(jobiter)
+ runnerjob.run(runtimeContext)
return (runnerjob.uuid, "success")
current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
try:
self.workflow_eval_lock.acquire()
- if runnerjob:
- jobiter = iter((runnerjob,))
- else:
- if runtimeContext.cwl_runner_job is not None:
- self.uuid = runtimeContext.cwl_runner_job.get('uuid')
- jobiter = tool.job(job_order,
- self.output_callback,
- runtimeContext)
# Holds the lock while this code runs and releases it when
# it is safe to do so in self.workflow_eval_lock.wait(),
except:
if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
logger.error("Interrupted, workflow will be cancelled")
+ elif isinstance(sys.exc_info()[1], WorkflowException):
+ logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
else:
- logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+ logger.exception("Workflow execution failed")
+
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
- if runnerjob and runnerjob.uuid and self.work_api == "containers":
- self.api.container_requests().update(uuid=runnerjob.uuid,
+ if runtimeContext.submit and isinstance(tool, Runner):
+ runnerjob = tool
+ if runnerjob.uuid and self.work_api == "containers":
+ self.api.container_requests().update(uuid=runnerjob.uuid,
body={"priority": "0"}).execute(num_retries=self.num_retries)
finally:
self.workflow_eval_lock.release()
if self.final_output is None:
raise WorkflowException("Workflow did not return a result.")
- if runtimeContext.submit and isinstance(runnerjob, Runner):
- logger.info("Final output collection %s", runnerjob.final_output)
+ if runtimeContext.submit and isinstance(tool, Runner):
+ logger.info("Final output collection %s", tool.final_output)
else:
if self.output_name is None:
self.output_name = "Output of %s" % (shortname(tool.tool["id"]))