20937: fix deadlock with duplicated blocks
[arvados.git] / sdk / python / arvados / util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import range
7
8 import fcntl
9 import functools
10 import hashlib
11 import httplib2
12 import os
13 import random
14 import re
15 import subprocess
16 import errno
17 import sys
18 import warnings
19
20 import arvados.errors
21
22 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
23 CR_UNCOMMITTED = 'Uncommitted'
24 CR_COMMITTED = 'Committed'
25 CR_FINAL = 'Final'
26
27 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
28 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
29 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
30 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
31 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
32 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
33 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
34 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
35 job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
36 container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
37 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
38
39 def _deprecated(version=None, preferred=None):
40     """Mark a callable as deprecated in the SDK
41
42     This will wrap the callable to emit as a DeprecationWarning
43     and add a deprecation notice to its docstring.
44
45     If the following arguments are given, they'll be included in the
46     notices:
47
48     preferred: str | None
49     : The name of an alternative that users should use instead.
50
51     version: str | None
52     : The version of Arvados when the callable is scheduled to be
53       removed.
54     """
55     if version is None:
56         version = ''
57     else:
58         version = f' and scheduled to be removed in Arvados {version}'
59     if preferred is None:
60         preferred = ''
61     else:
62         preferred = f' Prefer {preferred} instead.'
63     def deprecated_decorator(func):
64         fullname = f'{func.__module__}.{func.__qualname__}'
65         parent, _, name = fullname.rpartition('.')
66         if name == '__init__':
67             fullname = parent
68         warning_msg = f'{fullname} is deprecated{version}.{preferred}'
69         @functools.wraps(func)
70         def deprecated_wrapper(*args, **kwargs):
71             warnings.warn(warning_msg, DeprecationWarning, 2)
72             return func(*args, **kwargs)
73         # Get func's docstring without any trailing newline or empty lines.
74         func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
75         match = re.search(r'\n([ \t]+)\S', func_doc)
76         indent = '' if match is None else match.group(1)
77         warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent}   {warning_msg}'
78         # Make the deprecation notice the second "paragraph" of the
79         # docstring if possible. Otherwise append it.
80         docstring, count = re.subn(
81             rf'\n[ \t]*\n{indent}',
82             f'{warning_doc}\n\n{indent}',
83             func_doc,
84             count=1,
85         )
86         if not count:
87             docstring = f'{func_doc.lstrip()}{warning_doc}'
88         deprecated_wrapper.__doc__ = docstring
89         return deprecated_wrapper
90     return deprecated_decorator
91
92 @_deprecated('3.0')
93 def clear_tmpdir(path=None):
94     """
95     Ensure the given directory (or TASK_TMPDIR if none given)
96     exists and is empty.
97     """
98     from arvados import current_task
99     if path is None:
100         path = current_task().tmpdir
101     if os.path.exists(path):
102         p = subprocess.Popen(['rm', '-rf', path])
103         stdout, stderr = p.communicate(None)
104         if p.returncode != 0:
105             raise Exception('rm -rf %s: %s' % (path, stderr))
106     os.mkdir(path)
107
108 @_deprecated('3.0', 'subprocess.run')
109 def run_command(execargs, **kwargs):
110     kwargs.setdefault('stdin', subprocess.PIPE)
111     kwargs.setdefault('stdout', subprocess.PIPE)
112     kwargs.setdefault('stderr', sys.stderr)
113     kwargs.setdefault('close_fds', True)
114     kwargs.setdefault('shell', False)
115     p = subprocess.Popen(execargs, **kwargs)
116     stdoutdata, stderrdata = p.communicate(None)
117     if p.returncode != 0:
118         raise arvados.errors.CommandFailedError(
119             "run_command %s exit %d:\n%s" %
120             (execargs, p.returncode, stderrdata))
121     return stdoutdata, stderrdata
122
123 @_deprecated('3.0')
124 def git_checkout(url, version, path):
125     from arvados import current_job
126     if not re.search('^/', path):
127         path = os.path.join(current_job().tmpdir, path)
128     if not os.path.exists(path):
129         run_command(["git", "clone", url, path],
130                     cwd=os.path.dirname(path))
131     run_command(["git", "checkout", version],
132                 cwd=path)
133     return path
134
135 @_deprecated('3.0')
136 def tar_extractor(path, decompress_flag):
137     return subprocess.Popen(["tar",
138                              "-C", path,
139                              ("-x%sf" % decompress_flag),
140                              "-"],
141                             stdout=None,
142                             stdin=subprocess.PIPE, stderr=sys.stderr,
143                             shell=False, close_fds=True)
144
145 @_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
146 def tarball_extract(tarball, path):
147     """Retrieve a tarball from Keep and extract it to a local
148     directory.  Return the absolute path where the tarball was
149     extracted. If the top level of the tarball contained just one
150     file or directory, return the absolute path of that single
151     item.
152
153     tarball -- collection locator
154     path -- where to extract the tarball: absolute, or relative to job tmp
155     """
156     from arvados import current_job
157     from arvados.collection import CollectionReader
158     if not re.search('^/', path):
159         path = os.path.join(current_job().tmpdir, path)
160     lockfile = open(path + '.lock', 'w')
161     fcntl.flock(lockfile, fcntl.LOCK_EX)
162     try:
163         os.stat(path)
164     except OSError:
165         os.mkdir(path)
166     already_have_it = False
167     try:
168         if os.readlink(os.path.join(path, '.locator')) == tarball:
169             already_have_it = True
170     except OSError:
171         pass
172     if not already_have_it:
173
174         # emulate "rm -f" (i.e., if the file does not exist, we win)
175         try:
176             os.unlink(os.path.join(path, '.locator'))
177         except OSError:
178             if os.path.exists(os.path.join(path, '.locator')):
179                 os.unlink(os.path.join(path, '.locator'))
180
181         for f in CollectionReader(tarball).all_files():
182             f_name = f.name()
183             if f_name.endswith(('.tbz', '.tar.bz2')):
184                 p = tar_extractor(path, 'j')
185             elif f_name.endswith(('.tgz', '.tar.gz')):
186                 p = tar_extractor(path, 'z')
187             elif f_name.endswith('.tar'):
188                 p = tar_extractor(path, '')
189             else:
190                 raise arvados.errors.AssertionError(
191                     "tarball_extract cannot handle filename %s" % f.name())
192             while True:
193                 buf = f.read(2**20)
194                 if len(buf) == 0:
195                     break
196                 p.stdin.write(buf)
197             p.stdin.close()
198             p.wait()
199             if p.returncode != 0:
200                 lockfile.close()
201                 raise arvados.errors.CommandFailedError(
202                     "tar exited %d" % p.returncode)
203         os.symlink(tarball, os.path.join(path, '.locator'))
204     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
205     lockfile.close()
206     if len(tld_extracts) == 1:
207         return os.path.join(path, tld_extracts[0])
208     return path
209
210 @_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
211 def zipball_extract(zipball, path):
212     """Retrieve a zip archive from Keep and extract it to a local
213     directory.  Return the absolute path where the archive was
214     extracted. If the top level of the archive contained just one
215     file or directory, return the absolute path of that single
216     item.
217
218     zipball -- collection locator
219     path -- where to extract the archive: absolute, or relative to job tmp
220     """
221     from arvados import current_job
222     from arvados.collection import CollectionReader
223     if not re.search('^/', path):
224         path = os.path.join(current_job().tmpdir, path)
225     lockfile = open(path + '.lock', 'w')
226     fcntl.flock(lockfile, fcntl.LOCK_EX)
227     try:
228         os.stat(path)
229     except OSError:
230         os.mkdir(path)
231     already_have_it = False
232     try:
233         if os.readlink(os.path.join(path, '.locator')) == zipball:
234             already_have_it = True
235     except OSError:
236         pass
237     if not already_have_it:
238
239         # emulate "rm -f" (i.e., if the file does not exist, we win)
240         try:
241             os.unlink(os.path.join(path, '.locator'))
242         except OSError:
243             if os.path.exists(os.path.join(path, '.locator')):
244                 os.unlink(os.path.join(path, '.locator'))
245
246         for f in CollectionReader(zipball).all_files():
247             if not f.name().endswith('.zip'):
248                 raise arvados.errors.NotImplementedError(
249                     "zipball_extract cannot handle filename %s" % f.name())
250             zip_filename = os.path.join(path, os.path.basename(f.name()))
251             zip_file = open(zip_filename, 'wb')
252             while True:
253                 buf = f.read(2**20)
254                 if len(buf) == 0:
255                     break
256                 zip_file.write(buf)
257             zip_file.close()
258
259             p = subprocess.Popen(["unzip",
260                                   "-q", "-o",
261                                   "-d", path,
262                                   zip_filename],
263                                  stdout=None,
264                                  stdin=None, stderr=sys.stderr,
265                                  shell=False, close_fds=True)
266             p.wait()
267             if p.returncode != 0:
268                 lockfile.close()
269                 raise arvados.errors.CommandFailedError(
270                     "unzip exited %d" % p.returncode)
271             os.unlink(zip_filename)
272         os.symlink(zipball, os.path.join(path, '.locator'))
273     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
274     lockfile.close()
275     if len(tld_extracts) == 1:
276         return os.path.join(path, tld_extracts[0])
277     return path
278
279 @_deprecated('3.0', 'arvados.collection.Collection')
280 def collection_extract(collection, path, files=[], decompress=True):
281     """Retrieve a collection from Keep and extract it to a local
282     directory.  Return the absolute path where the collection was
283     extracted.
284
285     collection -- collection locator
286     path -- where to extract: absolute, or relative to job tmp
287     """
288     from arvados import current_job
289     from arvados.collection import CollectionReader
290     matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
291     if matches:
292         collection_hash = matches.group(1)
293     else:
294         collection_hash = hashlib.md5(collection).hexdigest()
295     if not re.search('^/', path):
296         path = os.path.join(current_job().tmpdir, path)
297     lockfile = open(path + '.lock', 'w')
298     fcntl.flock(lockfile, fcntl.LOCK_EX)
299     try:
300         os.stat(path)
301     except OSError:
302         os.mkdir(path)
303     already_have_it = False
304     try:
305         if os.readlink(os.path.join(path, '.locator')) == collection_hash:
306             already_have_it = True
307     except OSError:
308         pass
309
310     # emulate "rm -f" (i.e., if the file does not exist, we win)
311     try:
312         os.unlink(os.path.join(path, '.locator'))
313     except OSError:
314         if os.path.exists(os.path.join(path, '.locator')):
315             os.unlink(os.path.join(path, '.locator'))
316
317     files_got = []
318     for s in CollectionReader(collection).all_streams():
319         stream_name = s.name()
320         for f in s.all_files():
321             if (files == [] or
322                 ((f.name() not in files_got) and
323                  (f.name() in files or
324                   (decompress and f.decompressed_name() in files)))):
325                 outname = f.decompressed_name() if decompress else f.name()
326                 files_got += [outname]
327                 if os.path.exists(os.path.join(path, stream_name, outname)):
328                     continue
329                 mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
330                 outfile = open(os.path.join(path, stream_name, outname), 'wb')
331                 for buf in (f.readall_decompressed() if decompress
332                             else f.readall()):
333                     outfile.write(buf)
334                 outfile.close()
335     if len(files_got) < len(files):
336         raise arvados.errors.AssertionError(
337             "Wanted files %s but only got %s from %s" %
338             (files, files_got,
339              [z.name() for z in CollectionReader(collection).all_files()]))
340     os.symlink(collection_hash, os.path.join(path, '.locator'))
341
342     lockfile.close()
343     return path
344
345 @_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
346 def mkdir_dash_p(path):
347     if not os.path.isdir(path):
348         try:
349             os.makedirs(path)
350         except OSError as e:
351             if e.errno == errno.EEXIST and os.path.isdir(path):
352                 # It is not an error if someone else creates the
353                 # directory between our exists() and makedirs() calls.
354                 pass
355             else:
356                 raise
357
358 @_deprecated('3.0', 'arvados.collection.Collection')
359 def stream_extract(stream, path, files=[], decompress=True):
360     """Retrieve a stream from Keep and extract it to a local
361     directory.  Return the absolute path where the stream was
362     extracted.
363
364     stream -- StreamReader object
365     path -- where to extract: absolute, or relative to job tmp
366     """
367     from arvados import current_job
368     if not re.search('^/', path):
369         path = os.path.join(current_job().tmpdir, path)
370     lockfile = open(path + '.lock', 'w')
371     fcntl.flock(lockfile, fcntl.LOCK_EX)
372     try:
373         os.stat(path)
374     except OSError:
375         os.mkdir(path)
376
377     files_got = []
378     for f in stream.all_files():
379         if (files == [] or
380             ((f.name() not in files_got) and
381              (f.name() in files or
382               (decompress and f.decompressed_name() in files)))):
383             outname = f.decompressed_name() if decompress else f.name()
384             files_got += [outname]
385             if os.path.exists(os.path.join(path, outname)):
386                 os.unlink(os.path.join(path, outname))
387             mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
388             outfile = open(os.path.join(path, outname), 'wb')
389             for buf in (f.readall_decompressed() if decompress
390                         else f.readall()):
391                 outfile.write(buf)
392             outfile.close()
393     if len(files_got) < len(files):
394         raise arvados.errors.AssertionError(
395             "Wanted files %s but only got %s from %s" %
396             (files, files_got, [z.name() for z in stream.all_files()]))
397     lockfile.close()
398     return path
399
400 @_deprecated('3.0', 'os.walk')
401 def listdir_recursive(dirname, base=None, max_depth=None):
402     """listdir_recursive(dirname, base, max_depth)
403
404     Return a list of file and directory names found under dirname.
405
406     If base is not None, prepend "{base}/" to each returned name.
407
408     If max_depth is None, descend into directories and return only the
409     names of files found in the directory tree.
410
411     If max_depth is a non-negative integer, stop descending into
412     directories at the given depth, and at that point return directory
413     names instead.
414
415     If max_depth==0 (and base is None) this is equivalent to
416     sorted(os.listdir(dirname)).
417     """
418     allfiles = []
419     for ent in sorted(os.listdir(dirname)):
420         ent_path = os.path.join(dirname, ent)
421         ent_base = os.path.join(base, ent) if base else ent
422         if os.path.isdir(ent_path) and max_depth != 0:
423             allfiles += listdir_recursive(
424                 ent_path, base=ent_base,
425                 max_depth=(max_depth-1 if max_depth else None))
426         else:
427             allfiles += [ent_base]
428     return allfiles
429
430 def is_hex(s, *length_args):
431     """is_hex(s[, length[, max_length]]) -> boolean
432
433     Return True if s is a string of hexadecimal digits.
434     If one length argument is given, the string must contain exactly
435     that number of digits.
436     If two length arguments are given, the string must contain a number of
437     digits between those two lengths, inclusive.
438     Return False otherwise.
439     """
440     num_length_args = len(length_args)
441     if num_length_args > 2:
442         raise arvados.errors.ArgumentError(
443             "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
444     elif num_length_args == 2:
445         good_len = (length_args[0] <= len(s) <= length_args[1])
446     elif num_length_args == 1:
447         good_len = (len(s) == length_args[0])
448     else:
449         good_len = True
450     return bool(good_len and HEX_RE.match(s))
451
452 @_deprecated('3.0', 'arvados.util.keyset_list_all')
453 def list_all(fn, num_retries=0, **kwargs):
454     # Default limit to (effectively) api server's MAX_LIMIT
455     kwargs.setdefault('limit', sys.maxsize)
456     items = []
457     offset = 0
458     items_available = sys.maxsize
459     while len(items) < items_available:
460         c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
461         items += c['items']
462         items_available = c['items_available']
463         offset = c['offset'] + len(c['items'])
464     return items
465
466 def keyset_list_all(fn, order_key="created_at", num_retries=0, ascending=True, **kwargs):
467     pagesize = 1000
468     kwargs["limit"] = pagesize
469     kwargs["count"] = 'none'
470     asc = "asc" if ascending else "desc"
471     kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
472     other_filters = kwargs.get("filters", [])
473
474     try:
475         select = set(kwargs['select'])
476     except KeyError:
477         pass
478     else:
479         select.add(order_key)
480         select.add('uuid')
481         kwargs['select'] = list(select)
482
483     nextpage = []
484     tot = 0
485     expect_full_page = True
486     seen_prevpage = set()
487     seen_thispage = set()
488     lastitem = None
489     prev_page_all_same_order_key = False
490
491     while True:
492         kwargs["filters"] = nextpage+other_filters
493         items = fn(**kwargs).execute(num_retries=num_retries)
494
495         if len(items["items"]) == 0:
496             if prev_page_all_same_order_key:
497                 nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
498                 prev_page_all_same_order_key = False
499                 continue
500             else:
501                 return
502
503         seen_prevpage = seen_thispage
504         seen_thispage = set()
505
506         for i in items["items"]:
507             # In cases where there's more than one record with the
508             # same order key, the result could include records we
509             # already saw in the last page.  Skip them.
510             if i["uuid"] in seen_prevpage:
511                 continue
512             seen_thispage.add(i["uuid"])
513             yield i
514
515         firstitem = items["items"][0]
516         lastitem = items["items"][-1]
517
518         if firstitem[order_key] == lastitem[order_key]:
519             # Got a page where every item has the same order key.
520             # Switch to using uuid for paging.
521             nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
522             prev_page_all_same_order_key = True
523         else:
524             # Start from the last order key seen, but skip the last
525             # known uuid to avoid retrieving the same row twice.  If
526             # there are multiple rows with the same order key it is
527             # still likely we'll end up retrieving duplicate rows.
528             # That's handled by tracking the "seen" rows for each page
529             # so they can be skipped if they show up on the next page.
530             nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
531             prev_page_all_same_order_key = False
532
533 def ca_certs_path(fallback=httplib2.CA_CERTS):
534     """Return the path of the best available CA certs source.
535
536     This function searches for various distribution sources of CA
537     certificates, and returns the first it finds.  If it doesn't find any,
538     it returns the value of `fallback` (httplib2's CA certs by default).
539     """
540     for ca_certs_path in [
541         # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
542         # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
543         os.environ.get('SSL_CERT_FILE'),
544         # Arvados specific:
545         '/etc/arvados/ca-certificates.crt',
546         # Debian:
547         '/etc/ssl/certs/ca-certificates.crt',
548         # Red Hat:
549         '/etc/pki/tls/certs/ca-bundle.crt',
550         ]:
551         if ca_certs_path and os.path.exists(ca_certs_path):
552             return ca_certs_path
553     return fallback
554
555 def new_request_id():
556     rid = "req-"
557     # 2**104 > 36**20 > 2**103
558     n = random.getrandbits(104)
559     for _ in range(20):
560         c = n % 36
561         if c < 10:
562             rid += chr(c+ord('0'))
563         else:
564             rid += chr(c+ord('a')-10)
565         n = n // 36
566     return rid
567
568 def get_config_once(svc):
569     if not svc._rootDesc.get('resources').get('configs', False):
570         # Old API server version, no config export endpoint
571         return {}
572     if not hasattr(svc, '_cached_config'):
573         svc._cached_config = svc.configs().get().execute()
574     return svc._cached_config
575
576 def get_vocabulary_once(svc):
577     if not svc._rootDesc.get('resources').get('vocabularies', False):
578         # Old API server version, no vocabulary export endpoint
579         return {}
580     if not hasattr(svc, '_cached_vocabulary'):
581         svc._cached_vocabulary = svc.vocabularies().get().execute()
582     return svc._cached_vocabulary
583
584 def trim_name(collectionname):
585     """
586     trim_name takes a record name (collection name, project name, etc)
587     and trims it to fit the 255 character name limit, with additional
588     space for the timestamp added by ensure_unique_name, by removing
589     excess characters from the middle and inserting an ellipse
590     """
591
592     max_name_len = 254 - 28
593
594     if len(collectionname) > max_name_len:
595         over = len(collectionname) - max_name_len
596         split = int(max_name_len/2)
597         collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
598
599     return collectionname