Merge branch '8784-dir-listings'
[arvados.git] / sdk / python / arvados / util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import fcntl
6 import hashlib
7 import httplib2
8 import os
9 import re
10 import subprocess
11 import errno
12 import sys
13
14 import arvados
15 from arvados.collection import CollectionReader
16
17 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
18
19 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
20 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
21 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
22 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
23 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
24 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
25 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
26 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
27 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
28
29 def clear_tmpdir(path=None):
30     """
31     Ensure the given directory (or TASK_TMPDIR if none given)
32     exists and is empty.
33     """
34     if path is None:
35         path = arvados.current_task().tmpdir
36     if os.path.exists(path):
37         p = subprocess.Popen(['rm', '-rf', path])
38         stdout, stderr = p.communicate(None)
39         if p.returncode != 0:
40             raise Exception('rm -rf %s: %s' % (path, stderr))
41     os.mkdir(path)
42
43 def run_command(execargs, **kwargs):
44     kwargs.setdefault('stdin', subprocess.PIPE)
45     kwargs.setdefault('stdout', subprocess.PIPE)
46     kwargs.setdefault('stderr', sys.stderr)
47     kwargs.setdefault('close_fds', True)
48     kwargs.setdefault('shell', False)
49     p = subprocess.Popen(execargs, **kwargs)
50     stdoutdata, stderrdata = p.communicate(None)
51     if p.returncode != 0:
52         raise arvados.errors.CommandFailedError(
53             "run_command %s exit %d:\n%s" %
54             (execargs, p.returncode, stderrdata))
55     return stdoutdata, stderrdata
56
57 def git_checkout(url, version, path):
58     if not re.search('^/', path):
59         path = os.path.join(arvados.current_job().tmpdir, path)
60     if not os.path.exists(path):
61         run_command(["git", "clone", url, path],
62                     cwd=os.path.dirname(path))
63     run_command(["git", "checkout", version],
64                 cwd=path)
65     return path
66
67 def tar_extractor(path, decompress_flag):
68     return subprocess.Popen(["tar",
69                              "-C", path,
70                              ("-x%sf" % decompress_flag),
71                              "-"],
72                             stdout=None,
73                             stdin=subprocess.PIPE, stderr=sys.stderr,
74                             shell=False, close_fds=True)
75
76 def tarball_extract(tarball, path):
77     """Retrieve a tarball from Keep and extract it to a local
78     directory.  Return the absolute path where the tarball was
79     extracted. If the top level of the tarball contained just one
80     file or directory, return the absolute path of that single
81     item.
82
83     tarball -- collection locator
84     path -- where to extract the tarball: absolute, or relative to job tmp
85     """
86     if not re.search('^/', path):
87         path = os.path.join(arvados.current_job().tmpdir, path)
88     lockfile = open(path + '.lock', 'w')
89     fcntl.flock(lockfile, fcntl.LOCK_EX)
90     try:
91         os.stat(path)
92     except OSError:
93         os.mkdir(path)
94     already_have_it = False
95     try:
96         if os.readlink(os.path.join(path, '.locator')) == tarball:
97             already_have_it = True
98     except OSError:
99         pass
100     if not already_have_it:
101
102         # emulate "rm -f" (i.e., if the file does not exist, we win)
103         try:
104             os.unlink(os.path.join(path, '.locator'))
105         except OSError:
106             if os.path.exists(os.path.join(path, '.locator')):
107                 os.unlink(os.path.join(path, '.locator'))
108
109         for f in CollectionReader(tarball).all_files():
110             if re.search('\.(tbz|tar.bz2)$', f.name()):
111                 p = tar_extractor(path, 'j')
112             elif re.search('\.(tgz|tar.gz)$', f.name()):
113                 p = tar_extractor(path, 'z')
114             elif re.search('\.tar$', f.name()):
115                 p = tar_extractor(path, '')
116             else:
117                 raise arvados.errors.AssertionError(
118                     "tarball_extract cannot handle filename %s" % f.name())
119             while True:
120                 buf = f.read(2**20)
121                 if len(buf) == 0:
122                     break
123                 p.stdin.write(buf)
124             p.stdin.close()
125             p.wait()
126             if p.returncode != 0:
127                 lockfile.close()
128                 raise arvados.errors.CommandFailedError(
129                     "tar exited %d" % p.returncode)
130         os.symlink(tarball, os.path.join(path, '.locator'))
131     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
132     lockfile.close()
133     if len(tld_extracts) == 1:
134         return os.path.join(path, tld_extracts[0])
135     return path
136
137 def zipball_extract(zipball, path):
138     """Retrieve a zip archive from Keep and extract it to a local
139     directory.  Return the absolute path where the archive was
140     extracted. If the top level of the archive contained just one
141     file or directory, return the absolute path of that single
142     item.
143
144     zipball -- collection locator
145     path -- where to extract the archive: absolute, or relative to job tmp
146     """
147     if not re.search('^/', path):
148         path = os.path.join(arvados.current_job().tmpdir, path)
149     lockfile = open(path + '.lock', 'w')
150     fcntl.flock(lockfile, fcntl.LOCK_EX)
151     try:
152         os.stat(path)
153     except OSError:
154         os.mkdir(path)
155     already_have_it = False
156     try:
157         if os.readlink(os.path.join(path, '.locator')) == zipball:
158             already_have_it = True
159     except OSError:
160         pass
161     if not already_have_it:
162
163         # emulate "rm -f" (i.e., if the file does not exist, we win)
164         try:
165             os.unlink(os.path.join(path, '.locator'))
166         except OSError:
167             if os.path.exists(os.path.join(path, '.locator')):
168                 os.unlink(os.path.join(path, '.locator'))
169
170         for f in CollectionReader(zipball).all_files():
171             if not re.search('\.zip$', f.name()):
172                 raise arvados.errors.NotImplementedError(
173                     "zipball_extract cannot handle filename %s" % f.name())
174             zip_filename = os.path.join(path, os.path.basename(f.name()))
175             zip_file = open(zip_filename, 'wb')
176             while True:
177                 buf = f.read(2**20)
178                 if len(buf) == 0:
179                     break
180                 zip_file.write(buf)
181             zip_file.close()
182
183             p = subprocess.Popen(["unzip",
184                                   "-q", "-o",
185                                   "-d", path,
186                                   zip_filename],
187                                  stdout=None,
188                                  stdin=None, stderr=sys.stderr,
189                                  shell=False, close_fds=True)
190             p.wait()
191             if p.returncode != 0:
192                 lockfile.close()
193                 raise arvados.errors.CommandFailedError(
194                     "unzip exited %d" % p.returncode)
195             os.unlink(zip_filename)
196         os.symlink(zipball, os.path.join(path, '.locator'))
197     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
198     lockfile.close()
199     if len(tld_extracts) == 1:
200         return os.path.join(path, tld_extracts[0])
201     return path
202
203 def collection_extract(collection, path, files=[], decompress=True):
204     """Retrieve a collection from Keep and extract it to a local
205     directory.  Return the absolute path where the collection was
206     extracted.
207
208     collection -- collection locator
209     path -- where to extract: absolute, or relative to job tmp
210     """
211     matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
212     if matches:
213         collection_hash = matches.group(1)
214     else:
215         collection_hash = hashlib.md5(collection).hexdigest()
216     if not re.search('^/', path):
217         path = os.path.join(arvados.current_job().tmpdir, path)
218     lockfile = open(path + '.lock', 'w')
219     fcntl.flock(lockfile, fcntl.LOCK_EX)
220     try:
221         os.stat(path)
222     except OSError:
223         os.mkdir(path)
224     already_have_it = False
225     try:
226         if os.readlink(os.path.join(path, '.locator')) == collection_hash:
227             already_have_it = True
228     except OSError:
229         pass
230
231     # emulate "rm -f" (i.e., if the file does not exist, we win)
232     try:
233         os.unlink(os.path.join(path, '.locator'))
234     except OSError:
235         if os.path.exists(os.path.join(path, '.locator')):
236             os.unlink(os.path.join(path, '.locator'))
237
238     files_got = []
239     for s in CollectionReader(collection).all_streams():
240         stream_name = s.name()
241         for f in s.all_files():
242             if (files == [] or
243                 ((f.name() not in files_got) and
244                  (f.name() in files or
245                   (decompress and f.decompressed_name() in files)))):
246                 outname = f.decompressed_name() if decompress else f.name()
247                 files_got += [outname]
248                 if os.path.exists(os.path.join(path, stream_name, outname)):
249                     continue
250                 mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
251                 outfile = open(os.path.join(path, stream_name, outname), 'wb')
252                 for buf in (f.readall_decompressed() if decompress
253                             else f.readall()):
254                     outfile.write(buf)
255                 outfile.close()
256     if len(files_got) < len(files):
257         raise arvados.errors.AssertionError(
258             "Wanted files %s but only got %s from %s" %
259             (files, files_got,
260              [z.name() for z in CollectionReader(collection).all_files()]))
261     os.symlink(collection_hash, os.path.join(path, '.locator'))
262
263     lockfile.close()
264     return path
265
266 def mkdir_dash_p(path):
267     if not os.path.isdir(path):
268         try:
269             os.makedirs(path)
270         except OSError as e:
271             if e.errno == errno.EEXIST and os.path.isdir(path):
272                 # It is not an error if someone else creates the
273                 # directory between our exists() and makedirs() calls.
274                 pass
275             else:
276                 raise
277
278 def stream_extract(stream, path, files=[], decompress=True):
279     """Retrieve a stream from Keep and extract it to a local
280     directory.  Return the absolute path where the stream was
281     extracted.
282
283     stream -- StreamReader object
284     path -- where to extract: absolute, or relative to job tmp
285     """
286     if not re.search('^/', path):
287         path = os.path.join(arvados.current_job().tmpdir, path)
288     lockfile = open(path + '.lock', 'w')
289     fcntl.flock(lockfile, fcntl.LOCK_EX)
290     try:
291         os.stat(path)
292     except OSError:
293         os.mkdir(path)
294
295     files_got = []
296     for f in stream.all_files():
297         if (files == [] or
298             ((f.name() not in files_got) and
299              (f.name() in files or
300               (decompress and f.decompressed_name() in files)))):
301             outname = f.decompressed_name() if decompress else f.name()
302             files_got += [outname]
303             if os.path.exists(os.path.join(path, outname)):
304                 os.unlink(os.path.join(path, outname))
305             mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
306             outfile = open(os.path.join(path, outname), 'wb')
307             for buf in (f.readall_decompressed() if decompress
308                         else f.readall()):
309                 outfile.write(buf)
310             outfile.close()
311     if len(files_got) < len(files):
312         raise arvados.errors.AssertionError(
313             "Wanted files %s but only got %s from %s" %
314             (files, files_got, [z.name() for z in stream.all_files()]))
315     lockfile.close()
316     return path
317
318 def listdir_recursive(dirname, base=None, max_depth=None):
319     """listdir_recursive(dirname, base, max_depth)
320
321     Return a list of file and directory names found under dirname.
322
323     If base is not None, prepend "{base}/" to each returned name.
324
325     If max_depth is None, descend into directories and return only the
326     names of files found in the directory tree.
327
328     If max_depth is a non-negative integer, stop descending into
329     directories at the given depth, and at that point return directory
330     names instead.
331
332     If max_depth==0 (and base is None) this is equivalent to
333     sorted(os.listdir(dirname)).
334     """
335     allfiles = []
336     for ent in sorted(os.listdir(dirname)):
337         ent_path = os.path.join(dirname, ent)
338         ent_base = os.path.join(base, ent) if base else ent
339         if os.path.isdir(ent_path) and max_depth != 0:
340             allfiles += listdir_recursive(
341                 ent_path, base=ent_base,
342                 max_depth=(max_depth-1 if max_depth else None))
343         else:
344             allfiles += [ent_base]
345     return allfiles
346
347 def is_hex(s, *length_args):
348     """is_hex(s[, length[, max_length]]) -> boolean
349
350     Return True if s is a string of hexadecimal digits.
351     If one length argument is given, the string must contain exactly
352     that number of digits.
353     If two length arguments are given, the string must contain a number of
354     digits between those two lengths, inclusive.
355     Return False otherwise.
356     """
357     num_length_args = len(length_args)
358     if num_length_args > 2:
359         raise arvados.errors.ArgumentError(
360             "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
361     elif num_length_args == 2:
362         good_len = (length_args[0] <= len(s) <= length_args[1])
363     elif num_length_args == 1:
364         good_len = (len(s) == length_args[0])
365     else:
366         good_len = True
367     return bool(good_len and HEX_RE.match(s))
368
369 def list_all(fn, num_retries=0, **kwargs):
370     # Default limit to (effectively) api server's MAX_LIMIT
371     kwargs.setdefault('limit', sys.maxsize)
372     items = []
373     offset = 0
374     items_available = sys.maxsize
375     while len(items) < items_available:
376         c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
377         items += c['items']
378         items_available = c['items_available']
379         offset = c['offset'] + len(c['items'])
380     return items
381
382 def ca_certs_path(fallback=httplib2.CA_CERTS):
383     """Return the path of the best available CA certs source.
384
385     This function searches for various distribution sources of CA
386     certificates, and returns the first it finds.  If it doesn't find any,
387     it returns the value of `fallback` (httplib2's CA certs by default).
388     """
389     for ca_certs_path in [
390         # Arvados specific:
391         '/etc/arvados/ca-certificates.crt',
392         # Debian:
393         '/etc/ssl/certs/ca-certificates.crt',
394         # Red Hat:
395         '/etc/pki/tls/certs/ca-bundle.crt',
396         ]:
397         if os.path.exists(ca_certs_path):
398             return ca_certs_path
399     return fallback