Merge branch '11369-cwl-crunch2-capacity' refs #11369
[arvados.git] / sdk / cwl / arvados_cwl / pathmapper.py
1 import re
2 import logging
3 import uuid
4 import os
5 import urllib
6
7 import arvados.commands.run
8 import arvados.collection
9
10 from schema_salad.sourceline import SourceLine
11
12 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
13 from cwltool.workflow import WorkflowException
14
15 logger = logging.getLogger('arvados.cwl-runner')
16
def trim_listing(obj):
    """Drop the 'listing' field from a Directory object that lives in Keep.

    A Directory whose location is a Keep reference does not need to carry
    its fully enumerated listing around: doing so is redundant and can be
    very expensive when objects are handed between cwl-runner instances
    (e.g. when submitting a job, or when using the RunInSingleContainer
    feature).  The field is removed in place, and only when the location
    starts with "keep:".

    """

    # Anything that is not a Keep reference is left untouched.
    if not obj.get("location", "").startswith("keep:"):
        return

    if "listing" in obj:
        del obj["listing"]

30
31
class ArvPathMapper(PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    # "keep:<32 hex digits>+<digits>/<non-empty path>"
    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
    # Same prefix, but the trailing "/<path>" part is optional.
    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, **kwargs):
        """Store the mapping configuration and initialise the base mapper.

        arvrunner -- runner object; this class uses its api, keep_client,
            num_retries, project_uuid, fs_access, get_uploaded() and
            add_uploaded().
        referenced_files -- list of File/Directory objects (dicts with at
            least "location") to be mapped.
        input_basedir -- base directory used to resolve "file:" locations.
        collection_pattern -- %-format string taking a single value (the
            Keep reference with the leading "keep:" removed).
        file_pattern -- %-format string taking a (portable data hash, path)
            pair.
        name -- passed through as ``name`` when uploading files.
        **kwargs -- accepted and ignored.
        """
        self.arvrunner = arvrunner
        self.input_basedir = input_basedir
        self.collection_pattern = collection_pattern
        self.file_pattern = file_pattern
        self.name = name
        self.referenced_files = [r["location"] for r in referenced_files]
        # NOTE(review): the base constructor is presumably what invokes
        # setup(); the attributes above must therefore be set first.
        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)

    def visit(self, srcobj, uploadfiles):
        """Record how to map one File/Directory object, then recurse.

        Any "#fragment" suffix is removed from the location first.  A Keep
        reference matching ``pdh_dirpath`` is mapped directly through
        ``collection_pattern``.  For a location that is still unmapped:

        * "file:" locations are examined with ``statfile``; entries that
          need uploading are added to ``uploadfiles`` as
          ``(src, absolute path, stat result)`` tuples, while ``ArvFile``
          results are mapped immediately.
        * "_:" literals are only validated here (a File needs ``contents``,
          a Directory needs ``listing``); setup() maps them afterwards.
        * any other location is mapped to itself.

        ``secondaryFiles`` and ``listing`` entries are visited recursively.

        Raises WorkflowException for an invalid local path or an incomplete
        literal.
        """
        src = srcobj["location"]
        if "#" in src:
            src = src[:src.index("#")]

        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)

        if src not in self._pathmap:
            if src.startswith("file:"):
                # Local FS ref, may need to be uploaded or may be on keep
                # mount.
                ab = abspath(src, self.input_basedir)
                st = arvados.commands.run.statfile("", ab,
                                                   fnPattern="keep:%s/%s",
                                                   dirPattern="keep:%s/%s")
                with SourceLine(srcobj, "location", WorkflowException):
                    if isinstance(st, arvados.commands.run.UploadFile):
                        uploadfiles.add((src, ab, st))
                    elif isinstance(st, arvados.commands.run.ArvFile):
                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                    else:
                        raise WorkflowException("Input file path '%s' is invalid" % st)
            elif src.startswith("_:"):
                if srcobj["class"] == "File" and "contents" not in srcobj:
                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
                if srcobj["class"] == "Directory" and "listing" not in srcobj:
                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
            else:
                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)

        with SourceLine(srcobj, "secondaryFiles", WorkflowException):
            for l in srcobj.get("secondaryFiles", []):
                self.visit(l, uploadfiles)
        with SourceLine(srcobj, "listing", WorkflowException):
            for l in srcobj.get("listing", []):
                self.visit(l, uploadfiles)

    def addentry(self, obj, c, path, subdirs):
        """Add ``obj`` to collection ``c`` underneath ``path``.

        * If the object's location is already mapped, it is copied from the
          collection returned by ``fs_access.get_collection`` to
          ``path/basename`` (overwriting), and its secondaryFiles are added
          next to it.
        * An unmapped Directory has its listing added recursively under
          ``path/basename``, and ``(location, path/basename)`` is appended
          to ``subdirs`` so the caller can map it once the collection's
          hash is known.
        * A "_:" literal with ``contents`` is written out UTF-8 encoded.

        Raises WorkflowException (via SourceLine) for anything else.
        """
        if obj["location"] in self._pathmap:
            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
            if srcpath == "":
                # An empty path is replaced with "." before copying.
                srcpath = "."
            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
            for l in obj.get("secondaryFiles", []):
                self.addentry(l, c, path, subdirs)
        elif obj["class"] == "Directory":
            for l in obj.get("listing", []):
                self.addentry(l, c, path + "/" + obj["basename"], subdirs)
            subdirs.append((obj["location"], path + "/" + obj["basename"]))
        elif obj["location"].startswith("_:") and "contents" in obj:
            with c.open(path + "/" + obj["basename"], "w") as f:
                f.write(obj["contents"].encode("utf-8"))
        else:
            raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Populate ``self._pathmap`` for every referenced file.

        Steps, in order:

        1. Reuse mappings the runner has already recorded as uploaded.
        2. visit() every object, collecting local files that need upload.
        3. Upload those files and record them with the runner.
        4. Build collections for unmapped Directory objects and for File
           objects that are literals or carry secondaryFiles, saving a new
           collection only when none with the same portable data hash is
           found.
        5. Map any literal subdirectories gathered while building.

        ``basedir`` is not used by this implementation.
        """
        uploadfiles = set()

        already_uploaded = self.arvrunner.get_uploaded()
        for k in referenced_files:
            loc = k["location"]
            if loc in already_uploaded:
                v = already_uploaded[loc]
                # Keep the type recorded at upload time ("File" or
                # "Directory") rather than assuming "File"; otherwise a
                # previously uploaded directory is mis-typed on reuse.
                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True)

        for srcobj in referenced_files:
            self.visit(srcobj, uploadfiles)

        if uploadfiles:
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             self.arvrunner.api,
                                             dry_run=False,
                                             num_retries=self.arvrunner.num_retries,
                                             fnPattern="keep:%s/%s",
                                             name=self.name,
                                             project=self.arvrunner.project_uuid)

        for src, ab, st in uploadfiles:
            # The resolved location is URL-quoted; the target uses the raw
            # name with the "keep:" prefix stripped.
            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
                                           "Directory" if os.path.isdir(ab) else "File", True)
            # Remember the upload so later mappers can reuse it (step 1).
            self.arvrunner.add_uploaded(src, self._pathmap[src])

        for srcobj in referenced_files:
            subdirs = []
            if srcobj["class"] == "Directory":
                if srcobj["location"] not in self._pathmap:
                    c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                      keep_client=self.arvrunner.keep_client,
                                                      num_retries=self.arvrunner.num_retries)
                    for l in srcobj.get("listing", []):
                        self.addentry(l, c, ".", subdirs)

                    # Only save when no collection with this content exists.
                    check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
                    if not check["items"]:
                        c.save_new(owner_uuid=self.arvrunner.project_uuid)

                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                (srcobj["location"].startswith("_:") and "contents" in srcobj)):

                c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                  keep_client=self.arvrunner.keep_client,
                                                  num_retries=self.arvrunner.num_retries)
                self.addentry(srcobj, c, ".", subdirs)

                # Only save when no collection with this content exists.
                check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
                if not check["items"]:
                    c.save_new(owner_uuid=self.arvrunner.project_uuid)

                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
                                                              ab, "File", True)
                if srcobj.get("secondaryFiles"):
                    # Also register the whole collection as a Directory
                    # under a freshly generated "_:" key.
                    ab = self.collection_pattern % c.portable_data_hash()
                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)

            if subdirs:
                # subdirs is only filled by addentry(), so ``c`` is the
                # collection built for this srcobj.
                for loc, sub in subdirs:
                    # subdirs will all start with "./", strip it off
                    ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
                                                   ab, "Directory", True)

        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to a ``(target, Keep reference)`` pair.

        "keep:" targets map to themselves.  Targets underneath
        ``self.keepdir`` (when it is set) have that prefix and the following
        separator replaced by "keep:".  Everything else is delegated to the
        base class.
        """
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)
183
class StagingPathMapper(PathMapper):
    """Path mapper that assigns each object a target under a staging dir.

    ``_follow_dirs`` controls whether the listing of a non-literal
    Directory is visited as well; literal ("_:") directories are always
    followed.
    """

    _follow_dirs = True

    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
        # type: (Dict[unicode, Any], unicode, unicode, bool, bool) -> None
        """Map ``obj`` to ``stagedir/basename`` and recurse where needed.

        Directories are always (re)mapped; files already present in the
        path map are left alone.  File literals carrying ``contents`` are
        mapped as "CreateFile" with the contents as the resolved value;
        other files are mapped as "WritableFile" when ``copy`` is true and
        "File" otherwise, followed by their secondaryFiles.
        """
        location = obj["location"]
        target = os.path.join(stagedir, obj["basename"])
        kind = obj["class"]

        if kind == "Directory":
            self._pathmap[location] = MapperEnt(location, target, "Directory", staged)
            if location.startswith("_:") or self._follow_dirs:
                self.visitlisting(obj.get("listing", []), target, basedir)
            return

        # Objects that are neither Directory nor File are ignored.
        if kind != "File":
            return

        if location in self._pathmap:
            return

        if "contents" in obj and location.startswith("_:"):
            self._pathmap[location] = MapperEnt(obj["contents"], target, "CreateFile", staged)
            return

        file_kind = "WritableFile" if copy else "File"
        self._pathmap[location] = MapperEnt(location, target, file_kind, staged)
        # Secondary files are staged alongside the primary file.
        self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
206
207
class VwdPathMapper(StagingPathMapper):
    """Staging mapper that rewrites Keep references to "$(task.keep)/..."."""

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage all referenced files, then rewrite resolved Keep locations."""

        # Go through each file and set the target to its own directory along
        # with any secondary files.
        self.visitlisting(referenced_files, self.stagedir, basedir)

        for path, entry in self._pathmap.items():
            resolved, target, kind, staged = entry
            if kind in ("File", "Directory") and resolved.startswith("keep:"):
                # Replace the "keep:" prefix with the "$(task.keep)/" prefix.
                rewritten = "$(task.keep)/%s" % resolved[5:]
                self._pathmap[path] = MapperEnt(rewritten, target, kind, staged)
219
220
class NoFollowPathMapper(StagingPathMapper):
    """Staging mapper that does not descend into non-literal directories."""

    _follow_dirs = False

    def setup(self, referenced_files, basedir):
        # type: (List[Any], unicode) -> None
        """Stage every referenced file under ``self.stagedir``."""
        self.visitlisting(referenced_files, self.stagedir, basedir)