# [arvados.git] sdk/python/arvados/util.py
import errno
import fcntl
import hashlib
import os
import re
import subprocess
import sys

import arvados
from arvados import errors
from arvados.collection import CollectionReader

HEX_RE = re.compile(r'^[0-9a-fA-F]+$')

portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')

def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        p = subprocess.Popen(['rm', '-rf', path], stderr=subprocess.PIPE)
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)

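# Illustrative usage (a sketch, not part of the module): with no argument,
# clear_tmpdir needs a Crunch task context so arvados.current_task() works;
# with an explicit path it can run anywhere:
#
#   clear_tmpdir('/tmp/scratch')   # /tmp/scratch now exists and is empty
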
def run_command(execargs, **kwargs):
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata

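# Illustrative usage (a sketch): run_command returns captured stdout and
# raises errors.CommandFailedError on a nonzero exit. stderr goes to the
# parent's stderr by default, so stderrdata is None unless overridden:
#
#   stdoutdata, stderrdata = run_command(['echo', 'hello'])
#   # stdoutdata == 'hello\n'; stderrdata is None
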
def git_checkout(url, version, path):
    if not os.path.isabs(path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path

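# Illustrative usage (a sketch; the URL and branch are hypothetical, and a
# job context is assumed so a relative path lands in the job tmpdir):
#
#   src = git_checkout('https://example.com/repo.git', 'master', 'src')
#   # first call clones into <job tmpdir>/src; every call checks out 'master'
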
def tar_extractor(path, decompress_flag):
    # Return a tar subprocess that extracts its stdin into path;
    # decompress_flag is '' (plain tar), 'j' (bzip2) or 'z' (gzip).
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)

def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    if not os.path.isabs(path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            if re.search(r'\.(tbz|tar\.bz2)$', f.name()):
                p = tar_extractor(path, 'j')
            elif re.search(r'\.(tgz|tar\.gz)$', f.name()):
                p = tar_extractor(path, 'z')
            elif re.search(r'\.tar$', f.name()):
                p = tar_extractor(path, '')
            else:
                raise errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path

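# Illustrative usage (a sketch; the locator below is a made-up placeholder,
# and a job context is assumed):
#
#   deps = tarball_extract('d41d8cd98f00b204e9800998ecf8427e+0', 'deps')
#   # returns <job tmpdir>/deps, or the single top-level item it contains;
#   # the .locator symlink makes repeat calls with the same tarball no-ops
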
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    if not os.path.isabs(path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not re.search(r'\.zip$', f.name()):
                raise errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path

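# Illustrative usage (a sketch, analogous to tarball_extract; placeholder
# locator):
#
#   tools = zipball_extract('d41d8cd98f00b204e9800998ecf8427e+0', 'tools')
#   # each *.zip in the collection is copied locally, unzipped into
#   # <job tmpdir>/tools, then deleted
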
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    """
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        collection_hash = hashlib.md5(collection).hexdigest()
    if not os.path.isabs(path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))

    lockfile.close()
    return path

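# Illustrative usage (a sketch; placeholder locator). Only the requested
# files are extracted, and with decompress=True a request for 'reads.fastq'
# can be satisfied by e.g. 'reads.fastq.gz' stored in the collection:
#
#   indir = collection_extract('d41d8cd98f00b204e9800998ecf8427e+0',
#                              'input', files=['reads.fastq'])
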
def mkdir_dash_p(path):
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
                pass
            else:
                raise

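# Example: like `mkdir -p`, mkdir_dash_p creates intermediate directories
# and tolerates the race where another process creates the directory first:
#
#   mkdir_dash_p('/tmp/a/b/c')
#   mkdir_dash_p('/tmp/a/b/c')   # second call is a harmless no-op
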
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    """
    if not os.path.isabs(path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path

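# Illustrative usage (a sketch; placeholder locator). Unlike the functions
# above, stream_extract takes an already-open StreamReader, not a locator:
#
#   reader = CollectionReader('d41d8cd98f00b204e9800998ecf8427e+0')
#   for s in reader.all_streams():
#       stream_extract(s, 'unpacked', files=['sample.txt'])
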
def listdir_recursive(dirname, base=None):
    # Return a sorted list of all regular files under dirname, as paths
    # relative to dirname; base is the relative prefix carried through
    # the recursion.
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path):
            allfiles += listdir_recursive(ent_path, ent_base)
        else:
            allfiles += [ent_base]
    return allfiles

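# Example: for a tree /tmp/x containing a/1.txt and b/2.txt,
# listdir_recursive('/tmp/x') returns ['a/1.txt', 'b/2.txt']: relative
# paths, sorted, regular files only (directories are descended into,
# not listed).
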
def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(
                1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        good_len = True
    return bool(good_len and HEX_RE.match(s))

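# Examples:
#   is_hex('89abcdef')         # True
#   is_hex('89abcdef', 8)      # True: exactly 8 digits
#   is_hex('89abcdef', 4)      # False: not exactly 4 digits
#   is_hex('89abcdef', 4, 16)  # True: length within [4, 16]
#   is_hex('xyz')              # False: not hexadecimal
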
def list_all(fn, num_retries=0, **kwargs):
    # Page through an API list method until all available items have
    # been retrieved.
    items = []
    offset = 0
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items += c['items']
        items_available = c['items_available']
        offset = c['offset'] + len(c['items'])
    return items
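
# Illustrative usage (a sketch, assuming an API client is available):
# list_all pages through a list endpoint, advancing 'offset' and tracking
# 'items_available' from each response, until everything is fetched:
#
#   api = arvados.api('v1')
#   every_collection = list_all(api.collections().list, num_retries=3)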