From 9d209cb34089febeaadeab572a1b4c8d9d485741 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Mon, 29 Jun 2015 17:13:46 -0400 Subject: [PATCH] 6264: Uploads files and Docker images, can almost run jobs. --- sdk/python/arvados/commands/cwl_runner.py | 48 ++++++++++++++--------- sdk/python/arvados/commands/run.py | 12 +++--- sdk/python/bin/cwl-runner | 0 3 files changed, 35 insertions(+), 25 deletions(-) mode change 100644 => 100755 sdk/python/bin/cwl-runner diff --git a/sdk/python/arvados/commands/cwl_runner.py b/sdk/python/arvados/commands/cwl_runner.py index 37995071a1..19fca27c74 100644 --- a/sdk/python/arvados/commands/cwl_runner.py +++ b/sdk/python/arvados/commands/cwl_runner.py @@ -11,6 +11,10 @@ import cwltool.main import threading import cwltool.docker import fnmatch +import logging +import re +import os +from cwltool.process import get_feature logger = logging.getLogger('arvados.cwl-runner') logger.setLevel(logging.INFO) @@ -29,7 +33,10 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image): if not images: imageId = cwltool.docker.get_image(dockerRequirement, pull_image) - arvados.commands.keepdocker.main(dockerRequirement["dockerImageId"]) + args = [image_name] + if image_tag: + args.append(image_tag) + arvados.commands.keepdocker.main(args) return dockerRequirement["dockerImageId"] @@ -81,14 +88,14 @@ class ArvadosJob(object): runtime_constraints = {} if self.stdin: - command["stdin"] = self.stdin + script_parameters["task.stdin"] = self.stdin if self.stdout: - command["stdout"] = self.stdout + script_parameters["task.stdout"] = self.stdout (docker_req, docker_is_req) = get_feature(self, "DockerRequirement") if docker_req and kwargs.get("use_container") is not False: - runtime_constraints["docker_image"] = arv_docker_get(docker_req, pull_image) + runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image) runtime_constraints["arvados_sdk_version"] = "master" response = self.arvrunner.api.jobs().create(body={ @@ -113,7 +120,7 @@ class ArvadosJob(object): class ArvPathMapper(cwltool.pathmapper.PathMapper): - def __init__(self, arvrunner, referenced_files, basedir): + def __init__(self, arvrunner, referenced_files, basedir, **kwargs): self._pathmap = {} uploadfiles = [] @@ -122,16 +129,19 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper): for src in referenced_files: ab = src if os.path.isabs(src) else os.path.join(basedir, src) st = arvados.commands.run.statfile("", ab) - if isinstance(st, arvados.commands.run.UploadFile): + if kwargs.get("conformance_test"): + self._pathmap[src] = (src, ab) + elif isinstance(st, arvados.commands.run.UploadFile): uploadfiles.append((src, ab, st)) elif isinstance(st, arvados.commands.run.ArvFile): self._pathmap[src] = (ab, st.fn) elif isinstance(st, basestring) and pdh_path.match(st): - self._pathmap[src] = (st, "$(file %s") % st) + self._pathmap[src] = (st, "$(file %s)" % st) else: - workflow.WorkflowException("Input file path '%s' is invalid", st) + raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid", st) - arvados.commands.run.uploadfiles([u[2] for u in uploadfiles], arvrunner.api) + if uploadfiles: + arvados.commands.run.uploadfiles([u[2] for u in uploadfiles], arvrunner.api, dry_run=kwargs.get("dry_run"), num_retries=3) for src, ab, st in uploadfiles: self._pathmap[src] = (ab, st.fn) @@ -145,8 +155,8 @@ class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool): def makeJobRunner(self): return ArvadosJob(self.arvrunner) - def makePathMapper(self, reffiles, input_basedir): - return ArvadosCommandTool.ArvPathMapper(self.arvrunner, reffiles, input_basedir) + def makePathMapper(self, reffiles, input_basedir, **kwargs): + return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs) class ArvCwlRunner(object): @@ -174,12 +184,12 @@ class ArvCwlRunner(object): if "object_uuid" in event: if event["object_uuid"] in self.jobs and event["event_type"] == "update": if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"): - try: - self.cond.acquire() - self.jobs[event["object_uuid"]].done(ev["properties"]["new_attributes"]) - self.cond.notify() - finally: - self.cond.release() + try: + self.cond.acquire() + self.jobs[event["object_uuid"]].done(ev["properties"]["new_attributes"]) + self.cond.notify() + finally: + self.cond.release() def arvExecutor(self, t, job_order, input_basedir, **kwargs): events = arvados.events.subscribe(arvados.api('v1'), [["object_kind", "=", "arvados#job"]], self.on_message) @@ -204,7 +214,7 @@ class ArvCwlRunner(object): finally: self.cond.release() else: - raise workflow.WorkflowException("Workflow deadlocked.") + raise cwltool.workflow.WorkflowException("Workflow deadlocked.") while self.jobs: try: @@ -216,7 +226,7 @@ class ArvCwlRunner(object): events.close() if self.final_output is None: - raise workflow.WorkflowException("Workflow did not return a result.") + raise cwltool.workflow.WorkflowException("Workflow did not return a result.") return self.final_output diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py index 6b929ac3e9..4cbda4ae7d 100644 --- a/sdk/python/arvados/commands/run.py +++ b/sdk/python/arvados/commands/run.py @@ -101,10 +101,12 @@ def statfile(prefix, fn): return prefix+fn -def uploadfiles(files, api): +def uploadfiles(files, api, dry_run=False, num_retries=0, project=None): # Find the smallest path prefix that includes all the files that need to be uploaded. # This starts at the root and iteratively removes common parent directory prefixes # until all file pathes no longer have a common parent. + n = True + pathprefix = "/" while n: pathstep = None for c in files: @@ -133,12 +135,12 @@ def uploadfiles(files, api): logger.info("Upload local files: \"%s\"", '" "'.join([c.fn for c in files])) - if args.dry_run: + if dry_run: logger.info("$(input) is %s", pathprefix.rstrip('/')) pdh = "$(input)" else: files = sorted(files, key=lambda x: x.fn) - collection = arvados.CollectionWriter(api, num_retries=args.retries) + collection = arvados.CollectionWriter(api, num_retries=num_retries) stream = None for f in files: sp = os.path.split(f.fn) @@ -234,11 +236,9 @@ def main(arguments=None): command[i] = statfile(m.group(1), m.group(2)) break - n = True - pathprefix = "/" files = [c for command in slots[1:] for c in command if isinstance(c, UploadFile)] if files: - uploadfiles(files, api) + uploadfiles(files, api, dry_run=args.dry_run, num_retries=args.num_retries, project=project) for i in xrange(1, len(slots)): slots[i] = [("%s%s" % (c.prefix, c.fn)) if isinstance(c, ArvFile) else c for c in slots[i]] diff --git a/sdk/python/bin/cwl-runner b/sdk/python/bin/cwl-runner old mode 100644 new mode 100755 -- 2.30.2