X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e502060ffe4f68d33e2cca8f8d7544ce40d53eb7..31d31c010bb6b5170e3962fdd50c6d393cfe6076:/sdk/cwl/arvados_cwl/__init__.py diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 2842e8a114..3c7de77ebf 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -11,6 +11,7 @@ import threading import hashlib import copy import json +import re from functools import partial import pkg_resources # part of setuptools @@ -31,14 +32,14 @@ from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies from .arvtool import ArvadosCommandTool from .arvworkflow import ArvadosWorkflow, upload_workflow -from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver +from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache from .perf import Perf -from .pathmapper import FinalOutputPathMapper +from .pathmapper import NoFollowPathMapper from ._version import __version__ from cwltool.pack import pack -from cwltool.process import shortname, UnsupportedRequirement, getListing -from cwltool.pathmapper import adjustFileObjs, adjustDirObjs +from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema +from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing from cwltool.draft2tool import compute_checksums from arvados.api import OrderedJsonModel @@ -79,6 +80,8 @@ class ArvCwlRunner(object): else: self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries) + self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries) + self.work_api = None expected_api = ["jobs", "containers"] for api in expected_api: @@ -101,7 +104,8 @@ class ArvCwlRunner(object): kwargs["work_api"] = self.work_api kwargs["fetcher_constructor"] = partial(CollectionFetcher, api_client=self.api, - keep_client=self.keep_client) + fs_access=CollectionFsAccess("", collection_cache=self.collection_cache), + num_retries=self.num_retries) if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool": return ArvadosCommandTool(self, toolpath_object, **kwargs) elif "class" in toolpath_object and toolpath_object["class"] == "Workflow": @@ -200,17 +204,8 @@ class ArvCwlRunner(object): def check_features(self, obj): if isinstance(obj, dict): - if obj.get("class") == "InitialWorkDirRequirement": - if self.work_api == "containers": - raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers") if obj.get("writable"): raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported") - if obj.get("class") == "CommandLineTool": - if self.work_api == "containers": - if obj.get("stdin"): - raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers") - if obj.get("stderr"): - raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers") if obj.get("class") == "DockerRequirement": if obj.get("dockerOutputDirectory"): # TODO: can be supported by containers API, but not jobs API. @@ -233,13 +228,12 @@ class ArvCwlRunner(object): adjustDirObjs(outputObj, capture) adjustFileObjs(outputObj, capture) - generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False) + generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False) final = arvados.collection.Collection(api_client=self.api, keep_client=self.keep_client, num_retries=self.num_retries) - srccollections = {} for k,v in generatemapper.items(): if k.startswith("_:"): if v.type == "Directory": @@ -253,20 +247,13 @@ class ArvCwlRunner(object): raise Exception("Output source is not in keep or a literal") sp = k.split("/") srccollection = sp[0][5:] - if srccollection not in srccollections: - try: - srccollections[srccollection] = arvados.collection.CollectionReader( - srccollection, - api_client=self.api, - keep_client=self.keep_client, - num_retries=self.num_retries) - except arvados.errors.ArgumentError as e: - logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e) - raise - reader = srccollections[srccollection] try: + reader = self.collection_cache.get(srccollection) srcpath = "/".join(sp[1:]) if len(sp) > 1 else "." final.copy(srcpath, v.target, source_collection=reader, overwrite=False) + except arvados.errors.ArgumentError as e: + logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e) + raise except IOError as e: logger.warn("While preparing output collection: %s", e) @@ -317,6 +304,10 @@ class ArvCwlRunner(object): body={ 'output': self.final_output_collection.portable_data_hash(), }).execute(num_retries=self.num_retries) + self.api.collections().update(uuid=self.final_output_collection.manifest_locator(), + body={ + 'is_trashed': True + }).execute(num_retries=self.num_retries) except Exception as e: logger.info("Setting container output: %s", e) elif self.work_api == "jobs" and "TASK_UUID" in os.environ: @@ -335,8 +326,7 @@ class ArvCwlRunner(object): self.project_uuid = kwargs.get("project_uuid") self.pipeline = None make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, - api_client=self.api, - keep_client=self.keep_client) + collection_cache=self.collection_cache) self.fs_access = make_fs_access(kwargs["basedir"]) if not kwargs.get("name"): @@ -519,7 +509,7 @@ class ArvCwlRunner(object): self.set_crunch_output() if kwargs.get("compute_checksum"): - adjustDirObjs(self.final_output, partial(getListing, self.fs_access)) + adjustDirObjs(self.final_output, partial(get_listing, self.fs_access)) adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access)) return (self.final_output, self.final_status) @@ -627,16 +617,19 @@ def arg_parser(): # type: () -> argparse.ArgumentParser return parser def add_arv_hints(): - cache = {} + cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*") + cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml') - cache["http://arvados.org/cwl"] = res.read() + use_custom_schema("v1.0", "http://arvados.org/cwl", res.read()) res.close() - document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0") - _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache) - for n in extnames.names: - if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""): - cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, "")) - document_loader.idx["http://arvados.org/cwl#"+n] = {} + cwltool.process.supportedProcessRequirements.extend([ + "http://arvados.org/cwl#RunInSingleContainer", + "http://arvados.org/cwl#OutputDirType", + "http://arvados.org/cwl#RuntimeConstraints", + "http://arvados.org/cwl#PartitionRequirement", + "http://arvados.org/cwl#APIRequirement", + "http://commonwl.org/cwltool#LoadListingRequirement" + ]) def main(args, stdout, stderr, api_client=None, keep_client=None): parser = arg_parser() @@ -703,6 +696,9 @@ def main(args, stdout, stderr, api_client=None, keep_client=None): arvargs.relax_path_checks = True arvargs.validate = None + make_fs_access = partial(CollectionFsAccess, + collection_cache=runner.collection_cache) + return cwltool.main.main(args=arvargs, stdout=stdout, stderr=stderr, @@ -710,12 +706,11 @@ def main(args, stdout, stderr, api_client=None, keep_client=None): makeTool=runner.arv_make_tool, versionfunc=versionstring, job_order_object=job_order_object, - make_fs_access=partial(CollectionFsAccess, - api_client=api_client, - keep_client=keep_client), + make_fs_access=make_fs_access, fetcher_constructor=partial(CollectionFetcher, api_client=api_client, - keep_client=keep_client, + fs_access=make_fs_access(""), num_retries=runner.num_retries), resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries), - logger_handler=arvados.log_handler) + logger_handler=arvados.log_handler, + custom_schema_callback=add_arv_hints)