6194: Fix typo in invocation of writeto() and use memoryview to avoid copying slices.
[arvados.git] / sdk / python / arvados / util.py
1 import fcntl
2 import hashlib
3 import os
4 import re
5 import subprocess
6 import errno
7 import sys
8
9 import arvados
10 from arvados.collection import CollectionReader
11
12 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
13
14 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
15 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
16 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
17 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
18 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
19 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
20 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
21 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
22 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
23
24 def clear_tmpdir(path=None):
25     """
26     Ensure the given directory (or TASK_TMPDIR if none given)
27     exists and is empty.
28     """
29     if path is None:
30         path = arvados.current_task().tmpdir
31     if os.path.exists(path):
32         p = subprocess.Popen(['rm', '-rf', path])
33         stdout, stderr = p.communicate(None)
34         if p.returncode != 0:
35             raise Exception('rm -rf %s: %s' % (path, stderr))
36     os.mkdir(path)
37
38 def run_command(execargs, **kwargs):
39     kwargs.setdefault('stdin', subprocess.PIPE)
40     kwargs.setdefault('stdout', subprocess.PIPE)
41     kwargs.setdefault('stderr', sys.stderr)
42     kwargs.setdefault('close_fds', True)
43     kwargs.setdefault('shell', False)
44     p = subprocess.Popen(execargs, **kwargs)
45     stdoutdata, stderrdata = p.communicate(None)
46     if p.returncode != 0:
47         raise errors.CommandFailedError(
48             "run_command %s exit %d:\n%s" %
49             (execargs, p.returncode, stderrdata))
50     return stdoutdata, stderrdata
51
52 def git_checkout(url, version, path):
53     if not re.search('^/', path):
54         path = os.path.join(arvados.current_job().tmpdir, path)
55     if not os.path.exists(path):
56         run_command(["git", "clone", url, path],
57                     cwd=os.path.dirname(path))
58     run_command(["git", "checkout", version],
59                 cwd=path)
60     return path
61
62 def tar_extractor(path, decompress_flag):
63     return subprocess.Popen(["tar",
64                              "-C", path,
65                              ("-x%sf" % decompress_flag),
66                              "-"],
67                             stdout=None,
68                             stdin=subprocess.PIPE, stderr=sys.stderr,
69                             shell=False, close_fds=True)
70
71 def tarball_extract(tarball, path):
72     """Retrieve a tarball from Keep and extract it to a local
73     directory.  Return the absolute path where the tarball was
74     extracted. If the top level of the tarball contained just one
75     file or directory, return the absolute path of that single
76     item.
77
78     tarball -- collection locator
79     path -- where to extract the tarball: absolute, or relative to job tmp
80     """
81     if not re.search('^/', path):
82         path = os.path.join(arvados.current_job().tmpdir, path)
83     lockfile = open(path + '.lock', 'w')
84     fcntl.flock(lockfile, fcntl.LOCK_EX)
85     try:
86         os.stat(path)
87     except OSError:
88         os.mkdir(path)
89     already_have_it = False
90     try:
91         if os.readlink(os.path.join(path, '.locator')) == tarball:
92             already_have_it = True
93     except OSError:
94         pass
95     if not already_have_it:
96
97         # emulate "rm -f" (i.e., if the file does not exist, we win)
98         try:
99             os.unlink(os.path.join(path, '.locator'))
100         except OSError:
101             if os.path.exists(os.path.join(path, '.locator')):
102                 os.unlink(os.path.join(path, '.locator'))
103
104         for f in CollectionReader(tarball).all_files():
105             if re.search('\.(tbz|tar.bz2)$', f.name()):
106                 p = tar_extractor(path, 'j')
107             elif re.search('\.(tgz|tar.gz)$', f.name()):
108                 p = tar_extractor(path, 'z')
109             elif re.search('\.tar$', f.name()):
110                 p = tar_extractor(path, '')
111             else:
112                 raise errors.AssertionError(
113                     "tarball_extract cannot handle filename %s" % f.name())
114             while True:
115                 buf = f.read(2**20)
116                 if len(buf) == 0:
117                     break
118                 p.stdin.write(buf)
119             p.stdin.close()
120             p.wait()
121             if p.returncode != 0:
122                 lockfile.close()
123                 raise errors.CommandFailedError(
124                     "tar exited %d" % p.returncode)
125         os.symlink(tarball, os.path.join(path, '.locator'))
126     tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
127     lockfile.close()
128     if len(tld_extracts) == 1:
129         return os.path.join(path, tld_extracts[0])
130     return path
131
132 def zipball_extract(zipball, path):
133     """Retrieve a zip archive from Keep and extract it to a local
134     directory.  Return the absolute path where the archive was
135     extracted. If the top level of the archive contained just one
136     file or directory, return the absolute path of that single
137     item.
138
139     zipball -- collection locator
140     path -- where to extract the archive: absolute, or relative to job tmp
141     """
142     if not re.search('^/', path):
143         path = os.path.join(arvados.current_job().tmpdir, path)
144     lockfile = open(path + '.lock', 'w')
145     fcntl.flock(lockfile, fcntl.LOCK_EX)
146     try:
147         os.stat(path)
148     except OSError:
149         os.mkdir(path)
150     already_have_it = False
151     try:
152         if os.readlink(os.path.join(path, '.locator')) == zipball:
153             already_have_it = True
154     except OSError:
155         pass
156     if not already_have_it:
157
158         # emulate "rm -f" (i.e., if the file does not exist, we win)
159         try:
160             os.unlink(os.path.join(path, '.locator'))
161         except OSError:
162             if os.path.exists(os.path.join(path, '.locator')):
163                 os.unlink(os.path.join(path, '.locator'))
164
165         for f in CollectionReader(zipball).all_files():
166             if not re.search('\.zip$', f.name()):
167                 raise errors.NotImplementedError(
168                     "zipball_extract cannot handle filename %s" % f.name())
169             zip_filename = os.path.join(path, os.path.basename(f.name()))
170             zip_file = open(zip_filename, 'wb')
171             while True:
172                 buf = f.read(2**20)
173                 if len(buf) == 0:
174                     break
175                 zip_file.write(buf)
176             zip_file.close()
177
178             p = subprocess.Popen(["unzip",
179                                   "-q", "-o",
180                                   "-d", path,
181                                   zip_filename],
182                                  stdout=None,
183                                  stdin=None, stderr=sys.stderr,
184                                  shell=False, close_fds=True)
185             p.wait()
186             if p.returncode != 0:
187                 lockfile.close()
188                 raise errors.CommandFailedError(
189                     "unzip exited %d" % p.returncode)
190             os.unlink(zip_filename)
191         os.symlink(zipball, os.path.join(path, '.locator'))
192     tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
193     lockfile.close()
194     if len(tld_extracts) == 1:
195         return os.path.join(path, tld_extracts[0])
196     return path
197
198 def collection_extract(collection, path, files=[], decompress=True):
199     """Retrieve a collection from Keep and extract it to a local
200     directory.  Return the absolute path where the collection was
201     extracted.
202
203     collection -- collection locator
204     path -- where to extract: absolute, or relative to job tmp
205     """
206     matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
207     if matches:
208         collection_hash = matches.group(1)
209     else:
210         collection_hash = hashlib.md5(collection).hexdigest()
211     if not re.search('^/', path):
212         path = os.path.join(arvados.current_job().tmpdir, path)
213     lockfile = open(path + '.lock', 'w')
214     fcntl.flock(lockfile, fcntl.LOCK_EX)
215     try:
216         os.stat(path)
217     except OSError:
218         os.mkdir(path)
219     already_have_it = False
220     try:
221         if os.readlink(os.path.join(path, '.locator')) == collection_hash:
222             already_have_it = True
223     except OSError:
224         pass
225
226     # emulate "rm -f" (i.e., if the file does not exist, we win)
227     try:
228         os.unlink(os.path.join(path, '.locator'))
229     except OSError:
230         if os.path.exists(os.path.join(path, '.locator')):
231             os.unlink(os.path.join(path, '.locator'))
232
233     files_got = []
234     for s in CollectionReader(collection).all_streams():
235         stream_name = s.name()
236         for f in s.all_files():
237             if (files == [] or
238                 ((f.name() not in files_got) and
239                  (f.name() in files or
240                   (decompress and f.decompressed_name() in files)))):
241                 outname = f.decompressed_name() if decompress else f.name()
242                 files_got += [outname]
243                 if os.path.exists(os.path.join(path, stream_name, outname)):
244                     continue
245                 mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
246                 outfile = open(os.path.join(path, stream_name, outname), 'wb')
247                 for buf in (f.readall_decompressed() if decompress
248                             else f.readall()):
249                     outfile.write(buf)
250                 outfile.close()
251     if len(files_got) < len(files):
252         raise errors.AssertionError(
253             "Wanted files %s but only got %s from %s" %
254             (files, files_got,
255              [z.name() for z in CollectionReader(collection).all_files()]))
256     os.symlink(collection_hash, os.path.join(path, '.locator'))
257
258     lockfile.close()
259     return path
260
261 def mkdir_dash_p(path):
262     if not os.path.isdir(path):
263         try:
264             os.makedirs(path)
265         except OSError as e:
266             if e.errno == errno.EEXIST and os.path.isdir(path):
267                 # It is not an error if someone else creates the
268                 # directory between our exists() and makedirs() calls.
269                 pass
270             else:
271                 raise
272
273 def stream_extract(stream, path, files=[], decompress=True):
274     """Retrieve a stream from Keep and extract it to a local
275     directory.  Return the absolute path where the stream was
276     extracted.
277
278     stream -- StreamReader object
279     path -- where to extract: absolute, or relative to job tmp
280     """
281     if not re.search('^/', path):
282         path = os.path.join(arvados.current_job().tmpdir, path)
283     lockfile = open(path + '.lock', 'w')
284     fcntl.flock(lockfile, fcntl.LOCK_EX)
285     try:
286         os.stat(path)
287     except OSError:
288         os.mkdir(path)
289
290     files_got = []
291     for f in stream.all_files():
292         if (files == [] or
293             ((f.name() not in files_got) and
294              (f.name() in files or
295               (decompress and f.decompressed_name() in files)))):
296             outname = f.decompressed_name() if decompress else f.name()
297             files_got += [outname]
298             if os.path.exists(os.path.join(path, outname)):
299                 os.unlink(os.path.join(path, outname))
300             mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
301             outfile = open(os.path.join(path, outname), 'wb')
302             for buf in (f.readall_decompressed() if decompress
303                         else f.readall()):
304                 outfile.write(buf)
305             outfile.close()
306     if len(files_got) < len(files):
307         raise errors.AssertionError(
308             "Wanted files %s but only got %s from %s" %
309             (files, files_got, [z.name() for z in stream.all_files()]))
310     lockfile.close()
311     return path
312
313 def listdir_recursive(dirname, base=None, max_depth=None):
314     """listdir_recursive(dirname, base, max_depth)
315
316     Return a list of file and directory names found under dirname.
317
318     If base is not None, prepend "{base}/" to each returned name.
319
320     If max_depth is None, descend into directories and return only the
321     names of files found in the directory tree.
322
323     If max_depth is a non-negative integer, stop descending into
324     directories at the given depth, and at that point return directory
325     names instead.
326
327     If max_depth==0 (and base is None) this is equivalent to
328     sorted(os.listdir(dirname)).
329     """
330     allfiles = []
331     for ent in sorted(os.listdir(dirname)):
332         ent_path = os.path.join(dirname, ent)
333         ent_base = os.path.join(base, ent) if base else ent
334         if os.path.isdir(ent_path) and max_depth != 0:
335             allfiles += listdir_recursive(
336                 ent_path, base=ent_base,
337                 max_depth=(max_depth-1 if max_depth else None))
338         else:
339             allfiles += [ent_base]
340     return allfiles
341
342 def is_hex(s, *length_args):
343     """is_hex(s[, length[, max_length]]) -> boolean
344
345     Return True if s is a string of hexadecimal digits.
346     If one length argument is given, the string must contain exactly
347     that number of digits.
348     If two length arguments are given, the string must contain a number of
349     digits between those two lengths, inclusive.
350     Return False otherwise.
351     """
352     num_length_args = len(length_args)
353     if num_length_args > 2:
354         raise errors.ArgumentError("is_hex accepts up to 3 arguments ({} given)"
355                                    .format(1 + num_length_args))
356     elif num_length_args == 2:
357         good_len = (length_args[0] <= len(s) <= length_args[1])
358     elif num_length_args == 1:
359         good_len = (len(s) == length_args[0])
360     else:
361         good_len = True
362     return bool(good_len and HEX_RE.match(s))
363
364 def list_all(fn, num_retries=0, **kwargs):
365     items = []
366     offset = 0
367     items_available = sys.maxint
368     while len(items) < items_available:
369         c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
370         items += c['items']
371         items_available = c['items_available']
372         offset = c['offset'] + len(c['items'])
373     return items