7 import arvados.commands.run
8 import arvados.collection
10 from schema_salad.sourceline import SourceLine
12 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
13 from cwltool.workflow import WorkflowException
# Module logger, registered under the 'arvados.cwl-runner' logger name.
logger = logging.getLogger('arvados.cwl-runner')
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when the
    object's location is a Keep reference.

    The object is modified in place; nothing is returned.  Objects without
    a (string) location, or without a listing, are left untouched.
    """
    # `or ""` also covers a location key that is present but None.
    location = obj.get("location") or ""
    if location.startswith("keep:") and "listing" in obj:
        del obj["listing"]
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Every referenced File/Directory location is mapped to a Keep reference
    ("keep:<portable data hash>[/path]") plus a container path built from
    ``collection_pattern`` / ``file_pattern``.  Local ``file:`` inputs are
    uploaded, and Directory/File objects that have no Keep backing yet
    (e.g. ``_:`` literals, Files with secondaryFiles) are written into new
    collections.
    """

    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
        """
        Args:
            arvrunner: runner object; this class uses its ``api``,
                ``keep_client``, ``num_retries``, ``project_uuid``,
                ``fs_access`` and ``get_uploaded``/``add_uploaded``.
            referenced_files: CWL File/Directory objects to map.
            input_basedir: base directory for resolving relative ``file:`` paths.
            collection_pattern: %-format turning "<pdh>[/path]" into a container path.
            file_pattern: %-format turning (pdh, basename) into a container path.
            name: name passed along when uploading local files.
            single_collection: if true, already-uploaded inputs are copied into
                one shared collection, which is also handed to the upload step.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def _new_collection(self):
        """Return an empty Collection bound to the runner's API/Keep clients."""
        return arvados.collection.Collection(api_client=self.arvrunner.api,
                                             keep_client=self.arvrunner.keep_client,
                                             num_retries=self.arvrunner.num_retries)

    def _save_collection(self, c):
        """Save c to the runner's project unless a collection with its PDH already exists."""
        check = self.arvrunner.api.collections().list(
            filters=[["portable_data_hash", "=", c.portable_data_hash()]],
            limit=1).execute(num_retries=self.arvrunner.num_retries)
        if not check["items"]:
            c.save_new(owner_uuid=self.arvrunner.project_uuid)

    def visit(self, srcobj, uploadfiles):
        """Map srcobj (and, recursively, its secondaryFiles and listing).

        - Locations matching ``pdh_dirpath`` are mapped directly.
        - ``file:`` locations are stat'ed: an ArvFile result is mapped to
          its Keep reference; an UploadFile result is queued in
          ``uploadfiles`` as ``(src, abspath, UploadFile)`` for setup().
        - ``_:`` literals are validated but left unmapped, so setup() can
          build a collection for them.
        - Any other location is mapped to itself.

        Raises:
            WorkflowException: for invalid input paths or literals that
                lack ``contents``/``listing``.
        """
        src = srcobj["location"]
        if "#" in src:
            # Drop any fragment identifier before mapping.
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException):
            for sec in srcobj.get("secondaryFiles", []):
                self.visit(sec, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException):
            for entry in srcobj.get("listing", []):
                self.visit(entry, uploadfiles)

    def addentry(self, obj, c, path, subdirs):
        """Place obj into collection c under directory ``path``.

        Already-mapped objects are copied from their source collection,
        together with their secondaryFiles.  Directories are recursed
        into and appended to ``subdirs`` as ``(location, collection path)``.
        ``_:`` File literals have their contents written.

        Raises:
            WorkflowException: for any other object.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # An empty in-collection path refers to the collection root.
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            for sec in obj.get("secondaryFiles", []):
                self.addentry(sec, c, path, subdirs)
        elif obj["class"] == "Directory":
            dirpath = path + "/" + obj["basename"]
            for entry in obj.get("listing", []):
                self.addentry(entry, c, dirpath, subdirs)
            subdirs.append((obj["location"], dirpath))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"].encode("utf-8"))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate self._pathmap for referenced_files.

        Order of work:
        1. reuse mappings the runner already uploaded;
        2. visit() every object;
        3. upload the queued local files;
        4. build new collections for Directories that are still unmapped and
           for Files that have secondaryFiles or literal contents.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = self._new_collection()

        already_uploaded = self.arvrunner.get_uploaded()
        copied_files = set()
        for k in referenced_files:
            loc = k["location"]
            if loc in already_uploaded:
                v = already_uploaded[loc]
                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True)
                if self.single_collection:
                    basename = k["basename"]
                    if basename not in collection:
                        self.addentry({"location": loc, "class": v.type, "basename": basename}, collection, ".", [])
                        copied_files.add((loc, basename, v.type))

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)
            self.arvrunner.add_uploaded(src, self._pathmap[src])

        for loc, basename, cls in copied_files:
            fn = "keep:%s/%s" % (collection.portable_data_hash(), basename)
            self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)

        for srcobj in referenced_files:
            # Directories created by addentry() for this srcobj.
            subdirs = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = self._new_collection()
                for entry in srcobj.get("listing", []):
                    self.addentry(entry, c, ".", subdirs)
                self._save_collection(c)

                ab = self.collection_pattern % c.portable_data_hash()
                self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                                                (srcobj["location"].startswith("_:") and "contents" in srcobj)):
                c = self._new_collection()
                self.addentry(srcobj, c, ".", subdirs)
                self._save_collection(c)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also record the collection itself (under a unique placeholder
                    # key) so the directory holding the file and its
                    # secondaryFiles has a mapping entry.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            for loc, sub in subdirs:
                # subdirs will all start with "./", strip it off
                ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
                                               ab, "Directory", True)

        # reversemap() consults keepdir; no keep mount directory is known here.
        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to ``(target, location)``.

        ``keep:`` targets map to themselves; paths under ``self.keepdir`` are
        turned back into ``keep:`` references; anything else is delegated to
        the base PathMapper.
        """
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)
class StagingPathMapper(PathMapper):
    """Map each input to a target path under a staging directory."""

    # Whether visit() recurses into the listing of a Directory whose
    # location is not a "_:" literal (literal listings are always visited).
    _follow_dirs = True

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Add a _pathmap entry for obj at ``stagedir/<basename>``.

        - Directories are always (re)mapped, and their listing is staged
          beneath them when the location is a literal or ``_follow_dirs``
          is set.
        - Files already in the map are skipped.
        - ``_:`` literals with ``contents`` become "CreateFile" entries
          whose resolved value is the contents.
        - Other Files become "WritableFile" (when ``copy``) or "File"
          entries, and their secondaryFiles are staged in the same directory.
        """
        loc = obj["location"]
        tgt = os.path.join(stagedir, obj["basename"])
        if obj["class"] == "Directory":
            self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if loc in self._pathmap:
                return
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed entries resolve to $(task.keep) paths."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage referenced_files under self.stagedir, then rewrite resolved
        ``keep:`` locations of File/Directory entries to ``$(task.keep)/...``.
        """
        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Iterate over a snapshot because entries are reassigned in the loop.
        for path, (ab, tgt, kind, staged) in list(self._pathmap.items()):
            if kind in ("File", "Directory") and ab.startswith("keep:"):
                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, kind, staged)
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not recurse into non-literal Directory listings."""

    # Read by StagingPathMapper.visit(); only "_:" literal listings are visited.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage referenced_files directly under self.stagedir."""
        self.visitlisting(referenced_files, self.stagedir, basedir)