from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
+from. runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
+from ._version import __version__
from cwltool.pack import pack
-from cwltool.process import shortname, UnsupportedRequirement
+from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel
"""
- def __init__(self, api_client, work_api=None, keep_client=None):
+ def __init__(self, api_client, work_api=None, keep_client=None, output_name=None):
self.api = api_client
self.processes = {}
self.lock = threading.Lock()
self.uploaded = {}
self.num_retries = 4
self.uuid = None
- self.work_api = work_api
self.stop_polling = threading.Event()
self.poll_api = None
self.pipeline = None
self.final_output_collection = None
+ self.output_name = output_name
+ self.project_uuid = None
+
if keep_client is not None:
self.keep_client = keep_client
else:
self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
- if self.work_api is None:
- # todo: autodetect API to use.
- self.work_api = "jobs"
-
- if self.work_api not in ("containers", "jobs"):
- raise Exception("Unsupported API '%s'" % self.work_api)
+ for api in ["jobs", "containers"]:
+ try:
+ methods = self.api._rootDesc.get('resources')[api]['methods']
+ if ('httpMethod' in methods['create'] and
+ (work_api == api or work_api is None)):
+ self.work_api = api
+ break
+ except KeyError:
+ pass
+ if not self.work_api:
+ if work_api is None:
+ raise Exception("No supported APIs")
+ else:
+ raise Exception("Unsupported API '%s'" % work_api)
def arv_make_tool(self, toolpath_object, **kwargs):
kwargs["work_api"] = self.work_api
final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
- logger.info("Final output collection %s (%s)", final.portable_data_hash(), final.manifest_locator())
+ logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
+ final.api_response()["name"],
+ final.manifest_locator())
self.final_output_collection = final
kwargs["docker_outdir"] = "$(task.outdir)"
kwargs["tmpdir"] = "$(task.tmpdir)"
+ upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
+
runnerjob = None
if kwargs.get("submit"):
if self.work_api == "containers":
self.output_callback,
**kwargs).next()
else:
- runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
+ runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
else:
- runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
+ runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
# Create pipeline for local run
logger.info("Pipeline instance %s", self.pipeline["uuid"])
if runnerjob and not kwargs.get("wait"):
- runnerjob.run()
+ runnerjob.run(wait=kwargs.get("wait"))
return runnerjob.uuid
self.poll_api = arvados.api('v1')
if self.final_output is None:
raise WorkflowException("Workflow did not return a result.")
- if kwargs.get("submit"):
+ if kwargs.get("submit") and isinstance(runnerjob, Runner):
logger.info("Final output collection %s", runnerjob.final_output)
else:
- self.make_output_collection("Output of %s" % (shortname(tool.tool["id"])),
- self.final_output)
+ if self.output_name is None:
+ self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
+ self.make_output_collection(self.output_name, self.final_output)
if kwargs.get("compute_checksum"):
+ adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
return self.final_output
arvpkg = pkg_resources.require("arvados-python-client")
cwlpkg = pkg_resources.require("cwltool")
- return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
+ return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
"arvados-python-client", arvpkg[0].version,
"cwltool", cwlpkg[0].version)
help="")
parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
+ parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
parser.add_argument("--ignore-docker-for-reuse", action="store_true",
help="Ignore Docker image version when deciding whether to reuse past jobs.",
default=False)
try:
if api_client is None:
api_client=arvados.api('v1', model=OrderedJsonModel())
- runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client)
+ runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name)
except Exception as e:
logger.error(e)
return 1