Merge branch '21126-trash-when-ro'
[arvados.git] / sdk / python / arvados / util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import range
7
8 import fcntl
9 import functools
10 import hashlib
11 import httplib2
12 import os
13 import random
14 import re
15 import subprocess
16 import errno
17 import sys
18 import warnings
19
20 import arvados.errors
21
22 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
23 CR_UNCOMMITTED = 'Uncommitted'
24 CR_COMMITTED = 'Committed'
25 CR_FINAL = 'Final'
26
27 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*')
28 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*\+A\S+(\+\S+)*')
29 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+')
30 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
31 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
32 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
33 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
34 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
35 job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
36 container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
37 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+[0-9]+)(\+\S+)*)+( +[0-9]+:[0-9]+:\S+)+$)+', flags=re.MULTILINE)
38 keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+[0-9]+)/(.*)')
39 keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+[0-9]+)/(.*)')
40
41 def _deprecated(version=None, preferred=None):
42     """Mark a callable as deprecated in the SDK
43
44     This will wrap the callable to emit as a DeprecationWarning
45     and add a deprecation notice to its docstring.
46
47     If the following arguments are given, they'll be included in the
48     notices:
49
50     preferred: str | None
51     : The name of an alternative that users should use instead.
52
53     version: str | None
54     : The version of Arvados when the callable is scheduled to be
55       removed.
56     """
57     if version is None:
58         version = ''
59     else:
60         version = f' and scheduled to be removed in Arvados {version}'
61     if preferred is None:
62         preferred = ''
63     else:
64         preferred = f' Prefer {preferred} instead.'
65     def deprecated_decorator(func):
66         fullname = f'{func.__module__}.{func.__qualname__}'
67         parent, _, name = fullname.rpartition('.')
68         if name == '__init__':
69             fullname = parent
70         warning_msg = f'{fullname} is deprecated{version}.{preferred}'
71         @functools.wraps(func)
72         def deprecated_wrapper(*args, **kwargs):
73             warnings.warn(warning_msg, DeprecationWarning, 2)
74             return func(*args, **kwargs)
75         # Get func's docstring without any trailing newline or empty lines.
76         func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
77         match = re.search(r'\n([ \t]+)\S', func_doc)
78         indent = '' if match is None else match.group(1)
79         warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent}   {warning_msg}'
80         # Make the deprecation notice the second "paragraph" of the
81         # docstring if possible. Otherwise append it.
82         docstring, count = re.subn(
83             rf'\n[ \t]*\n{indent}',
84             f'{warning_doc}\n\n{indent}',
85             func_doc,
86             count=1,
87         )
88         if not count:
89             docstring = f'{func_doc.lstrip()}{warning_doc}'
90         deprecated_wrapper.__doc__ = docstring
91         return deprecated_wrapper
92     return deprecated_decorator
93
94 @_deprecated('3.0')
95 def clear_tmpdir(path=None):
96     """
97     Ensure the given directory (or TASK_TMPDIR if none given)
98     exists and is empty.
99     """
100     from arvados import current_task
101     if path is None:
102         path = current_task().tmpdir
103     if os.path.exists(path):
104         p = subprocess.Popen(['rm', '-rf', path])
105         stdout, stderr = p.communicate(None)
106         if p.returncode != 0:
107             raise Exception('rm -rf %s: %s' % (path, stderr))
108     os.mkdir(path)
109
110 @_deprecated('3.0', 'subprocess.run')
111 def run_command(execargs, **kwargs):
112     kwargs.setdefault('stdin', subprocess.PIPE)
113     kwargs.setdefault('stdout', subprocess.PIPE)
114     kwargs.setdefault('stderr', sys.stderr)
115     kwargs.setdefault('close_fds', True)
116     kwargs.setdefault('shell', False)
117     p = subprocess.Popen(execargs, **kwargs)
118     stdoutdata, stderrdata = p.communicate(None)
119     if p.returncode != 0:
120         raise arvados.errors.CommandFailedError(
121             "run_command %s exit %d:\n%s" %
122             (execargs, p.returncode, stderrdata))
123     return stdoutdata, stderrdata
124
125 @_deprecated('3.0')
126 def git_checkout(url, version, path):
127     from arvados import current_job
128     if not re.search('^/', path):
129         path = os.path.join(current_job().tmpdir, path)
130     if not os.path.exists(path):
131         run_command(["git", "clone", url, path],
132                     cwd=os.path.dirname(path))
133     run_command(["git", "checkout", version],
134                 cwd=path)
135     return path
136
137 @_deprecated('3.0')
138 def tar_extractor(path, decompress_flag):
139     return subprocess.Popen(["tar",
140                              "-C", path,
141                              ("-x%sf" % decompress_flag),
142                              "-"],
143                             stdout=None,
144                             stdin=subprocess.PIPE, stderr=sys.stderr,
145                             shell=False, close_fds=True)
146
147 @_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
148 def tarball_extract(tarball, path):
149     """Retrieve a tarball from Keep and extract it to a local
150     directory.  Return the absolute path where the tarball was
151     extracted. If the top level of the tarball contained just one
152     file or directory, return the absolute path of that single
153     item.
154
155     tarball -- collection locator
156     path -- where to extract the tarball: absolute, or relative to job tmp
157     """
158     from arvados import current_job
159     from arvados.collection import CollectionReader
160     if not re.search('^/', path):
161         path = os.path.join(current_job().tmpdir, path)
162     lockfile = open(path + '.lock', 'w')
163     fcntl.flock(lockfile, fcntl.LOCK_EX)
164     try:
165         os.stat(path)
166     except OSError:
167         os.mkdir(path)
168     already_have_it = False
169     try:
170         if os.readlink(os.path.join(path, '.locator')) == tarball:
171             already_have_it = True
172     except OSError:
173         pass
174     if not already_have_it:
175
176         # emulate "rm -f" (i.e., if the file does not exist, we win)
177         try:
178             os.unlink(os.path.join(path, '.locator'))
179         except OSError:
180             if os.path.exists(os.path.join(path, '.locator')):
181                 os.unlink(os.path.join(path, '.locator'))
182
183         for f in CollectionReader(tarball).all_files():
184             f_name = f.name()
185             if f_name.endswith(('.tbz', '.tar.bz2')):
186                 p = tar_extractor(path, 'j')
187             elif f_name.endswith(('.tgz', '.tar.gz')):
188                 p = tar_extractor(path, 'z')
189             elif f_name.endswith('.tar'):
190                 p = tar_extractor(path, '')
191             else:
192                 raise arvados.errors.AssertionError(
193                     "tarball_extract cannot handle filename %s" % f.name())
194             while True:
195                 buf = f.read(2**20)
196                 if len(buf) == 0:
197                     break
198                 p.stdin.write(buf)
199             p.stdin.close()
200             p.wait()
201             if p.returncode != 0:
202                 lockfile.close()
203                 raise arvados.errors.CommandFailedError(
204                     "tar exited %d" % p.returncode)
205         os.symlink(tarball, os.path.join(path, '.locator'))
206     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
207     lockfile.close()
208     if len(tld_extracts) == 1:
209         return os.path.join(path, tld_extracts[0])
210     return path
211
212 @_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
213 def zipball_extract(zipball, path):
214     """Retrieve a zip archive from Keep and extract it to a local
215     directory.  Return the absolute path where the archive was
216     extracted. If the top level of the archive contained just one
217     file or directory, return the absolute path of that single
218     item.
219
220     zipball -- collection locator
221     path -- where to extract the archive: absolute, or relative to job tmp
222     """
223     from arvados import current_job
224     from arvados.collection import CollectionReader
225     if not re.search('^/', path):
226         path = os.path.join(current_job().tmpdir, path)
227     lockfile = open(path + '.lock', 'w')
228     fcntl.flock(lockfile, fcntl.LOCK_EX)
229     try:
230         os.stat(path)
231     except OSError:
232         os.mkdir(path)
233     already_have_it = False
234     try:
235         if os.readlink(os.path.join(path, '.locator')) == zipball:
236             already_have_it = True
237     except OSError:
238         pass
239     if not already_have_it:
240
241         # emulate "rm -f" (i.e., if the file does not exist, we win)
242         try:
243             os.unlink(os.path.join(path, '.locator'))
244         except OSError:
245             if os.path.exists(os.path.join(path, '.locator')):
246                 os.unlink(os.path.join(path, '.locator'))
247
248         for f in CollectionReader(zipball).all_files():
249             if not f.name().endswith('.zip'):
250                 raise arvados.errors.NotImplementedError(
251                     "zipball_extract cannot handle filename %s" % f.name())
252             zip_filename = os.path.join(path, os.path.basename(f.name()))
253             zip_file = open(zip_filename, 'wb')
254             while True:
255                 buf = f.read(2**20)
256                 if len(buf) == 0:
257                     break
258                 zip_file.write(buf)
259             zip_file.close()
260
261             p = subprocess.Popen(["unzip",
262                                   "-q", "-o",
263                                   "-d", path,
264                                   zip_filename],
265                                  stdout=None,
266                                  stdin=None, stderr=sys.stderr,
267                                  shell=False, close_fds=True)
268             p.wait()
269             if p.returncode != 0:
270                 lockfile.close()
271                 raise arvados.errors.CommandFailedError(
272                     "unzip exited %d" % p.returncode)
273             os.unlink(zip_filename)
274         os.symlink(zipball, os.path.join(path, '.locator'))
275     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
276     lockfile.close()
277     if len(tld_extracts) == 1:
278         return os.path.join(path, tld_extracts[0])
279     return path
280
281 @_deprecated('3.0', 'arvados.collection.Collection')
282 def collection_extract(collection, path, files=[], decompress=True):
283     """Retrieve a collection from Keep and extract it to a local
284     directory.  Return the absolute path where the collection was
285     extracted.
286
287     collection -- collection locator
288     path -- where to extract: absolute, or relative to job tmp
289     """
290     from arvados import current_job
291     from arvados.collection import CollectionReader
292     matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
293     if matches:
294         collection_hash = matches.group(1)
295     else:
296         collection_hash = hashlib.md5(collection).hexdigest()
297     if not re.search('^/', path):
298         path = os.path.join(current_job().tmpdir, path)
299     lockfile = open(path + '.lock', 'w')
300     fcntl.flock(lockfile, fcntl.LOCK_EX)
301     try:
302         os.stat(path)
303     except OSError:
304         os.mkdir(path)
305     already_have_it = False
306     try:
307         if os.readlink(os.path.join(path, '.locator')) == collection_hash:
308             already_have_it = True
309     except OSError:
310         pass
311
312     # emulate "rm -f" (i.e., if the file does not exist, we win)
313     try:
314         os.unlink(os.path.join(path, '.locator'))
315     except OSError:
316         if os.path.exists(os.path.join(path, '.locator')):
317             os.unlink(os.path.join(path, '.locator'))
318
319     files_got = []
320     for s in CollectionReader(collection).all_streams():
321         stream_name = s.name()
322         for f in s.all_files():
323             if (files == [] or
324                 ((f.name() not in files_got) and
325                  (f.name() in files or
326                   (decompress and f.decompressed_name() in files)))):
327                 outname = f.decompressed_name() if decompress else f.name()
328                 files_got += [outname]
329                 if os.path.exists(os.path.join(path, stream_name, outname)):
330                     continue
331                 mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
332                 outfile = open(os.path.join(path, stream_name, outname), 'wb')
333                 for buf in (f.readall_decompressed() if decompress
334                             else f.readall()):
335                     outfile.write(buf)
336                 outfile.close()
337     if len(files_got) < len(files):
338         raise arvados.errors.AssertionError(
339             "Wanted files %s but only got %s from %s" %
340             (files, files_got,
341              [z.name() for z in CollectionReader(collection).all_files()]))
342     os.symlink(collection_hash, os.path.join(path, '.locator'))
343
344     lockfile.close()
345     return path
346
347 @_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
348 def mkdir_dash_p(path):
349     if not os.path.isdir(path):
350         try:
351             os.makedirs(path)
352         except OSError as e:
353             if e.errno == errno.EEXIST and os.path.isdir(path):
354                 # It is not an error if someone else creates the
355                 # directory between our exists() and makedirs() calls.
356                 pass
357             else:
358                 raise
359
360 @_deprecated('3.0', 'arvados.collection.Collection')
361 def stream_extract(stream, path, files=[], decompress=True):
362     """Retrieve a stream from Keep and extract it to a local
363     directory.  Return the absolute path where the stream was
364     extracted.
365
366     stream -- StreamReader object
367     path -- where to extract: absolute, or relative to job tmp
368     """
369     from arvados import current_job
370     if not re.search('^/', path):
371         path = os.path.join(current_job().tmpdir, path)
372     lockfile = open(path + '.lock', 'w')
373     fcntl.flock(lockfile, fcntl.LOCK_EX)
374     try:
375         os.stat(path)
376     except OSError:
377         os.mkdir(path)
378
379     files_got = []
380     for f in stream.all_files():
381         if (files == [] or
382             ((f.name() not in files_got) and
383              (f.name() in files or
384               (decompress and f.decompressed_name() in files)))):
385             outname = f.decompressed_name() if decompress else f.name()
386             files_got += [outname]
387             if os.path.exists(os.path.join(path, outname)):
388                 os.unlink(os.path.join(path, outname))
389             mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
390             outfile = open(os.path.join(path, outname), 'wb')
391             for buf in (f.readall_decompressed() if decompress
392                         else f.readall()):
393                 outfile.write(buf)
394             outfile.close()
395     if len(files_got) < len(files):
396         raise arvados.errors.AssertionError(
397             "Wanted files %s but only got %s from %s" %
398             (files, files_got, [z.name() for z in stream.all_files()]))
399     lockfile.close()
400     return path
401
402 @_deprecated('3.0', 'os.walk')
403 def listdir_recursive(dirname, base=None, max_depth=None):
404     """listdir_recursive(dirname, base, max_depth)
405
406     Return a list of file and directory names found under dirname.
407
408     If base is not None, prepend "{base}/" to each returned name.
409
410     If max_depth is None, descend into directories and return only the
411     names of files found in the directory tree.
412
413     If max_depth is a non-negative integer, stop descending into
414     directories at the given depth, and at that point return directory
415     names instead.
416
417     If max_depth==0 (and base is None) this is equivalent to
418     sorted(os.listdir(dirname)).
419     """
420     allfiles = []
421     for ent in sorted(os.listdir(dirname)):
422         ent_path = os.path.join(dirname, ent)
423         ent_base = os.path.join(base, ent) if base else ent
424         if os.path.isdir(ent_path) and max_depth != 0:
425             allfiles += listdir_recursive(
426                 ent_path, base=ent_base,
427                 max_depth=(max_depth-1 if max_depth else None))
428         else:
429             allfiles += [ent_base]
430     return allfiles
431
432 def is_hex(s, *length_args):
433     """is_hex(s[, length[, max_length]]) -> boolean
434
435     Return True if s is a string of hexadecimal digits.
436     If one length argument is given, the string must contain exactly
437     that number of digits.
438     If two length arguments are given, the string must contain a number of
439     digits between those two lengths, inclusive.
440     Return False otherwise.
441     """
442     num_length_args = len(length_args)
443     if num_length_args > 2:
444         raise arvados.errors.ArgumentError(
445             "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
446     elif num_length_args == 2:
447         good_len = (length_args[0] <= len(s) <= length_args[1])
448     elif num_length_args == 1:
449         good_len = (len(s) == length_args[0])
450     else:
451         good_len = True
452     return bool(good_len and HEX_RE.match(s))
453
454 @_deprecated('3.0', 'arvados.util.keyset_list_all')
455 def list_all(fn, num_retries=0, **kwargs):
456     # Default limit to (effectively) api server's MAX_LIMIT
457     kwargs.setdefault('limit', sys.maxsize)
458     items = []
459     offset = 0
460     items_available = sys.maxsize
461     while len(items) < items_available:
462         c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
463         items += c['items']
464         items_available = c['items_available']
465         offset = c['offset'] + len(c['items'])
466     return items
467
468 def keyset_list_all(fn, order_key="created_at", num_retries=0, ascending=True, **kwargs):
469     pagesize = 1000
470     kwargs["limit"] = pagesize
471     kwargs["count"] = 'none'
472     asc = "asc" if ascending else "desc"
473     kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
474     other_filters = kwargs.get("filters", [])
475
476     try:
477         select = set(kwargs['select'])
478     except KeyError:
479         pass
480     else:
481         select.add(order_key)
482         select.add('uuid')
483         kwargs['select'] = list(select)
484
485     nextpage = []
486     tot = 0
487     expect_full_page = True
488     seen_prevpage = set()
489     seen_thispage = set()
490     lastitem = None
491     prev_page_all_same_order_key = False
492
493     while True:
494         kwargs["filters"] = nextpage+other_filters
495         items = fn(**kwargs).execute(num_retries=num_retries)
496
497         if len(items["items"]) == 0:
498             if prev_page_all_same_order_key:
499                 nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
500                 prev_page_all_same_order_key = False
501                 continue
502             else:
503                 return
504
505         seen_prevpage = seen_thispage
506         seen_thispage = set()
507
508         for i in items["items"]:
509             # In cases where there's more than one record with the
510             # same order key, the result could include records we
511             # already saw in the last page.  Skip them.
512             if i["uuid"] in seen_prevpage:
513                 continue
514             seen_thispage.add(i["uuid"])
515             yield i
516
517         firstitem = items["items"][0]
518         lastitem = items["items"][-1]
519
520         if firstitem[order_key] == lastitem[order_key]:
521             # Got a page where every item has the same order key.
522             # Switch to using uuid for paging.
523             nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
524             prev_page_all_same_order_key = True
525         else:
526             # Start from the last order key seen, but skip the last
527             # known uuid to avoid retrieving the same row twice.  If
528             # there are multiple rows with the same order key it is
529             # still likely we'll end up retrieving duplicate rows.
530             # That's handled by tracking the "seen" rows for each page
531             # so they can be skipped if they show up on the next page.
532             nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
533             prev_page_all_same_order_key = False
534
535 def ca_certs_path(fallback=httplib2.CA_CERTS):
536     """Return the path of the best available CA certs source.
537
538     This function searches for various distribution sources of CA
539     certificates, and returns the first it finds.  If it doesn't find any,
540     it returns the value of `fallback` (httplib2's CA certs by default).
541     """
542     for ca_certs_path in [
543         # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
544         # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
545         os.environ.get('SSL_CERT_FILE'),
546         # Arvados specific:
547         '/etc/arvados/ca-certificates.crt',
548         # Debian:
549         '/etc/ssl/certs/ca-certificates.crt',
550         # Red Hat:
551         '/etc/pki/tls/certs/ca-bundle.crt',
552         ]:
553         if ca_certs_path and os.path.exists(ca_certs_path):
554             return ca_certs_path
555     return fallback
556
557 def new_request_id():
558     rid = "req-"
559     # 2**104 > 36**20 > 2**103
560     n = random.getrandbits(104)
561     for _ in range(20):
562         c = n % 36
563         if c < 10:
564             rid += chr(c+ord('0'))
565         else:
566             rid += chr(c+ord('a')-10)
567         n = n // 36
568     return rid
569
570 def get_config_once(svc):
571     if not svc._rootDesc.get('resources').get('configs', False):
572         # Old API server version, no config export endpoint
573         return {}
574     if not hasattr(svc, '_cached_config'):
575         svc._cached_config = svc.configs().get().execute()
576     return svc._cached_config
577
578 def get_vocabulary_once(svc):
579     if not svc._rootDesc.get('resources').get('vocabularies', False):
580         # Old API server version, no vocabulary export endpoint
581         return {}
582     if not hasattr(svc, '_cached_vocabulary'):
583         svc._cached_vocabulary = svc.vocabularies().get().execute()
584     return svc._cached_vocabulary
585
586 def trim_name(collectionname):
587     """
588     trim_name takes a record name (collection name, project name, etc)
589     and trims it to fit the 255 character name limit, with additional
590     space for the timestamp added by ensure_unique_name, by removing
591     excess characters from the middle and inserting an ellipse
592     """
593
594     max_name_len = 254 - 28
595
596     if len(collectionname) > max_name_len:
597         over = len(collectionname) - max_name_len
598         split = int(max_name_len/2)
599         collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
600
601     return collectionname