import fcntl
import hashlib
import os
import re
import subprocess
import errno
import sys

import arvados
from arvados import errors
from arvados.collection import CollectionReader

HEX_RE = re.compile(r'^[0-9a-fA-F]+$')

keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')

def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        # Capture stderr so a failure message has something to report.
        p = subprocess.Popen(['rm', '-rf', path], stderr=subprocess.PIPE)
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)

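# A minimal usage sketch (the no-argument form assumes a Crunch task
# context, so that arvados.current_task() resolves; the explicit-path
# form works anywhere):
#
#     clear_tmpdir()                   # wipe and recreate the task tmpdir
#     clear_tmpdir('/tmp/my-scratch')  # wipe and recreate an explicit path
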
def run_command(execargs, **kwargs):
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata

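# Example sketch: run a command and capture its stdout; stderr is
# inherited from this process unless overridden via kwargs:
#
#     stdout, stderr = run_command(['git', '--version'])
#     stdout, stderr = run_command(['sort'], stdin=open('input.txt'))
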
def git_checkout(url, version, path):
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path

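# Example sketch (assumes a Crunch job context, so the relative path is
# resolved under the job's tmpdir; the URL and ref are placeholders):
#
#     src = git_checkout('https://example.com/repo.git', 'master', 'src')
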
def tar_extractor(path, decompress_flag):
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)

def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            if re.search(r'\.(tbz|tar\.bz2)$', f.name()):
                p = tar_extractor(path, 'j')
            elif re.search(r'\.(tgz|tar\.gz)$', f.name()):
                p = tar_extractor(path, 'z')
            elif re.search(r'\.tar$', f.name()):
                p = tar_extractor(path, '')
            else:
                raise errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path

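# Example sketch (the locator is a placeholder; assumes a Crunch job
# context so a relative path lands under the job tmpdir):
#
#     src_dir = tarball_extract('<collection locator>', 'src')
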
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not re.search(r'\.zip$', f.name()):
                raise errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path

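# Usage mirrors tarball_extract, except every file in the collection must
# be a .zip (placeholder locator):
#
#     data_dir = zipball_extract('<collection locator>', 'data')
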
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    """
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        collection_hash = hashlib.md5(collection).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    # Already extracted on a previous run; skip it.
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))

    lockfile.close()
    return path

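# Example sketch (placeholder locator and filename; with decompress=True,
# asking for 'chr1.fa' is satisfied by a stored 'chr1.fa.gz'):
#
#     ref_dir = collection_extract('<collection locator>', 'refdata',
#                                  files=['chr1.fa'], decompress=True)
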
def mkdir_dash_p(path):
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
                pass
            else:
                raise

def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path

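# Example sketch: stream_extract() takes a StreamReader, e.g. one of the
# streams of a collection (placeholder locator and filename):
#
#     for s in CollectionReader('<collection locator>').all_streams():
#         stream_extract(s, 'out', files=['foo.txt'])
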
def listdir_recursive(dirname, base=None):
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path):
            allfiles += listdir_recursive(ent_path, ent_base)
        else:
            allfiles += [ent_base]
    return allfiles

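# Example: given a tree dir/{a.txt, sub/b.txt}, returns the sorted paths
# relative to dirname:
#
#     listdir_recursive('dir')  # ['a.txt', 'sub/b.txt']
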
def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(
                1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        good_len = True
    return bool(good_len and HEX_RE.match(s))

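# Example (hypothetical values):
#
#     is_hex('89abcdef')        # True
#     is_hex('89abcdef', 8)     # True: exactly 8 digits
#     is_hex('89abcdef', 1, 4)  # False: length 8 is outside [1, 4]
#     is_hex('xyz')             # False: not hexadecimal
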
def list_all(fn, num_retries=0, **kwargs):
    # Repeatedly call an API list method, advancing the offset until every
    # page has been fetched, and return the concatenated items.
    items = []
    offset = 0
    items_available = sys.maxint
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items += c['items']
        items_available = c['items_available']
        offset = c['offset'] + len(c['items'])
    return items
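
# Example sketch (assumes an initialized API client, e.g.
# api = arvados.api('v1'); the filter is a placeholder):
#
#     every_collection = list_all(api.collections().list, num_retries=3,
#                                 filters=[['name', 'like', 'ref%']])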