import logging
import os
import re
import urllib
import uuid

import arvados.collection
import arvados.commands.run
from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
from cwltool.workflow import WorkflowException
from schema_salad.sourceline import SourceLine
# Module-level logger, registered under the "arvados.cwl-runner" name.
logger = logging.getLogger(name="arvados.cwl-runner")
17 def trim_listing(obj):
18 """Remove 'listing' field from Directory objects that are keep references.
20 When Directory objects represent Keep references, it is redundant and
21 potentially very expensive to pass fully enumerated Directory objects
22 between instances of cwl-runner (e.g. a submitting a job, or using the
23 RunInSingleContainer feature), so delete the 'listing' field when it is
28 if obj.get("location", "").startswith("keep:") and "listing" in obj:
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Each mapped location gets a ``MapperEnt(resolved, target, type, staged)``.
    Existing ``keep:`` references are mapped as-is, local ``file:`` inputs are
    uploaded (or located on a Keep mount), and Directory objects, File
    literals and Files with secondaryFiles are assembled into new collections
    so they can be referenced by ``keep:`` locations.
    """

    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, **kwargs):
        """Store configuration, then hand off to ``PathMapper.__init__``.

        :param arvrunner: runner providing ``api``, ``keep_client``,
            ``fs_access``, ``num_retries``, ``project_uuid`` and the
            ``get_uploaded``/``add_uploaded`` upload cache.
        :param referenced_files: CWL File/Directory objects to map.
        :param input_basedir: base directory used to resolve local paths.
        :param collection_pattern: ``%`` format turning a Keep path
            (``<pdh>[/<path>]``) into a target path.
        :param file_pattern: ``%`` format taking ``(pdh, path)``.
        :param name: name passed to the batch upload of local files.
        :param kwargs: accepted and ignored.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        # Presumably the base constructor drives setup(); confirm in cwltool.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record a mapping for ``srcobj``, then recurse into secondaryFiles and listing.

        * ``keep:<pdh>[/path]`` locations map to themselves.
        * ``file:`` locations either map to the Keep file they already live
          in, or are queued in ``uploadfiles`` as ``(src, abspath, UploadFile)``.
        * ``_:`` literals are only validated here; ``setup`` materializes them.
        * Any other location maps to itself.

        :raises WorkflowException: for an invalid local path, or a literal
            File/Directory missing ``contents``/``listing``.
        """
        src = srcobj["location"]
        if "#" in src:
            # Drop the fragment so the map is keyed on the bare location.
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s")
                with SourceLine(srcobj, "location", WorkflowException):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException):
            for child in srcobj.get("secondaryFiles", []):
                self.visit(child, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException):
            for child in srcobj.get("listing", []):
                self.visit(child, uploadfiles)

    def addentry(self, obj, c, path, subdirs):
        """Place ``obj`` at ``path + "/" + obj["basename"]`` inside collection ``c``.

        * Already-mapped objects are copied from their source collection,
          followed by their secondaryFiles (placed alongside, in ``path``).
        * Unmapped Directories are rebuilt from their listing and appended
          to ``subdirs`` as ``(location, path)`` so ``setup`` can map them
          once the collection's hash is known.
        * ``_:`` File literals are written from their ``contents``.

        :raises WorkflowException: (built via ``SourceLine.makeError``) for
            anything else.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # Whole-collection reference: copy from the collection root.
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            for child in obj.get("secondaryFiles", []):
                self.addentry(child, c, path, subdirs)
        elif obj["class"] == "Directory":
            dest = path + "/" + obj["basename"]
            for child in obj.get("listing", []):
                self.addentry(child, c, dest, subdirs)
            subdirs.append((obj["location"], dest))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"].encode("utf-8"))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def _new_collection(self):
        """Return an empty Collection bound to the runner's API and Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_if_new(self, c):
        """Save ``c`` unless the API already lists a collection with its hash; return the hash.

        The hash is read again after saving rather than reusing the lookup
        value. NOTE(review): confirm whether save_new can change what
        portable_data_hash() reports (e.g. block repacking).
        """
        check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
        if not check["items"]:
            c.save_new(owner_uuid=self.arvrunner.project_uuid)
        return c.portable_data_hash()

    def _map_subdirs(self, subdirs, pdh):
        """Map each rebuilt subdirectory ``(location, "./rel/path")`` into collection ``pdh``."""
        for loc, sub in subdirs:
            # subdirs will all start with "./", strip it off
            ab = self.file_pattern % (pdh, sub[2:])
            self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, sub[2:]), ab, "Directory", True)

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate ``self._pathmap`` for ``referenced_files``.

        1. Seed entries for locations the runner has already uploaded.
        2. ``visit`` every object, collecting local files to upload.
        3. Upload those in one batch, map them and record them with the runner.
        4. Build new collections for top-level Directories that are still
           unmapped, and for top-level Files that have secondaryFiles or are
           ``_:`` literals with contents.

        ``basedir`` is not used here; local paths resolve against
        ``self.input_basedir``.
        """
        uploadfiles = set()

        already_uploaded = self.arvrunner.get_uploaded()
        for k in referenced_files:
            loc = k["location"]
            if loc in already_uploaded:
                v = already_uploaded[loc]
                # Use the recorded type: cached uploads can be Directories
                # (see the isdir check below), so "File" would mis-type them.
                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True)

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        if uploadfiles:
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             self.arvrunner.api,
                                             dry_run=False,
                                             num_retries=self.arvrunner.num_retries,
                                             fnPattern="keep:%s/%s",
                                             name=self.name,
                                             project=self.arvrunner.project_uuid)

        for src, ab, st in uploadfiles:
            # st.fn presumably now holds "keep:<pdh>/<path>" built from
            # fnPattern; confirm in arvados.commands.run.
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)
            self.arvrunner.add_uploaded(src, self._pathmap[src])

        for srcobj in referenced_files:
            subdirs = []
            if srcobj["class"] == "Directory":
                if srcobj["location"] not in self._pathmap:
                    c = self._new_collection()
                    for child in srcobj.get("listing", []):
                        self.addentry(child, c, ".", subdirs)

                    pdh = self._save_if_new(c)
                    ab = self.collection_pattern % pdh
                    self._pathmap[srcobj["location"]] = MapperEnt("keep:" + pdh, ab, "Directory", True)
                    self._map_subdirs(subdirs, pdh)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                                                 (srcobj["location"].startswith("_:") and "contents" in srcobj)):
                c = self._new_collection()
                self.addentry(srcobj, c, ".", subdirs)

                pdh = self._save_if_new(c)
                ab = self.file_pattern % (pdh, srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also map the whole collection under a fresh "_:" key.
                    ab = self.collection_pattern % pdh
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:" + pdh, ab, "Directory", True)
                self._map_subdirs(subdirs, pdh)

        # reversemap() only translates paths under keepdir once it is set;
        # presumably assigned by the caller after construction (TODO confirm).
        self.keepdir = None

    def reversemap(self, target):
        """Map ``target`` back to a ``(target, location)`` pair.

        ``keep:`` targets map to themselves; paths under ``self.keepdir``
        (when set) become ``keep:`` references; anything else defers to
        ``PathMapper.reversemap``.
        """
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)
class StagingPathMapper(PathMapper):
    """Path mapper that stages each object at ``stagedir/<basename>``.

    Subclasses set ``_follow_dirs`` to control whether the listings of
    non-literal Directory objects are visited too.
    """

    # Visit Directory listings; "_:" literal directories are followed
    # regardless of this flag.
    _follow_dirs = True

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Record a staging entry for ``obj`` and, where applicable, its children.

        * Directories map to ``stagedir/<basename>`` with type "Directory".
          Their listing is visited into that directory when they are ``_:``
          literals or ``_follow_dirs`` is true.
        * Files already present in the map are skipped.
        * ``_:`` File literals with ``contents`` become "CreateFile" entries
          whose resolved value is the contents.
        * Other Files become "WritableFile" when ``copy`` is true, else
          "File", and their secondaryFiles are visited into ``stagedir``.

        ``copy`` and ``staged`` are not forwarded to the nested
        ``visitlisting`` calls.
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        if obj["class"] == "Directory":
            self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if loc in self._pathmap:
                # Already mapped; keep the first entry.
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites Keep references to ``$(task.keep)`` paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage ``referenced_files`` under ``self.stagedir``, then rewrite Keep refs.

        Every "File" or "Directory" entry whose resolved value starts with
        ``keep:`` is re-resolved to ``$(task.keep)/<rest of reference>``.
        Its target, type and staged flag are left unchanged.
        """
        self.visitlisting(referenced_files, self.stagedir, basedir)

        for path, entry in list(self._pathmap.items()):
            resolved, target, kind, staged = entry
            # Test the type first: "CreateFile" entries hold file contents
            # in the resolved slot.
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            self._pathmap[path] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal Directory listings."""

    # With this off, only "_:" literal directories have their listing visited.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage ``referenced_files`` under ``self.stagedir`` with no further rewriting."""
        stagedir = self.stagedir
        self.visitlisting(referenced_files, stagedir, basedir)