from . import done
from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
from .fsaccess import CollectionFetcher
+from .pathmapper import NoFollowPathMapper
+from .perf import Perf
logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
class ArvadosContainer(object):
"""Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
dirs = set()
for f in self.pathmapper.files():
- _, p, tp = self.pathmapper.mapper(f)
- if tp == "Directory" and '/' not in p[6:]:
+ pdh, p, tp, stg = self.pathmapper.mapper(f)
+ if tp == "Directory" and '/' not in pdh:
mounts[p] = {
"kind": "collection",
- "portable_data_hash": p[6:]
+ "portable_data_hash": pdh[5:]
}
- dirs.add(p[6:])
+ dirs.add(pdh)
+
for f in self.pathmapper.files():
- _, p, tp = self.pathmapper.mapper(f)
- if p[6:].split("/")[0] not in dirs:
+ res, p, tp, stg = self.pathmapper.mapper(f)
+ if res.startswith("keep:"):
+ res = res[5:]
+ elif res.startswith("/keep/"):
+ res = res[6:]
+ else:
+ continue
+ sp = res.split("/", 1)
+ pdh = sp[0]
+ if pdh not in dirs:
mounts[p] = {
"kind": "collection",
- "portable_data_hash": p[6:]
+ "portable_data_hash": pdh
}
-
- if self.generatefiles["listing"]:
- raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
+ if len(sp) == 2:
+ mounts[p]["path"] = sp[1]
+
+ with Perf(metrics, "generatefiles %s" % self.name):
+ if self.generatefiles["listing"]:
+ vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client,
+ num_retries=self.arvrunner.num_retries)
+ generatemapper = NoFollowPathMapper([self.generatefiles], "", "",
+ separateDirs=False)
+
+ with Perf(metrics, "createfiles %s" % self.name):
+ for f, p in generatemapper.items():
+ if not p.target:
+ pass
+ elif p.type in ("File", "Directory"):
+ source, path = self.arvrunner.fs_access.get_collection(p.resolved)
+ vwd.copy(path, p.target, source_collection=source)
+ elif p.type == "CreateFile":
+ with vwd.open(p.target, "w") as n:
+ n.write(p.resolved.encode("utf-8"))
+
+ with Perf(metrics, "generatefiles.save_new %s" % self.name):
+ vwd.save_new()
+
+ for f, p in generatemapper.items():
+ if not p.target:
+ continue
+ mountpoint = "%s/%s" % (self.outdir, p.target)
+ mounts[mountpoint] = {"kind": "collection",
+ "portable_data_hash": vwd.portable_data_hash(),
+ "path": p.target}
container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
if self.environment:
runtime_req, _ = get_feature(self, "http://arvados.org/cwl#RuntimeConstraints")
if runtime_req:
if "keep_cache" in runtime_req:
- runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"]
+ runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"] * 2**20
partition_req, _ = get_feature(self, "http://arvados.org/cwl#PartitionRequirement")
if partition_req: