import re
import os
import sys
import logging
+import functools
-from cwltool.process import get_feature
+from cwltool.process import get_feature, adjustFiles, scandeps
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
class ArvPathMapper(cwltool.pathmapper.PathMapper):
def __init__(self, arvrunner, referenced_files, basedir,
- collection_pattern, file_pattern, **kwargs):
+ collection_pattern, file_pattern, name=None, **kwargs):
self._pathmap = arvrunner.get_uploaded()
uploadfiles = []
dry_run=kwargs.get("dry_run"),
num_retries=3,
fnPattern=file_pattern,
+ name=name,
project=arvrunner.project_uuid)
for src, ab, st in uploadfiles:
pass
def submit(self, tool, job_order, input_basedir, args, **kwargs):
- files = set()
- def visitFiles(self, path):
+ workflowfiles = set()
+ jobfiles = set()
+ workflowfiles.add(tool.tool["id"])
+
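+    # visitFiles() is passed to adjustFiles() below to record every file path
+    # referenced by the workflow and by the job order.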
+ def visitFiles(files, path):
files.add(path)
+ return path
- adjustFiles(process.scandeps("", tool.tool,
- set(("run")),
- set(("$schemas", "path"))),
- visitFiles)
- adjustFiles(job_order, visitFiles)
+ document_loader, _, _ = cwltool.process.get_schema()
+ def loadref(b, u):
+ return document_loader.resolve_ref(u, base_url=b)[0]
- mapper = ArvPathMapper(self, files, "",
- "$(task.keep)/%s",
- "$(task.keep)/%s/%s",
- **kwargs)
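+    # Scan the workflow for file dependencies: follow "run" references into
+    # sub-workflows and collect "path" and "$schemas" entries.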
+ adjustFiles(scandeps("", tool.tool,
+ set(("run",)),
+ set(("$schemas", "path")),
+ loadref),
+ functools.partial(visitFiles, workflowfiles))
+ adjustFiles(job_order, functools.partial(visitFiles, jobfiles))
+
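+    # Upload the workflow files and the job's input files to Keep as two
+    # separately named collections.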
+ workflowmapper = ArvPathMapper(self, workflowfiles, "",
+ "%s",
+ "%s/%s",
+ name=os.path.basename(tool.tool["id"]),
+ **kwargs)
+
+ jobmapper = ArvPathMapper(self, jobfiles, "",
+ "%s",
+ "%s/%s",
+ name=os.path.basename(job_order.get("id", "#")),
+ **kwargs)
- job_order = adjustFiles(job_order, lambda p: mapper.mapper(p))
+ adjustFiles(job_order, lambda p: jobmapper.mapper(p)[1])
+
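+    # Point the submitted job at the uploaded workflow: drop any local "id"
+    # and reference the workflow by its mapped Keep path via "cwl:tool".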
+ if "id" in job_order:
+ del job_order["id"]
+
+ job_order["cwl:tool"] = workflowmapper.mapper(tool.tool["id"])[1]
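+    # Create a crunch job that runs the cwl-runner script inside the
+    # arvados/jobs Docker image, reusing an equivalent existing job when
+    # --enable-reuse allows it.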
    response = self.api.jobs().create(body={
        "script": "cwl-runner",
        "runtime_constraints": {
            "docker_image": "arvados/jobs"
        }
    }, find_or_create=args.enable_reuse).execute(num_retries=self.num_retries)
- print response["uuid"]
- return None
+
+ logger.info("Submitted job %s", response["uuid"])
+
+ return
def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
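+    # Jobs and collections are owned by --project-uuid when given, otherwise
+    # by the current user's home project.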
+ useruuid = self.api.users().current().execute()["uuid"]
+ self.project_uuid = args.project_uuid if args.project_uuid else useruuid
+
if args.submit:
- self.submit(tool, job_order, input_basedir, args, **kwargs)
- return
+ return self.submit(tool, job_order, input_basedir, args, **kwargs)
events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
kwargs["outdir"] = "$(task.outdir)"
kwargs["tmpdir"] = "$(task.tmpdir)"
- useruuid = self.api.users().current().execute()["uuid"]
- self.project_uuid = args.project_uuid if args.project_uuid else useruuid
-
if kwargs.get("conformance_test"):
return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
else:
default=True, dest="enable_reuse",
help="")
parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
- parser.add_argument("--submit", type=str, help="Submit job and print job uuid.")
+ parser.add_argument("--submit", action="store_true", help="Submit job and print job uuid.",
+ default=False)
try:
- runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
+ if api_client is None:
+        api_client = arvados.api('v1', model=OrderedJsonModel())
+ runner = ArvCwlRunner(api_client)
except Exception as e:
logger.error(e)
return 1
- return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)
+ return cwltool.main.main(args,
+ stdout=stdout,
+ stderr=stderr,
+ executor=runner.arvExecutor,
+ makeTool=runner.arvMakeTool,
+ parser=parser)
return prefix+fn
-def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)"):
+def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)", name=None):
# Find the smallest path prefix that includes all the files that need to be uploaded.
# This starts at the root and iteratively removes common parent directory prefixes
    # until all file paths no longer have a common parent.
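    # (Illustrative sketch only, not this function's implementation: the same
    # common parent could be computed with os.path.commonprefix over each
    # file's parent directory; "common_parent" is a hypothetical helper name.)
    #
    #   def common_parent(paths):
    #       prefix = os.path.commonprefix([os.path.dirname(p) + "/" for p in paths])
    #       return prefix[:prefix.rfind("/") + 1]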
stream = sp[0]
collection.start_new_stream(stream)
collection.write_file(f.fn, sp[1])
- item = api.collections().create(body={"owner_uuid": project, "manifest_text": collection.manifest_text()}).execute()
+ body = {"owner_uuid": project, "manifest_text": collection.manifest_text()}
+ if name is not None:
+ body["name"] = name
+ item = api.collections().create(body=body).execute()
pdh = item["portable_data_hash"]
logger.info("Uploaded to %s", item["uuid"])