import logging
import json
import os
+import urllib
import ruamel.yaml as yaml
from cwltool.errors import WorkflowException
from cwltool.process import get_feature, UnsupportedRequirement, shortname
-from cwltool.pathmapper import adjustFiles, adjustDirObjs
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
import arvados.collection
from .arvdocker import arv_docker_get_image
from . import done
-from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location
from .fsaccess import CollectionFetcher
+from .pathmapper import NoFollowPathMapper, trim_listing
+from .perf import Perf
logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
class ArvadosContainer(object):
"""Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
mounts = {
self.outdir: {
"kind": "tmp"
+ },
+ self.tmpdir: {
+ "kind": "tmp"
}
}
scheduling_parameters = {}
- dirs = set()
- for f in self.pathmapper.files():
- _, p, tp = self.pathmapper.mapper(f)
- if tp == "Directory" and '/' not in p[6:]:
- mounts[p] = {
- "kind": "collection",
- "portable_data_hash": p[6:]
- }
- dirs.add(p[6:])
- for f in self.pathmapper.files():
- _, p, tp = self.pathmapper.mapper(f)
- if p[6:].split("/")[0] not in dirs:
- mounts[p] = {
- "kind": "collection",
- "portable_data_hash": p[6:]
- }
-
- if self.generatefiles["listing"]:
- raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
+ rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
+ rf.sort(key=lambda k: k.resolved)
+ prevdir = None
+ for resolved, target, tp, stg in rf:
+ if not stg:
+ continue
+ if prevdir and target.startswith(prevdir):
+ continue
+ if tp == "Directory":
+ targetdir = target
+ else:
+ targetdir = os.path.dirname(target)
+ sp = resolved.split("/", 1)
+ pdh = sp[0][5:] # remove "keep:"
+ mounts[targetdir] = {
+ "kind": "collection",
+ "portable_data_hash": pdh
+ }
+ if len(sp) == 2:
+ if tp == "Directory":
+ path = sp[1]
+ else:
+ path = os.path.dirname(sp[1])
+ if path and path != "/":
+ mounts[targetdir]["path"] = path
+ prevdir = targetdir + "/"
+
+ with Perf(metrics, "generatefiles %s" % self.name):
+ if self.generatefiles["listing"]:
+ vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client,
+ num_retries=self.arvrunner.num_retries)
+ generatemapper = NoFollowPathMapper([self.generatefiles], "", "",
+ separateDirs=False)
+
+ with Perf(metrics, "createfiles %s" % self.name):
+ for f, p in generatemapper.items():
+ if not p.target:
+ pass
+ elif p.type in ("File", "Directory"):
+ source, path = self.arvrunner.fs_access.get_collection(p.resolved)
+ vwd.copy(path, p.target, source_collection=source)
+ elif p.type == "CreateFile":
+ with vwd.open(p.target, "w") as n:
+ n.write(p.resolved.encode("utf-8"))
+
+ with Perf(metrics, "generatefiles.save_new %s" % self.name):
+ vwd.save_new()
+
+ for f, p in generatemapper.items():
+ if not p.target:
+ continue
+ mountpoint = "%s/%s" % (self.outdir, p.target)
+ mounts[mountpoint] = {"kind": "collection",
+ "portable_data_hash": vwd.portable_data_hash(),
+ "path": p.target}
container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
if self.environment:
container_request["environment"].update(self.environment)
if self.stdin:
- raise UnsupportedRequirement("Stdin redirection currently not suppported")
+ sp = self.stdin[6:].split("/", 1)
+ mounts["stdin"] = {"kind": "collection",
+ "portable_data_hash": sp[0],
+ "path": sp[1]}
if self.stderr:
- raise UnsupportedRequirement("Stderr redirection currently not suppported")
+ mounts["stderr"] = {"kind": "file",
+ "path": "%s/%s" % (self.outdir, self.stderr)}
if self.stdout:
mounts["stdout"] = {"kind": "file",
runtime_req, _ = get_feature(self, "http://arvados.org/cwl#RuntimeConstraints")
if runtime_req:
if "keep_cache" in runtime_req:
- runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"]
+ runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"] * 2**20
partition_req, _ = get_feature(self, "http://arvados.org/cwl#PartitionRequirement")
if partition_req:
self.uuid = response["uuid"]
self.arvrunner.processes[self.uuid] = self
- logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
-
if response["state"] == "Final":
+ logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
self.done(response)
+ else:
+ logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
except Exception as e:
logger.error("%s got error %s" % (self.arvrunner.label(self), str(e)))
self.output_callback({}, "permanentFail")
"""
adjustDirObjs(self.job_order, trim_listing)
+ adjustFileObjs(self.job_order, trim_anonymous_location)
+ adjustDirObjs(self.job_order, trim_anonymous_location)
container_req = {
"owner_uuid": self.arvrunner.project_uuid,