from .fsaccess import CollectionFsAccess
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
+from ._version import __version__
from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
self.uploaded = {}
self.num_retries = 4
self.uuid = None
- self.work_api = work_api
self.stop_polling = threading.Event()
self.poll_api = None
self.pipeline = None
else:
self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
- if self.work_api is None:
- # todo: autodetect API to use.
- self.work_api = "jobs"
-
- if self.work_api not in ("containers", "jobs"):
- raise Exception("Unsupported API '%s'" % self.work_api)
+ for api in ["jobs", "containers"]:
+ try:
+ methods = self.api._rootDesc.get('resources')[api]['methods']
+ if ('httpMethod' in methods['create'] and
+ (work_api == api or work_api is None)):
+ self.work_api = api
+ break
+ except KeyError:
+ pass
+ if not self.work_api:
+ if work_api is None:
+ raise Exception("No supported APIs")
+ else:
+ raise Exception("Unsupported API '%s'" % work_api)
def arv_make_tool(self, toolpath_object, **kwargs):
kwargs["work_api"] = self.work_api
try:
self.cond.acquire()
j = self.processes[uuid]
- logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
+ txt = self.work_api[0].upper() + self.work_api[1:-1]
+ logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
with Perf(metrics, "done %s" % j.name):
j.done(event["properties"]["new_attributes"])
self.cond.notify()
srccollections = {}
for k,v in generatemapper.items():
+ if k.startswith("_:"):
+ if v.type == "Directory":
+ continue
+ if v.type == "CreateFile":
+ with final.open(v.target, "wb") as f:
+ f.write(v.resolved.encode("utf-8"))
+ continue
+
+ if not k.startswith("keep:"):
+ raise Exception("Output source is not in keep or a literal")
sp = k.split("/")
srccollection = sp[0][5:]
if srccollection not in srccollections:
- srccollections[srccollection] = arvados.collection.CollectionReader(
- srccollection,
- api_client=self.api,
- keep_client=self.keep_client,
- num_retries=self.num_retries)
+ try:
+ srccollections[srccollection] = arvados.collection.CollectionReader(
+ srccollection,
+ api_client=self.api,
+ keep_client=self.keep_client,
+ num_retries=self.num_retries)
+ except arvados.errors.ArgumentError as e:
+ logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+ raise
reader = srccollections[srccollection]
try:
srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
def rewrite(fileobj):
fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
- for k in ("basename", "size", "listing"):
+ for k in ("basename", "listing", "contents"):
if k in fileobj:
del fileobj[k]
final.api_response()["name"],
final.manifest_locator())
- self.final_output_collection = final
+ def finalcollection(fileobj):
+ # Rewrite the object's "location" in place to "keep:<portable data hash of final>/<previous location>".
+ fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
+
+ adjustDirObjs(outputObj, finalcollection)
+ adjustFileObjs(outputObj, finalcollection)
+
+ return (outputObj, final)
+
+ def set_crunch_output(self):
+ # Report the portable data hash of self.final_output_collection to the
+ # API selected by self.work_api ("containers" or "jobs"). For any other
+ # value, or for "jobs" without TASK_UUID in the environment, nothing is done.
+ if self.work_api == "containers":
+ try:
+ # Look up the current container, then set its 'output' field.
+ current = self.api.containers().current().execute(num_retries=self.num_retries)
+ self.api.containers().update(uuid=current['uuid'],
+ body={
+ 'output': self.final_output_collection.portable_data_hash(),
+ }).execute(num_retries=self.num_retries)
+ except Exception as e:
+ # Best effort: a failure of either the lookup or the update is
+ # logged at info level and not re-raised.
+ # NOTE(review): presumably this covers running outside a container -- confirm.
+ logger.info("Setting container output: %s", e)
+ elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
+ # Update the job task named by the TASK_UUID environment variable.
+ # Unlike the containers branch this call is not wrapped in try/except,
+ # so an API error propagates to the caller.
+ self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
+ body={
+ 'output': self.final_output_collection.portable_data_hash(),
+ # 'success' is True only when self.final_status equals "success".
+ 'success': self.final_status == "success",
+ 'progress':1.0
+ }).execute(num_retries=self.num_retries)
def arv_executor(self, tool, job_order, **kwargs):
self.debug = kwargs.get("debug")
if self.final_status == "UnsupportedRequirement":
raise UnsupportedRequirement("Check log for details.")
- if self.final_status != "success":
- raise WorkflowException("Workflow failed.")
-
if self.final_output is None:
raise WorkflowException("Workflow did not return a result.")
else:
if self.output_name is None:
self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
- self.make_output_collection(self.output_name, self.final_output)
+ self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.final_output)
+ self.set_crunch_output()
+
+ if self.final_status != "success":
+ raise WorkflowException("Workflow failed.")
if kwargs.get("compute_checksum"):
adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
arvpkg = pkg_resources.require("arvados-python-client")
cwlpkg = pkg_resources.require("cwltool")
- return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
+ return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
"arvados-python-client", arvpkg[0].version,
"cwltool", cwlpkg[0].version)
parser.add_argument("--api", type=str,
default=None, dest="work_api",
- help="Select work submission API, one of 'jobs' or 'containers'.")
+ help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
parser.add_argument("--compute-checksum", action="store_true", default=False,
help="Compute checksum of contents while collecting outputs",