-from cwltool.process import get_feature, adjustFiles, scandeps
-from arvados.api import OrderedJsonModel
-
-logger = logging.getLogger('arvados.cwl-runner')
-logger.setLevel(logging.INFO)
-
-crunchrunner_pdh = "ff6fc71e593081ef9733afacaeee15ea+140"
-crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner"
-certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt"
-
-tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
-outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
-keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
-
-
-def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
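-    """Check if the requested Docker image is already cached in Keep; if not, pull it and upload it with arv-keepdocker."""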
- if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
- dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
-
- sp = dockerRequirement["dockerImageId"].split(":")
- image_name = sp[0]
- image_tag = sp[1] if len(sp) > 1 else None
-
- images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
- image_name=image_name,
- image_tag=image_tag)
-
- if not images:
- imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
- args = ["--project-uuid="+project_uuid, image_name]
- if image_tag:
- args.append(image_tag)
- logger.info("Uploading Docker image %s", ":".join(args[1:]))
- arvados.commands.keepdocker.main(args)
-
- return dockerRequirement["dockerImageId"]
-
-
-class CollectionFsAccess(cwltool.process.StdFsAccess):
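-    """Implement the cwltool FsAccess interface for Arvados Collections."""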
-    def __init__(self, basedir):
-        self.collections = {}
-        self.basedir = basedir
-
-    def get_collection(self, path):
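-        # Split a "keep:<pdh>/..." path into (CollectionReader, path within collection);
-        # return (None, path) for ordinary local paths.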
- p = path.split("/")
- if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
- pdh = p[0][5:]
- if pdh not in self.collections:
- self.collections[pdh] = arvados.collection.CollectionReader(pdh)
- return (self.collections[pdh], "/".join(p[1:]))
- else:
- return (None, path)
-
-    def _match(self, collection, patternsegments, parent):
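-        # Recursively match glob pattern segments against the contents of
-        # 'collection', returning the matching "keep:" paths.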
-        if not patternsegments:
-            return []
-
-        if not isinstance(collection, arvados.collection.RichCollectionBase):
-            return []
-
-        ret = []
-        # iterate over the files and subcollections in 'collection'
-        for filename in collection:
-            if patternsegments[0] == '.':
-                # Pattern contains something like "./foo" so just shift
-                # past the "./"
-                ret.extend(self._match(collection, patternsegments[1:], parent))
-            elif fnmatch.fnmatch(filename, patternsegments[0]):
-                cur = os.path.join(parent, filename)
-                if len(patternsegments) == 1:
-                    ret.append(cur)
-                else:
-                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
-        return ret
-
-    def glob(self, pattern):
-        collection, rest = self.get_collection(pattern)
-        patternsegments = rest.split("/")
-        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
-
-    def open(self, fn, mode):
-        collection, rest = self.get_collection(fn)
-        if collection:
-            return collection.open(rest, mode)
-        else:
-            return open(self._abs(fn), mode)
-
-    def exists(self, fn):
-        collection, rest = self.get_collection(fn)
-        if collection:
-            return collection.exists(rest)
-        else:
-            return os.path.exists(self._abs(fn))
-
-class ArvadosJob(object):
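-    """Submit and manage a Crunch job running "crunchrunner" for a single CWL tool invocation."""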
-    def __init__(self, runner):
-        self.arvrunner = runner
-        self.running = False
-
-    def run(self, dry_run=False, pull_image=True, **kwargs):
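-        """Build crunchrunner script parameters and runtime constraints, then submit this step as a Crunch job."""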
-        script_parameters = {
-            "command": self.command_line
-        }
-        runtime_constraints = {}
-
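-        # Stage any runtime-generated files into a new Keep collection (the
-        # "virtual working directory") and reference them via $(task.vwd).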
-        if self.generatefiles:
-            vwd = arvados.collection.Collection()
-            script_parameters["task.vwd"] = {}
-            for t in self.generatefiles:
-                if isinstance(self.generatefiles[t], dict):
-                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
-                    vwd.copy(rest, t, source_collection=src)
-                else:
-                    with vwd.open(t, "w") as f:
-                        f.write(self.generatefiles[t])
-            vwd.save_new()
-            for t in self.generatefiles:
-                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
-
-        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
-        if self.environment:
-            script_parameters["task.env"].update(self.environment)
-
-        if self.stdin:
-            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
-
-        if self.stdout:
-            script_parameters["task.stdout"] = self.stdout
-
-        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
-        if docker_req and kwargs.get("use_container") is not False:
-            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
-
-        resources = self.builder.resources
-        if resources is not None:
-            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
-            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
-            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
-
-        try:
-            response = self.arvrunner.api.jobs().create(body={
-                "owner_uuid": self.arvrunner.project_uuid,
-                "script": "crunchrunner",
-                "repository": "arvados",
-                "script_version": "master",
-                "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
-                "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"},
-                "runtime_constraints": runtime_constraints
-            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)
-
-            self.arvrunner.jobs[response["uuid"]] = self
-
-            self.arvrunner.pipeline["components"][self.name] = {"job": response}
-            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
-                body={
-                    "components": self.arvrunner.pipeline["components"]
-                }).execute(num_retries=self.arvrunner.num_retries)
-
-            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
-
-            if response["state"] in ("Complete", "Failed", "Cancelled"):
-                self.done(response)
-        except Exception as e:
-            logger.error("Got error %s" % str(e))
-            self.output_callback({}, "permanentFail")
-
-    def update_pipeline_component(self, record):
-        self.arvrunner.pipeline["components"][self.name] = {"job": record}
-        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
-            body={
-                "components": self.arvrunner.pipeline["components"]
-            }).execute(num_retries=self.arvrunner.num_retries)
-
-    def done(self, record):
-        try:
-            self.update_pipeline_component(record)
-        except:
-            pass
-
-        try:
-            if record["state"] == "Complete":
-                processStatus = "success"
-            else:
-                processStatus = "permanentFail"
-
-            try:
-                outputs = {}
-                if record["output"]:
-                    logc = arvados.collection.Collection(record["log"])
-                    log = logc.open(logc.keys()[0])
-                    tmpdir = None
-                    outdir = None
-                    keepdir = None
-                    for l in log:
-                        # Determine the tmpdir, outdir and keepdir paths from
-                        # the job run.  Unfortunately, we can't take the first
-                        # values we find (which are expected to be near the
-                        # top) and stop scanning, because if the node fails and
-                        # the job restarts on a different node, these values
-                        # will differ between runs, and we need to know about
-                        # the final run that actually produced output.
-
-                        g = tmpdirre.match(l)
-                        if g:
-                            tmpdir = g.group(1)
-                        g = outdirre.match(l)
-                        if g:
-                            outdir = g.group(1)
-                        g = keepre.match(l)
-                        if g:
-                            keepdir = g.group(1)
-
- colname = "Output %s of %s" % (record["output"][0:7], self.name)
-
- # check if collection already exists with same owner, name and content
- collection_exists = self.arvrunner.api.collections().list(
- filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
- ['portable_data_hash', '=', record["output"]],
- ["name", "=", colname]]
- ).execute(num_retries=self.arvrunner.num_retries)
-
- if not collection_exists["items"]:
- # Create a collection located in the same project as the
- # pipeline with the contents of the output.
- # First, get output record.
- collections = self.arvrunner.api.collections().list(
- limit=1,
- filters=[['portable_data_hash', '=', record["output"]]],
- select=["manifest_text"]
- ).execute(num_retries=self.arvrunner.num_retries)
-
- if not collections["items"]:
- raise WorkflowException(
- "Job output '%s' cannot be found on API server" % (
- record["output"]))
-
- # Create new collection in the parent project
- # with the output contents.
- self.arvrunner.api.collections().create(body={
- "owner_uuid": self.arvrunner.project_uuid,
- "name": colname,
- "portable_data_hash": record["output"],
- "manifest_text": collections["items"][0]["manifest_text"]
- }, ensure_unique_name=True).execute(
- num_retries=self.arvrunner.num_retries)
-
- self.builder.outdir = outdir
- self.builder.pathmapper.keepdir = keepdir
- outputs = self.collect_outputs("keep:" + record["output"])
- except WorkflowException as e:
- logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
- processStatus = "permanentFail"
- except Exception as e:
- logger.exception("Got unknown exception while collecting job outputs:")
- processStatus = "permanentFail"
-
- self.output_callback(outputs, processStatus)
- finally:
- del self.arvrunner.jobs[record["uuid"]]
-
-
-class RunnerJob(object):
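-    """Submit and manage a Crunch job that runs "cwl-runner" on the cluster to execute the whole workflow."""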
-    def __init__(self, runner, tool, job_order, enable_reuse):
-        self.arvrunner = runner
-        self.tool = tool
-        self.job_order = job_order
-        self.running = False
-        self.enable_reuse = enable_reuse
-
-    def update_pipeline_component(self, record):
-        pass
-
-    def upload_docker(self, tool):
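-        # Recursively make sure every Docker image referenced by the tool, or
-        # by any step of a workflow, is available in Keep.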
-        if isinstance(tool, cwltool.draft2tool.CommandLineTool):
-            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
-            if docker_req:
-                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
-        elif isinstance(tool, cwltool.workflow.Workflow):
-            for s in tool.steps:
-                self.upload_docker(s.embedded_tool)
-
-    def run(self, dry_run=False, pull_image=True, **kwargs):
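-        """Upload the workflow and its input files to Keep, then submit a Crunch job that runs cwl-runner inside the cluster."""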
-        self.upload_docker(self.tool)
-
-        workflowfiles = set()
-        jobfiles = set()
-        workflowfiles.add(self.tool.tool["id"])
-
-        self.name = os.path.basename(self.tool.tool["id"])
-
-        def visitFiles(files, path):
-            files.add(path)
-            return path
-
-        document_loader, _, _ = cwltool.process.get_schema()
-        def loadref(b, u):
-            return document_loader.resolve_ref(u, base_url=b)[0]
-
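-        # Scan the workflow for everything it references ($import, run, $include,
-        # $schemas, "path") so those files can be uploaded to Keep along with it.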
- sc = scandeps("", self.tool.tool,
- set(("$import", "run")),
- set(("$include", "$schemas", "path")),
- loadref)
- adjustFiles(sc, functools.partial(visitFiles, workflowfiles))
- adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles))
-
- workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
- "%s",
- "%s/%s",
- name=self.name,
- **kwargs)
-
- jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
- "%s",
- "%s/%s",
- name=os.path.basename(self.job_order.get("id", "#")),
- **kwargs)
-
- adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
-
- if "id" in self.job_order:
- del self.job_order["id"]
-
- self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
-
- response = self.arvrunner.api.jobs().create(body={
- "script": "cwl-runner",
- "script_version": "8654-arv-jobs-cwl-runner",
- "repository": "arvados",
- "script_parameters": self.job_order,
- "runtime_constraints": {
- "docker_image": "arvados/jobs"
- }
- }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)
-
- self.arvrunner.jobs[response["uuid"]] = self
-
- logger.info("Submitted job %s", response["uuid"])
-
- if response["state"] in ("Complete", "Failed", "Cancelled"):
- self.done(response)
-
-    def done(self, record):
-        if record["state"] == "Complete":
-            processStatus = "success"
-        else:
-            processStatus = "permanentFail"