import logging
import json
import os
+import urllib
import ruamel.yaml as yaml
from cwltool.errors import WorkflowException
from cwltool.process import get_feature, UnsupportedRequirement, shortname
-from cwltool.pathmapper import adjustFiles, adjustDirObjs
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
import arvados.collection
from .arvdocker import arv_docker_get_image
from . import done
-from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location
from .fsaccess import CollectionFetcher
-from .pathmapper import NoFollowPathMapper
+from .pathmapper import NoFollowPathMapper, trim_listing
from .perf import Perf
logger = logging.getLogger('arvados.cwl-runner')
mounts = {
self.outdir: {
"kind": "tmp"
+ },
+ self.tmpdir: {
+ "kind": "tmp"
}
}
scheduling_parameters = {}
- dirs = set()
- for f in self.pathmapper.files():
- pdh, p, tp, stg = self.pathmapper.mapper(f)
- if tp == "Directory" and '/' not in pdh:
- mounts[p] = {
- "kind": "collection",
- "portable_data_hash": pdh[5:]
- }
- dirs.add(pdh)
-
- for f in self.pathmapper.files():
- res, p, tp, stg = self.pathmapper.mapper(f)
- if res.startswith("keep:"):
- res = res[5:]
- elif res.startswith("/keep/"):
- res = res[6:]
- else:
+ rf = [self.pathmapper.mapper(f) for f in self.pathmapper.referenced_files]
+ rf.sort(key=lambda k: k.resolved)
+ prevdir = None
+ for resolved, target, tp, stg in rf:
+ if not stg:
continue
- sp = res.split("/", 1)
- pdh = sp[0]
- if pdh not in dirs:
- mounts[p] = {
- "kind": "collection",
- "portable_data_hash": pdh
- }
- if len(sp) == 2:
- mounts[p]["path"] = sp[1]
+ if prevdir and target.startswith(prevdir):
+ continue
+ if tp == "Directory":
+ targetdir = target
+ else:
+ targetdir = os.path.dirname(target)
+ sp = resolved.split("/", 1)
+ pdh = sp[0][5:] # remove "keep:"
+ mounts[targetdir] = {
+ "kind": "collection",
+ "portable_data_hash": pdh
+ }
+ if len(sp) == 2:
+ if tp == "Directory":
+ path = sp[1]
+ else:
+ path = os.path.dirname(sp[1])
+ if path and path != "/":
+ mounts[targetdir]["path"] = path
+ prevdir = targetdir + "/"
with Perf(metrics, "generatefiles %s" % self.name):
if self.generatefiles["listing"]:
container_request["environment"].update(self.environment)
if self.stdin:
- raise UnsupportedRequirement("Stdin redirection currently not suppported")
+ sp = self.stdin[6:].split("/", 1)
+ mounts["stdin"] = {"kind": "collection",
+ "portable_data_hash": sp[0],
+ "path": sp[1]}
if self.stderr:
- raise UnsupportedRequirement("Stderr redirection currently not suppported")
+ mounts["stderr"] = {"kind": "file",
+ "path": "%s/%s" % (self.outdir, self.stderr)}
if self.stdout:
mounts["stdout"] = {"kind": "file",
"""
adjustDirObjs(self.job_order, trim_listing)
+ adjustFileObjs(self.job_order, trim_anonymous_location)
+ adjustDirObjs(self.job_order, trim_anonymous_location)
container_req = {
"owner_uuid": self.arvrunner.project_uuid,