Merge branch '12278-cwl-debug-flag' closes #12278
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import re
6 import logging
7 import uuid
8 import os
9 import urllib
10
11 import arvados.commands.run
12 import arvados.collection
13
14 from schema_salad.sourceline import SourceLine
15
16 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
17 from cwltool.workflow import WorkflowException
18
19 logger = logging.getLogger('arvados.cwl-runner')
20
def trim_listing(obj):
    """Drop the 'listing' of a Keep-backed Directory object, in place.

    Directory objects whose location is a Keep reference can be re-listed
    from Keep at any time, so carrying a fully enumerated listing between
    cwl-runner instances (e.g. when submitting a job, or with the
    RunInSingleContainer feature) is redundant and potentially very
    expensive.  Objects whose location is not a "keep:" reference are left
    untouched.
    """
    if not obj.get("location", "").startswith("keep:"):
        return
    obj.pop("listing", None)
34
35
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Each referenced File/Directory is mapped to a MapperEnt whose resolved
    location is a "keep:" reference.  Its target is a container path built
    from ``collection_pattern`` (one %s: a portable data hash, optionally
    followed by a path) or ``file_pattern`` (two %s: portable data hash and
    file name).

    Local "file:" references are uploaded.  The following are written into
    new collections: unmapped Directories, File literals ("_:" locations
    with ``contents``) and Files that carry secondaryFiles.
    """

    # keep: reference to a path inside a collection.
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # keep: reference to a collection, optionally followed by a path.
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
        """Create the mapper for ``referenced_files``.

        :param arvrunner: runner object supplying ``api``, ``keep_client``,
            ``num_retries``, ``project_uuid``, ``fs_access`` and the
            ``get_uploaded``/``add_uploaded`` cache used by ``setup``.
        :param referenced_files: File/Directory objects, each with a "location".
        :param input_basedir: base used to resolve relative "file:" locations.
        :param collection_pattern: format string with one %s, applied to a
            "pdh[/path]" string to produce a container target path.
        :param file_pattern: format string with two %s (pdh, name) producing
            a container target path for a file.
        :param name: collection name passed to the upload step.
        :param single_collection: when true, local uploads and previously
            uploaded files are gathered into one shared collection.

        Additional keyword arguments are accepted and ignored.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        self.single_collection = single_collection
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record a mapping for ``srcobj`` and recurse into its secondaryFiles/listing.

        Handling depends on the location:
        - "keep:" collection references are mapped directly.
        - "file:" references are mapped directly when they are already in
          Keep. Otherwise ``(location, absolute path, UploadFile)`` is added
          to the ``uploadfiles`` set for ``setup`` to upload.
        - "_:" literals are only validated here; ``setup`` materializes them.
        - Any other location is mapped to itself.

        Raises WorkflowException for an unusable local path, or for a
        literal missing ``contents``/``listing``.  An OSError from
        ``statfile`` (called with raiseOSError=True) is not wrapped.
        """
        src = srcobj["location"]
        # Ignore any URL fragment when looking the location up.
        if "#" in src:
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        # Forwarded to SourceLine; presumably controls whether raised errors
        # include a traceback when debug logging is enabled.
        debug = logger.isEnabledFor(logging.DEBUG)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s",
                                                   raiseOSError=True)
                with SourceLine(srcobj, "location", WorkflowException, debug):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        # NOTE(review): statfile is given a dirPattern, so this
                        # may also be a directory on a keep mount; it is still
                        # recorded with type "File" -- confirm this is intended.
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException, debug):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException, debug):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, subdirs):
        """Add ``obj`` to collection ``c`` under directory ``path``.

        - Objects already in the path map are copied from their source
          collection, together with their secondaryFiles.
        - Unmapped Directories are recursed into, and
          ``(location, path/basename)`` is appended to ``subdirs``.
        - File literals ("_:" with ``contents``) are written from their
          UTF-8 encoded contents.

        Anything else raises WorkflowException.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # An empty path means the collection root.
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, subdirs)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], subdirs)
            subdirs.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"].encode("utf-8"))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def _save_collection_if_new(self, c):
        """Save collection ``c`` unless one with the same content already exists.

        Returns the portable data hash of ``c``.  The hash is read after the
        possible save so the value reflects the saved collection.
        """
        check = self.arvrunner.api.collections().list(
            filters=[["portable_data_hash", "=", c.portable_data_hash()]],
            limit=1).execute(num_retries=self.arvrunner.num_retries)
        if not check["items"]:
            c.save_new(owner_uuid=self.arvrunner.project_uuid)
        return c.portable_data_hash()

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate ``self._pathmap`` for ``referenced_files``.

        The steps are:
        1. Reuse uploads already recorded on the runner.
        2. Visit every object.
        3. Upload local files.
        4. Build collections for unmapped Directories, File literals and
           Files with secondaryFiles.

        ``basedir`` is not used; ``self.input_basedir`` is used instead.
        """
        uploadfiles = set()

        collection = None
        if self.single_collection:
            collection = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)

        # Reuse previous uploads; in single-collection mode also copy them
        # into the shared collection (first occurrence of each basename).
        already_uploaded = self.arvrunner.get_uploaded()
        copied_files = set()
        for k in referenced_files:
            loc = k["location"]
            if loc in already_uploaded:
                v = already_uploaded[loc]
                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True)
                if self.single_collection:
                    basename = k["basename"]
                    if basename not in collection:
                        self.addentry({"location": loc, "class": v.type, "basename": basename}, collection, ".", [])
                        copied_files.add((loc, basename, v.type))

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                         self.arvrunner.api,
                                         dry_run=False,
                                         num_retries=self.arvrunner.num_retries,
                                         fnPattern="keep:%s/%s",
                                         name=self.name,
                                         project=self.arvrunner.project_uuid,
                                         collection=collection)

        for src, ab, st in uploadfiles:
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)
            self.arvrunner.add_uploaded(src, self._pathmap[src])

        if copied_files:
            # The shared collection does not change inside this loop, so its
            # hash only needs computing once.
            collection_pdh = collection.portable_data_hash()
            for loc, basename, cls in copied_files:
                fn = "keep:%s/%s" % (collection_pdh, basename)
                self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)

        for srcobj in referenced_files:
            subdirs = []
            if srcobj["class"] == "Directory" and srcobj["location"] not in self._pathmap:
                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                for l in srcobj.get("listing", []):
                    self.addentry(l, c, ".", subdirs)

                pdh = self._save_collection_if_new(c)
                ab = self.collection_pattern % pdh
                self._pathmap[srcobj["location"]] = MapperEnt("keep:" + pdh, ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                                                (srcobj["location"].startswith("_:") and "contents" in srcobj)):
                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                self.addentry(srcobj, c, ".", subdirs)

                pdh = self._save_collection_if_new(c)
                ab = self.file_pattern % (pdh, srcobj["basename"])
                # NOTE(review): unlike the upload path above, this resolved
                # location is not passed through urllib.quote; confirm whether
                # basenames with special characters need quoting here.
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (pdh, srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    ab = self.collection_pattern % pdh
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:" + pdh, ab, "Directory", True)

            # subdirs is only filled by the addentry() calls above on this
            # iteration's collection, so ``pdh`` refers to that collection.
            for loc, sub in subdirs:
                # subdirs will all start with "./", strip it off
                ab = self.file_pattern % (pdh, sub[2:])
                self._pathmap[loc] = MapperEnt("keep:%s/%s" % (pdh, sub[2:]),
                                               ab, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a (location, location) pair.

        The base-class mapping is tried first.  Failing that:
        - "keep:" targets map to themselves.
        - Paths under ``self.keepdir`` (when it is set) are turned into
          "keep:" references.

        Returns None otherwise.
        """
        p = super(ArvPathMapper, self).reversemap(target)
        if p:
            return p
        elif target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            kp = "keep:" + target[len(self.keepdir)+1:]
            return (kp, kp)
        else:
            return None
210
class StagingPathMapper(PathMapper):
    """Give each referenced File/Directory a unique target under a staging dir.

    When two objects would land on the same target path, later ones get a
    numeric suffix before the extension: name_2.ext, name_3.ext, and so on.
    """

    # Whether visit() descends into the listing of non-literal Directories.
    _follow_dirs = True

    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
        # The set of target paths already handed out.  It is created before
        # the base constructor runs, presumably because PathMapper.__init__
        # triggers setup() and therefore visit().
        self.targets = set()
        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Assign ``obj`` a unique target under ``stagedir`` and record it.

        Directories are mapped as "Directory".  Their listing is visited
        under the new target when the Directory is a literal ("_:") or when
        ``_follow_dirs`` is set.

        Files are mapped as one of:
        - "CreateFile" for literals with ``contents``;
        - "WritableFile" when ``copy`` is true;
        - "File" otherwise.
        A non-literal File's secondaryFiles are staged next to it in
        ``stagedir``.

        A File that is already mapped is skipped entirely.
        """
        loc = obj["location"]
        if obj["class"] == "File" and loc in self._pathmap:
            # Already staged: return before reserving a target name, so a
            # repeat visit does not burn a name_N suffix that later distinct
            # files would otherwise have received.
            return

        tgt = os.path.join(stagedir, obj["basename"])
        basetgt, baseext = os.path.splitext(tgt)
        n = 1
        while tgt in self.targets:
            n += 1
            tgt = "%s_%i%s" % (basetgt, n, baseext)
        self.targets.add(tgt)

        if obj["class"] == "Directory":
            self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
            if loc.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), tgt, basedir)
        elif obj["class"] == "File":
            if "contents" in obj and loc.startswith("_:"):
                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
            else:
                if copy:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                else:
                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
243
244
class VwdPathMapper(StagingPathMapper):
    """Staging mapper whose Keep-backed entries point at "$(task.keep)/...".

    The "$(task.keep)" prefix is presumably substituted at run time with
    the task's Keep mount point; confirm against the consumer.
    """

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None

        # Stage every referenced object (and its secondary files) under
        # the staging directory.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        # Rewrite File/Directory entries resolved to "keep:<rest>" so that
        # they resolve to "$(task.keep)/<rest>" instead.
        rewritten = {}
        for key, (resolved, target, kind, staged) in self._pathmap.items():
            if kind not in ("File", "Directory"):
                continue
            if not resolved.startswith("keep:"):
                continue
            rewritten[key] = MapperEnt("$(task.keep)/%s" % resolved[5:], target, kind, staged)
        self._pathmap.update(rewritten)
256
257
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal Directory listings."""

    # Read by StagingPathMapper.visit: only "_:" literal Directories are
    # expanded when this is false.
    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        stage_root = self.stagedir
        self.visitlisting(referenced_files, stage_root, basedir)