Added tests and documentation for save and save_new methods in
[arvados.git] / sdk / python / arvados / util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import range
7
8 import fcntl
9 import hashlib
10 import httplib2
11 import os
12 import random
13 import re
14 import subprocess
15 import errno
16 import sys
17
18 import arvados
19 from arvados.collection import CollectionReader
20
21 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
22
23 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
24 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
25 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
26 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
27 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
28 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
29 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
30 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
31 job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
32 container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
33 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
34
35 def clear_tmpdir(path=None):
36     """
37     Ensure the given directory (or TASK_TMPDIR if none given)
38     exists and is empty.
39     """
40     if path is None:
41         path = arvados.current_task().tmpdir
42     if os.path.exists(path):
43         p = subprocess.Popen(['rm', '-rf', path])
44         stdout, stderr = p.communicate(None)
45         if p.returncode != 0:
46             raise Exception('rm -rf %s: %s' % (path, stderr))
47     os.mkdir(path)
48
49 def run_command(execargs, **kwargs):
50     kwargs.setdefault('stdin', subprocess.PIPE)
51     kwargs.setdefault('stdout', subprocess.PIPE)
52     kwargs.setdefault('stderr', sys.stderr)
53     kwargs.setdefault('close_fds', True)
54     kwargs.setdefault('shell', False)
55     p = subprocess.Popen(execargs, **kwargs)
56     stdoutdata, stderrdata = p.communicate(None)
57     if p.returncode != 0:
58         raise arvados.errors.CommandFailedError(
59             "run_command %s exit %d:\n%s" %
60             (execargs, p.returncode, stderrdata))
61     return stdoutdata, stderrdata
62
63 def git_checkout(url, version, path):
64     if not re.search('^/', path):
65         path = os.path.join(arvados.current_job().tmpdir, path)
66     if not os.path.exists(path):
67         run_command(["git", "clone", url, path],
68                     cwd=os.path.dirname(path))
69     run_command(["git", "checkout", version],
70                 cwd=path)
71     return path
72
73 def tar_extractor(path, decompress_flag):
74     return subprocess.Popen(["tar",
75                              "-C", path,
76                              ("-x%sf" % decompress_flag),
77                              "-"],
78                             stdout=None,
79                             stdin=subprocess.PIPE, stderr=sys.stderr,
80                             shell=False, close_fds=True)
81
82 def tarball_extract(tarball, path):
83     """Retrieve a tarball from Keep and extract it to a local
84     directory.  Return the absolute path where the tarball was
85     extracted. If the top level of the tarball contained just one
86     file or directory, return the absolute path of that single
87     item.
88
89     tarball -- collection locator
90     path -- where to extract the tarball: absolute, or relative to job tmp
91     """
92     if not re.search('^/', path):
93         path = os.path.join(arvados.current_job().tmpdir, path)
94     lockfile = open(path + '.lock', 'w')
95     fcntl.flock(lockfile, fcntl.LOCK_EX)
96     try:
97         os.stat(path)
98     except OSError:
99         os.mkdir(path)
100     already_have_it = False
101     try:
102         if os.readlink(os.path.join(path, '.locator')) == tarball:
103             already_have_it = True
104     except OSError:
105         pass
106     if not already_have_it:
107
108         # emulate "rm -f" (i.e., if the file does not exist, we win)
109         try:
110             os.unlink(os.path.join(path, '.locator'))
111         except OSError:
112             if os.path.exists(os.path.join(path, '.locator')):
113                 os.unlink(os.path.join(path, '.locator'))
114
115         for f in CollectionReader(tarball).all_files():
116             if re.search('\.(tbz|tar.bz2)$', f.name()):
117                 p = tar_extractor(path, 'j')
118             elif re.search('\.(tgz|tar.gz)$', f.name()):
119                 p = tar_extractor(path, 'z')
120             elif re.search('\.tar$', f.name()):
121                 p = tar_extractor(path, '')
122             else:
123                 raise arvados.errors.AssertionError(
124                     "tarball_extract cannot handle filename %s" % f.name())
125             while True:
126                 buf = f.read(2**20)
127                 if len(buf) == 0:
128                     break
129                 p.stdin.write(buf)
130             p.stdin.close()
131             p.wait()
132             if p.returncode != 0:
133                 lockfile.close()
134                 raise arvados.errors.CommandFailedError(
135                     "tar exited %d" % p.returncode)
136         os.symlink(tarball, os.path.join(path, '.locator'))
137     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
138     lockfile.close()
139     if len(tld_extracts) == 1:
140         return os.path.join(path, tld_extracts[0])
141     return path
142
143 def zipball_extract(zipball, path):
144     """Retrieve a zip archive from Keep and extract it to a local
145     directory.  Return the absolute path where the archive was
146     extracted. If the top level of the archive contained just one
147     file or directory, return the absolute path of that single
148     item.
149
150     zipball -- collection locator
151     path -- where to extract the archive: absolute, or relative to job tmp
152     """
153     if not re.search('^/', path):
154         path = os.path.join(arvados.current_job().tmpdir, path)
155     lockfile = open(path + '.lock', 'w')
156     fcntl.flock(lockfile, fcntl.LOCK_EX)
157     try:
158         os.stat(path)
159     except OSError:
160         os.mkdir(path)
161     already_have_it = False
162     try:
163         if os.readlink(os.path.join(path, '.locator')) == zipball:
164             already_have_it = True
165     except OSError:
166         pass
167     if not already_have_it:
168
169         # emulate "rm -f" (i.e., if the file does not exist, we win)
170         try:
171             os.unlink(os.path.join(path, '.locator'))
172         except OSError:
173             if os.path.exists(os.path.join(path, '.locator')):
174                 os.unlink(os.path.join(path, '.locator'))
175
176         for f in CollectionReader(zipball).all_files():
177             if not re.search('\.zip$', f.name()):
178                 raise arvados.errors.NotImplementedError(
179                     "zipball_extract cannot handle filename %s" % f.name())
180             zip_filename = os.path.join(path, os.path.basename(f.name()))
181             zip_file = open(zip_filename, 'wb')
182             while True:
183                 buf = f.read(2**20)
184                 if len(buf) == 0:
185                     break
186                 zip_file.write(buf)
187             zip_file.close()
188
189             p = subprocess.Popen(["unzip",
190                                   "-q", "-o",
191                                   "-d", path,
192                                   zip_filename],
193                                  stdout=None,
194                                  stdin=None, stderr=sys.stderr,
195                                  shell=False, close_fds=True)
196             p.wait()
197             if p.returncode != 0:
198                 lockfile.close()
199                 raise arvados.errors.CommandFailedError(
200                     "unzip exited %d" % p.returncode)
201             os.unlink(zip_filename)
202         os.symlink(zipball, os.path.join(path, '.locator'))
203     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
204     lockfile.close()
205     if len(tld_extracts) == 1:
206         return os.path.join(path, tld_extracts[0])
207     return path
208
209 def collection_extract(collection, path, files=[], decompress=True):
210     """Retrieve a collection from Keep and extract it to a local
211     directory.  Return the absolute path where the collection was
212     extracted.
213
214     collection -- collection locator
215     path -- where to extract: absolute, or relative to job tmp
216     """
217     matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
218     if matches:
219         collection_hash = matches.group(1)
220     else:
221         collection_hash = hashlib.md5(collection).hexdigest()
222     if not re.search('^/', path):
223         path = os.path.join(arvados.current_job().tmpdir, path)
224     lockfile = open(path + '.lock', 'w')
225     fcntl.flock(lockfile, fcntl.LOCK_EX)
226     try:
227         os.stat(path)
228     except OSError:
229         os.mkdir(path)
230     already_have_it = False
231     try:
232         if os.readlink(os.path.join(path, '.locator')) == collection_hash:
233             already_have_it = True
234     except OSError:
235         pass
236
237     # emulate "rm -f" (i.e., if the file does not exist, we win)
238     try:
239         os.unlink(os.path.join(path, '.locator'))
240     except OSError:
241         if os.path.exists(os.path.join(path, '.locator')):
242             os.unlink(os.path.join(path, '.locator'))
243
244     files_got = []
245     for s in CollectionReader(collection).all_streams():
246         stream_name = s.name()
247         for f in s.all_files():
248             if (files == [] or
249                 ((f.name() not in files_got) and
250                  (f.name() in files or
251                   (decompress and f.decompressed_name() in files)))):
252                 outname = f.decompressed_name() if decompress else f.name()
253                 files_got += [outname]
254                 if os.path.exists(os.path.join(path, stream_name, outname)):
255                     continue
256                 mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
257                 outfile = open(os.path.join(path, stream_name, outname), 'wb')
258                 for buf in (f.readall_decompressed() if decompress
259                             else f.readall()):
260                     outfile.write(buf)
261                 outfile.close()
262     if len(files_got) < len(files):
263         raise arvados.errors.AssertionError(
264             "Wanted files %s but only got %s from %s" %
265             (files, files_got,
266              [z.name() for z in CollectionReader(collection).all_files()]))
267     os.symlink(collection_hash, os.path.join(path, '.locator'))
268
269     lockfile.close()
270     return path
271
272 def mkdir_dash_p(path):
273     if not os.path.isdir(path):
274         try:
275             os.makedirs(path)
276         except OSError as e:
277             if e.errno == errno.EEXIST and os.path.isdir(path):
278                 # It is not an error if someone else creates the
279                 # directory between our exists() and makedirs() calls.
280                 pass
281             else:
282                 raise
283
284 def stream_extract(stream, path, files=[], decompress=True):
285     """Retrieve a stream from Keep and extract it to a local
286     directory.  Return the absolute path where the stream was
287     extracted.
288
289     stream -- StreamReader object
290     path -- where to extract: absolute, or relative to job tmp
291     """
292     if not re.search('^/', path):
293         path = os.path.join(arvados.current_job().tmpdir, path)
294     lockfile = open(path + '.lock', 'w')
295     fcntl.flock(lockfile, fcntl.LOCK_EX)
296     try:
297         os.stat(path)
298     except OSError:
299         os.mkdir(path)
300
301     files_got = []
302     for f in stream.all_files():
303         if (files == [] or
304             ((f.name() not in files_got) and
305              (f.name() in files or
306               (decompress and f.decompressed_name() in files)))):
307             outname = f.decompressed_name() if decompress else f.name()
308             files_got += [outname]
309             if os.path.exists(os.path.join(path, outname)):
310                 os.unlink(os.path.join(path, outname))
311             mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
312             outfile = open(os.path.join(path, outname), 'wb')
313             for buf in (f.readall_decompressed() if decompress
314                         else f.readall()):
315                 outfile.write(buf)
316             outfile.close()
317     if len(files_got) < len(files):
318         raise arvados.errors.AssertionError(
319             "Wanted files %s but only got %s from %s" %
320             (files, files_got, [z.name() for z in stream.all_files()]))
321     lockfile.close()
322     return path
323
324 def listdir_recursive(dirname, base=None, max_depth=None):
325     """listdir_recursive(dirname, base, max_depth)
326
327     Return a list of file and directory names found under dirname.
328
329     If base is not None, prepend "{base}/" to each returned name.
330
331     If max_depth is None, descend into directories and return only the
332     names of files found in the directory tree.
333
334     If max_depth is a non-negative integer, stop descending into
335     directories at the given depth, and at that point return directory
336     names instead.
337
338     If max_depth==0 (and base is None) this is equivalent to
339     sorted(os.listdir(dirname)).
340     """
341     allfiles = []
342     for ent in sorted(os.listdir(dirname)):
343         ent_path = os.path.join(dirname, ent)
344         ent_base = os.path.join(base, ent) if base else ent
345         if os.path.isdir(ent_path) and max_depth != 0:
346             allfiles += listdir_recursive(
347                 ent_path, base=ent_base,
348                 max_depth=(max_depth-1 if max_depth else None))
349         else:
350             allfiles += [ent_base]
351     return allfiles
352
353 def is_hex(s, *length_args):
354     """is_hex(s[, length[, max_length]]) -> boolean
355
356     Return True if s is a string of hexadecimal digits.
357     If one length argument is given, the string must contain exactly
358     that number of digits.
359     If two length arguments are given, the string must contain a number of
360     digits between those two lengths, inclusive.
361     Return False otherwise.
362     """
363     num_length_args = len(length_args)
364     if num_length_args > 2:
365         raise arvados.errors.ArgumentError(
366             "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
367     elif num_length_args == 2:
368         good_len = (length_args[0] <= len(s) <= length_args[1])
369     elif num_length_args == 1:
370         good_len = (len(s) == length_args[0])
371     else:
372         good_len = True
373     return bool(good_len and HEX_RE.match(s))
374
375 def list_all(fn, num_retries=0, **kwargs):
376     # Default limit to (effectively) api server's MAX_LIMIT
377     kwargs.setdefault('limit', sys.maxsize)
378     items = []
379     offset = 0
380     items_available = sys.maxsize
381     while len(items) < items_available:
382         c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
383         items += c['items']
384         items_available = c['items_available']
385         offset = c['offset'] + len(c['items'])
386     return items
387
388 def ca_certs_path(fallback=httplib2.CA_CERTS):
389     """Return the path of the best available CA certs source.
390
391     This function searches for various distribution sources of CA
392     certificates, and returns the first it finds.  If it doesn't find any,
393     it returns the value of `fallback` (httplib2's CA certs by default).
394     """
395     for ca_certs_path in [
396         # Arvados specific:
397         '/etc/arvados/ca-certificates.crt',
398         # Debian:
399         '/etc/ssl/certs/ca-certificates.crt',
400         # Red Hat:
401         '/etc/pki/tls/certs/ca-bundle.crt',
402         ]:
403         if os.path.exists(ca_certs_path):
404             return ca_certs_path
405     return fallback
406
407 def new_request_id():
408     rid = "req-"
409     # 2**104 > 36**20 > 2**103
410     n = random.getrandbits(104)
411     for _ in range(20):
412         c = n % 36
413         if c < 10:
414             rid += chr(c+ord('0'))
415         else:
416             rid += chr(c+ord('a')-10)
417         n = n // 36
418     return rid