20612: Fix up punctuation/capitalization in docs.
[arvados.git] / sdk / python / arvados / util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import range
7
8 import fcntl
9 import hashlib
10 import httplib2
11 import os
12 import random
13 import re
14 import subprocess
15 import errno
16 import sys
17
18 import arvados
19 from arvados.collection import CollectionReader
20
21 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
22 CR_UNCOMMITTED = 'Uncommitted'
23 CR_COMMITTED = 'Committed'
24 CR_FINAL = 'Final'
25
26 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
27 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
28 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
29 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
30 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
31 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
32 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
33 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
34 job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
35 container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
36 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
37
38 def clear_tmpdir(path=None):
39     """
40     Ensure the given directory (or TASK_TMPDIR if none given)
41     exists and is empty.
42     """
43     if path is None:
44         path = arvados.current_task().tmpdir
45     if os.path.exists(path):
46         p = subprocess.Popen(['rm', '-rf', path])
47         stdout, stderr = p.communicate(None)
48         if p.returncode != 0:
49             raise Exception('rm -rf %s: %s' % (path, stderr))
50     os.mkdir(path)
51
52 def run_command(execargs, **kwargs):
53     kwargs.setdefault('stdin', subprocess.PIPE)
54     kwargs.setdefault('stdout', subprocess.PIPE)
55     kwargs.setdefault('stderr', sys.stderr)
56     kwargs.setdefault('close_fds', True)
57     kwargs.setdefault('shell', False)
58     p = subprocess.Popen(execargs, **kwargs)
59     stdoutdata, stderrdata = p.communicate(None)
60     if p.returncode != 0:
61         raise arvados.errors.CommandFailedError(
62             "run_command %s exit %d:\n%s" %
63             (execargs, p.returncode, stderrdata))
64     return stdoutdata, stderrdata
65
66 def git_checkout(url, version, path):
67     if not re.search('^/', path):
68         path = os.path.join(arvados.current_job().tmpdir, path)
69     if not os.path.exists(path):
70         run_command(["git", "clone", url, path],
71                     cwd=os.path.dirname(path))
72     run_command(["git", "checkout", version],
73                 cwd=path)
74     return path
75
76 def tar_extractor(path, decompress_flag):
77     return subprocess.Popen(["tar",
78                              "-C", path,
79                              ("-x%sf" % decompress_flag),
80                              "-"],
81                             stdout=None,
82                             stdin=subprocess.PIPE, stderr=sys.stderr,
83                             shell=False, close_fds=True)
84
85 def tarball_extract(tarball, path):
86     """Retrieve a tarball from Keep and extract it to a local
87     directory.  Return the absolute path where the tarball was
88     extracted. If the top level of the tarball contained just one
89     file or directory, return the absolute path of that single
90     item.
91
92     tarball -- collection locator
93     path -- where to extract the tarball: absolute, or relative to job tmp
94     """
95     if not re.search('^/', path):
96         path = os.path.join(arvados.current_job().tmpdir, path)
97     lockfile = open(path + '.lock', 'w')
98     fcntl.flock(lockfile, fcntl.LOCK_EX)
99     try:
100         os.stat(path)
101     except OSError:
102         os.mkdir(path)
103     already_have_it = False
104     try:
105         if os.readlink(os.path.join(path, '.locator')) == tarball:
106             already_have_it = True
107     except OSError:
108         pass
109     if not already_have_it:
110
111         # emulate "rm -f" (i.e., if the file does not exist, we win)
112         try:
113             os.unlink(os.path.join(path, '.locator'))
114         except OSError:
115             if os.path.exists(os.path.join(path, '.locator')):
116                 os.unlink(os.path.join(path, '.locator'))
117
118         for f in CollectionReader(tarball).all_files():
119             f_name = f.name()
120             if f_name.endswith(('.tbz', '.tar.bz2')):
121                 p = tar_extractor(path, 'j')
122             elif f_name.endswith(('.tgz', '.tar.gz')):
123                 p = tar_extractor(path, 'z')
124             elif f_name.endswith('.tar'):
125                 p = tar_extractor(path, '')
126             else:
127                 raise arvados.errors.AssertionError(
128                     "tarball_extract cannot handle filename %s" % f.name())
129             while True:
130                 buf = f.read(2**20)
131                 if len(buf) == 0:
132                     break
133                 p.stdin.write(buf)
134             p.stdin.close()
135             p.wait()
136             if p.returncode != 0:
137                 lockfile.close()
138                 raise arvados.errors.CommandFailedError(
139                     "tar exited %d" % p.returncode)
140         os.symlink(tarball, os.path.join(path, '.locator'))
141     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
142     lockfile.close()
143     if len(tld_extracts) == 1:
144         return os.path.join(path, tld_extracts[0])
145     return path
146
147 def zipball_extract(zipball, path):
148     """Retrieve a zip archive from Keep and extract it to a local
149     directory.  Return the absolute path where the archive was
150     extracted. If the top level of the archive contained just one
151     file or directory, return the absolute path of that single
152     item.
153
154     zipball -- collection locator
155     path -- where to extract the archive: absolute, or relative to job tmp
156     """
157     if not re.search('^/', path):
158         path = os.path.join(arvados.current_job().tmpdir, path)
159     lockfile = open(path + '.lock', 'w')
160     fcntl.flock(lockfile, fcntl.LOCK_EX)
161     try:
162         os.stat(path)
163     except OSError:
164         os.mkdir(path)
165     already_have_it = False
166     try:
167         if os.readlink(os.path.join(path, '.locator')) == zipball:
168             already_have_it = True
169     except OSError:
170         pass
171     if not already_have_it:
172
173         # emulate "rm -f" (i.e., if the file does not exist, we win)
174         try:
175             os.unlink(os.path.join(path, '.locator'))
176         except OSError:
177             if os.path.exists(os.path.join(path, '.locator')):
178                 os.unlink(os.path.join(path, '.locator'))
179
180         for f in CollectionReader(zipball).all_files():
181             if not f.name().endswith('.zip'):
182                 raise arvados.errors.NotImplementedError(
183                     "zipball_extract cannot handle filename %s" % f.name())
184             zip_filename = os.path.join(path, os.path.basename(f.name()))
185             zip_file = open(zip_filename, 'wb')
186             while True:
187                 buf = f.read(2**20)
188                 if len(buf) == 0:
189                     break
190                 zip_file.write(buf)
191             zip_file.close()
192
193             p = subprocess.Popen(["unzip",
194                                   "-q", "-o",
195                                   "-d", path,
196                                   zip_filename],
197                                  stdout=None,
198                                  stdin=None, stderr=sys.stderr,
199                                  shell=False, close_fds=True)
200             p.wait()
201             if p.returncode != 0:
202                 lockfile.close()
203                 raise arvados.errors.CommandFailedError(
204                     "unzip exited %d" % p.returncode)
205             os.unlink(zip_filename)
206         os.symlink(zipball, os.path.join(path, '.locator'))
207     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
208     lockfile.close()
209     if len(tld_extracts) == 1:
210         return os.path.join(path, tld_extracts[0])
211     return path
212
213 def collection_extract(collection, path, files=[], decompress=True):
214     """Retrieve a collection from Keep and extract it to a local
215     directory.  Return the absolute path where the collection was
216     extracted.
217
218     collection -- collection locator
219     path -- where to extract: absolute, or relative to job tmp
220     """
221     matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
222     if matches:
223         collection_hash = matches.group(1)
224     else:
225         collection_hash = hashlib.md5(collection).hexdigest()
226     if not re.search('^/', path):
227         path = os.path.join(arvados.current_job().tmpdir, path)
228     lockfile = open(path + '.lock', 'w')
229     fcntl.flock(lockfile, fcntl.LOCK_EX)
230     try:
231         os.stat(path)
232     except OSError:
233         os.mkdir(path)
234     already_have_it = False
235     try:
236         if os.readlink(os.path.join(path, '.locator')) == collection_hash:
237             already_have_it = True
238     except OSError:
239         pass
240
241     # emulate "rm -f" (i.e., if the file does not exist, we win)
242     try:
243         os.unlink(os.path.join(path, '.locator'))
244     except OSError:
245         if os.path.exists(os.path.join(path, '.locator')):
246             os.unlink(os.path.join(path, '.locator'))
247
248     files_got = []
249     for s in CollectionReader(collection).all_streams():
250         stream_name = s.name()
251         for f in s.all_files():
252             if (files == [] or
253                 ((f.name() not in files_got) and
254                  (f.name() in files or
255                   (decompress and f.decompressed_name() in files)))):
256                 outname = f.decompressed_name() if decompress else f.name()
257                 files_got += [outname]
258                 if os.path.exists(os.path.join(path, stream_name, outname)):
259                     continue
260                 mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
261                 outfile = open(os.path.join(path, stream_name, outname), 'wb')
262                 for buf in (f.readall_decompressed() if decompress
263                             else f.readall()):
264                     outfile.write(buf)
265                 outfile.close()
266     if len(files_got) < len(files):
267         raise arvados.errors.AssertionError(
268             "Wanted files %s but only got %s from %s" %
269             (files, files_got,
270              [z.name() for z in CollectionReader(collection).all_files()]))
271     os.symlink(collection_hash, os.path.join(path, '.locator'))
272
273     lockfile.close()
274     return path
275
276 def mkdir_dash_p(path):
277     if not os.path.isdir(path):
278         try:
279             os.makedirs(path)
280         except OSError as e:
281             if e.errno == errno.EEXIST and os.path.isdir(path):
282                 # It is not an error if someone else creates the
283                 # directory between our exists() and makedirs() calls.
284                 pass
285             else:
286                 raise
287
288 def stream_extract(stream, path, files=[], decompress=True):
289     """Retrieve a stream from Keep and extract it to a local
290     directory.  Return the absolute path where the stream was
291     extracted.
292
293     stream -- StreamReader object
294     path -- where to extract: absolute, or relative to job tmp
295     """
296     if not re.search('^/', path):
297         path = os.path.join(arvados.current_job().tmpdir, path)
298     lockfile = open(path + '.lock', 'w')
299     fcntl.flock(lockfile, fcntl.LOCK_EX)
300     try:
301         os.stat(path)
302     except OSError:
303         os.mkdir(path)
304
305     files_got = []
306     for f in stream.all_files():
307         if (files == [] or
308             ((f.name() not in files_got) and
309              (f.name() in files or
310               (decompress and f.decompressed_name() in files)))):
311             outname = f.decompressed_name() if decompress else f.name()
312             files_got += [outname]
313             if os.path.exists(os.path.join(path, outname)):
314                 os.unlink(os.path.join(path, outname))
315             mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
316             outfile = open(os.path.join(path, outname), 'wb')
317             for buf in (f.readall_decompressed() if decompress
318                         else f.readall()):
319                 outfile.write(buf)
320             outfile.close()
321     if len(files_got) < len(files):
322         raise arvados.errors.AssertionError(
323             "Wanted files %s but only got %s from %s" %
324             (files, files_got, [z.name() for z in stream.all_files()]))
325     lockfile.close()
326     return path
327
328 def listdir_recursive(dirname, base=None, max_depth=None):
329     """listdir_recursive(dirname, base, max_depth)
330
331     Return a list of file and directory names found under dirname.
332
333     If base is not None, prepend "{base}/" to each returned name.
334
335     If max_depth is None, descend into directories and return only the
336     names of files found in the directory tree.
337
338     If max_depth is a non-negative integer, stop descending into
339     directories at the given depth, and at that point return directory
340     names instead.
341
342     If max_depth==0 (and base is None) this is equivalent to
343     sorted(os.listdir(dirname)).
344     """
345     allfiles = []
346     for ent in sorted(os.listdir(dirname)):
347         ent_path = os.path.join(dirname, ent)
348         ent_base = os.path.join(base, ent) if base else ent
349         if os.path.isdir(ent_path) and max_depth != 0:
350             allfiles += listdir_recursive(
351                 ent_path, base=ent_base,
352                 max_depth=(max_depth-1 if max_depth else None))
353         else:
354             allfiles += [ent_base]
355     return allfiles
356
357 def is_hex(s, *length_args):
358     """is_hex(s[, length[, max_length]]) -> boolean
359
360     Return True if s is a string of hexadecimal digits.
361     If one length argument is given, the string must contain exactly
362     that number of digits.
363     If two length arguments are given, the string must contain a number of
364     digits between those two lengths, inclusive.
365     Return False otherwise.
366     """
367     num_length_args = len(length_args)
368     if num_length_args > 2:
369         raise arvados.errors.ArgumentError(
370             "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
371     elif num_length_args == 2:
372         good_len = (length_args[0] <= len(s) <= length_args[1])
373     elif num_length_args == 1:
374         good_len = (len(s) == length_args[0])
375     else:
376         good_len = True
377     return bool(good_len and HEX_RE.match(s))
378
379 def list_all(fn, num_retries=0, **kwargs):
380     # Default limit to (effectively) api server's MAX_LIMIT
381     kwargs.setdefault('limit', sys.maxsize)
382     items = []
383     offset = 0
384     items_available = sys.maxsize
385     while len(items) < items_available:
386         c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
387         items += c['items']
388         items_available = c['items_available']
389         offset = c['offset'] + len(c['items'])
390     return items
391
392 def keyset_list_all(fn, order_key="created_at", num_retries=0, ascending=True, **kwargs):
393     pagesize = 1000
394     kwargs["limit"] = pagesize
395     kwargs["count"] = 'none'
396     asc = "asc" if ascending else "desc"
397     kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
398     other_filters = kwargs.get("filters", [])
399
400     if "select" in kwargs and "uuid" not in kwargs["select"]:
401         kwargs["select"].append("uuid")
402
403     nextpage = []
404     tot = 0
405     expect_full_page = True
406     seen_prevpage = set()
407     seen_thispage = set()
408     lastitem = None
409     prev_page_all_same_order_key = False
410
411     while True:
412         kwargs["filters"] = nextpage+other_filters
413         items = fn(**kwargs).execute(num_retries=num_retries)
414
415         if len(items["items"]) == 0:
416             if prev_page_all_same_order_key:
417                 nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
418                 prev_page_all_same_order_key = False
419                 continue
420             else:
421                 return
422
423         seen_prevpage = seen_thispage
424         seen_thispage = set()
425
426         for i in items["items"]:
427             # In cases where there's more than one record with the
428             # same order key, the result could include records we
429             # already saw in the last page.  Skip them.
430             if i["uuid"] in seen_prevpage:
431                 continue
432             seen_thispage.add(i["uuid"])
433             yield i
434
435         firstitem = items["items"][0]
436         lastitem = items["items"][-1]
437
438         if firstitem[order_key] == lastitem[order_key]:
439             # Got a page where every item has the same order key.
440             # Switch to using uuid for paging.
441             nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
442             prev_page_all_same_order_key = True
443         else:
444             # Start from the last order key seen, but skip the last
445             # known uuid to avoid retrieving the same row twice.  If
446             # there are multiple rows with the same order key it is
447             # still likely we'll end up retrieving duplicate rows.
448             # That's handled by tracking the "seen" rows for each page
449             # so they can be skipped if they show up on the next page.
450             nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
451             prev_page_all_same_order_key = False
452
453
454 def ca_certs_path(fallback=httplib2.CA_CERTS):
455     """Return the path of the best available CA certs source.
456
457     This function searches for various distribution sources of CA
458     certificates, and returns the first it finds.  If it doesn't find any,
459     it returns the value of `fallback` (httplib2's CA certs by default).
460     """
461     for ca_certs_path in [
462         # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
463         # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
464         os.environ.get('SSL_CERT_FILE'),
465         # Arvados specific:
466         '/etc/arvados/ca-certificates.crt',
467         # Debian:
468         '/etc/ssl/certs/ca-certificates.crt',
469         # Red Hat:
470         '/etc/pki/tls/certs/ca-bundle.crt',
471         ]:
472         if ca_certs_path and os.path.exists(ca_certs_path):
473             return ca_certs_path
474     return fallback
475
476 def new_request_id():
477     rid = "req-"
478     # 2**104 > 36**20 > 2**103
479     n = random.getrandbits(104)
480     for _ in range(20):
481         c = n % 36
482         if c < 10:
483             rid += chr(c+ord('0'))
484         else:
485             rid += chr(c+ord('a')-10)
486         n = n // 36
487     return rid
488
489 def get_config_once(svc):
490     if not svc._rootDesc.get('resources').get('configs', False):
491         # Old API server version, no config export endpoint
492         return {}
493     if not hasattr(svc, '_cached_config'):
494         svc._cached_config = svc.configs().get().execute()
495     return svc._cached_config
496
497 def get_vocabulary_once(svc):
498     if not svc._rootDesc.get('resources').get('vocabularies', False):
499         # Old API server version, no vocabulary export endpoint
500         return {}
501     if not hasattr(svc, '_cached_vocabulary'):
502         svc._cached_vocabulary = svc.vocabularies().get().execute()
503     return svc._cached_vocabulary
504
505 def trim_name(collectionname):
506     """
507     trim_name takes a record name (collection name, project name, etc)
508     and trims it to fit the 255 character name limit, with additional
509     space for the timestamp added by ensure_unique_name, by removing
510     excess characters from the middle and inserting an ellipse
511     """
512
513     max_name_len = 254 - 28
514
515     if len(collectionname) > max_name_len:
516         over = len(collectionname) - max_name_len
517         split = int(max_name_len/2)
518         collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
519
520     return collectionname