#!/usr/bin/env python
+# Crunch script integration for running arvados-cwl-runner (by importing the
+# arvados_cwl module) inside a crunch job.
+#
+# This gets the job record, transforms the script parameters into a valid CWL
+# input object, then executes the CWL runner to run the underlying workflow or
+# tool. When the workflow completes, it records the output object in an output
+# collection for this runner job.
+
import arvados
import arvados_cwl
import arvados.collection
from arvados.api import OrderedJsonModel
from cwltool.process import adjustFiles
-print cwltool.main.versionstring()
+# Log package versions
+logging.info(cwltool.main.versionstring())
api = arvados.api("v1")
t = cwltool.main.load_tool(job_order_object, False, True, runner.arvMakeTool, True)
- np = argparse.Namespace()
- np.project_uuid = arvados.current_job()["owner_uuid"]
- np.enable_reuse = True
- np.submit = False
- np.debug = True
- np.quiet = False
- outputObj = runner.arvExecutor(t, job_order_object, "", np, cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]})
+ args = argparse.Namespace()
+ args.project_uuid = arvados.current_job()["owner_uuid"]
+ args.enable_reuse = True
+ args.submit = False
+ args.debug = True
+ args.quiet = False
+ outputObj = runner.arvExecutor(t, job_order_object, "", args, cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]})
files = {}
def capture(path):
#!/usr/bin/env python
+# Implement cwl-runner interface for submitting and running jobs on Arvados.
+
import argparse
import arvados
import arvados.events
def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
+ """Check if a Docker image is available in Keep; if not, upload it using arv-keepdocker."""
+
if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
class CollectionFsAccess(cwltool.process.StdFsAccess):
+ """Implement the cwltool FsAccess interface for Arvados Collections."""
+
def __init__(self, basedir):
self.collections = {}
self.basedir = basedir
return os.path.exists(self._abs(fn))
class ArvadosJob(object):
+ """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
+
def __init__(self, runner):
self.arvrunner = runner
self.running = False
class RunnerJob(object):
+ """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+
def __init__(self, runner, tool, job_order, enable_reuse):
self.arvrunner = runner
self.tool = tool
del self.arvrunner.jobs[record["uuid"]]
class ArvPathMapper(cwltool.pathmapper.PathMapper):
+ """Convert container-local paths to and from Keep collection ids."""
+
def __init__(self, arvrunner, referenced_files, basedir,
collection_pattern, file_pattern, name=None, **kwargs):
self._pathmap = arvrunner.get_uploaded()
class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
+ """Wrap cwltool CommandLineTool to override selected methods."""
+
def __init__(self, arvrunner, toolpath_object, **kwargs):
super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
self.arvrunner = arvrunner
class ArvCwlRunner(object):
+ """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
+ complete, and report output."""
+
def __init__(self, api_client):
self.api = api_client
self.jobs = {}
return self.final_output
def versionstring():
+ """Print version string of key packages for provenance and debugging."""
+
cwlpkg = pkg_resources.require("cwltool")
arvpkg = pkg_resources.require("arvados-python-client")
arvcwlpkg = pkg_resources.require("arvados-cwl-runner")