# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import errno
import fcntl
import functools
import hashlib
import os
import random
import re
import subprocess
import sys
import warnings

import httplib2

import arvados.errors

HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
CR_UNCOMMITTED = 'Uncommitted'
CR_COMMITTED = 'Committed'
CR_FINAL = 'Final'

keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
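
# Example (illustrative): checking strings against the patterns above.
# The locator below is the md5 of the empty string plus a zero size hint.
#
#     >>> bool(keep_locator_pattern.match('d41d8cd98f00b204e9800998ecf8427e+0'))
#     True
#     >>> bool(collection_uuid_pattern.match('zzzzz-4zz18-0123456789abcde'))
#     True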

def _deprecated(version=None, preferred=None):
    """Mark a callable as deprecated in the SDK

    This will wrap the callable to emit a DeprecationWarning when called,
    and add a deprecation notice to its docstring.

    If the following arguments are given, they'll be included in the
    notices:

    preferred: str | None
    : The name of an alternative that users should use instead.

    version: str | None
    : The version of Arvados when the callable is scheduled to be
      removed.
    """
    if version is None:
        version = ''
    else:
        version = f' and scheduled to be removed in Arvados {version}'
    if preferred is None:
        preferred = ''
    else:
        preferred = f' Prefer {preferred} instead.'
    def deprecated_decorator(func):
        fullname = f'{func.__module__}.{func.__qualname__}'
        parent, _, name = fullname.rpartition('.')
        if name == '__init__':
            fullname = parent
        warning_msg = f'{fullname} is deprecated{version}.{preferred}'
        @functools.wraps(func)
        def deprecated_wrapper(*args, **kwargs):
            warnings.warn(warning_msg, DeprecationWarning, 2)
            return func(*args, **kwargs)
        # Get func's docstring without any trailing newline or empty lines.
        func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
        match = re.search(r'\n([ \t]+)\S', func_doc)
        indent = '' if match is None else match.group(1)
        # Make the deprecation notice the second "paragraph" of the
        # docstring if possible. Otherwise append it.
        docstring, count = re.subn(
            rf'\n[ \t]*\n{indent}',
            f'\n\n{indent}DEPRECATED: {warning_msg}\n\n{indent}',
            func_doc,
            count=1,
        )
        if not count:
            docstring = f'{func_doc}\n\n{indent}DEPRECATED: {warning_msg}'.lstrip()
        deprecated_wrapper.__doc__ = docstring
        return deprecated_wrapper
    return deprecated_decorator
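
# Example (illustrative sketch): applying @_deprecated to a hypothetical
# helper.  Calling the wrapped function emits a DeprecationWarning, and a
# "DEPRECATED:" notice is spliced into its docstring.
#
#     @_deprecated('3.0', 'shiny_new_helper')
#     def old_helper():
#         """Do the thing the old way."""
#
#     old_helper()  # warns: "...old_helper is deprecated and scheduled to
#                   # be removed in Arvados 3.0. Prefer shiny_new_helper
#                   # instead."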

@_deprecated('3.0', 'tempfile.TemporaryDirectory')
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    from arvados import current_task
    if path is None:
        path = current_task().tmpdir
    if os.path.exists(path):
        p = subprocess.Popen(['rm', '-rf', path])
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)
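
# Replacement sketch: the tempfile module creates and cleans up scratch
# directories without shelling out to rm.
#
#     import tempfile
#     with tempfile.TemporaryDirectory() as tmpdir:
#         ...  # work inside tmpdir; it is removed when the block exits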

@_deprecated('3.0', 'subprocess.run')
def run_command(execargs, **kwargs):
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise arvados.errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata
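
# Migration sketch (assumes no data needs to be written to stdin): the
# preferred subprocess.run covers the same ground, raising
# CalledProcessError instead of CommandFailedError on a nonzero exit.
#
#     import subprocess
#     proc = subprocess.run(['ls', '-l'], capture_output=True, check=True)
#     stdoutdata, stderrdata = proc.stdout, proc.stderr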

@_deprecated('3.0', 'the appropriate git command')
def git_checkout(url, version, path):
    from arvados import current_job
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path

@_deprecated('3.0', 'tarfile.open')
def tar_extractor(path, decompress_flag):
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)

@_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted.  If the top level of the tarball contained just one
    file or directory, return the absolute path of that single item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            f_name = f.name()
            if f_name.endswith(('.tbz', '.tar.bz2')):
                p = tar_extractor(path, 'j')
            elif f_name.endswith(('.tgz', '.tar.gz')):
                p = tar_extractor(path, 'z')
            elif f_name.endswith('.tar'):
                p = tar_extractor(path, '')
            else:
                raise arvados.errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
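
# Replacement sketch (hedged): stream the archive out of Keep with
# Collection.open and let the tarfile module extract it.  `locator`,
# 'data.tar.gz', and `dest` are placeholders for the caller's values.
#
#     import tarfile
#     from arvados.collection import Collection
#     with Collection(locator).open('data.tar.gz', 'rb') as f:
#         with tarfile.open(fileobj=f, mode='r|gz') as tar:
#             tar.extractall(dest)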

@_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted.  If the top level of the archive contained just one
    file or directory, return the absolute path of that single item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not f.name().endswith('.zip'):
                raise arvados.errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
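
# Replacement sketch (hedged), parallel to the tarball example above but
# using the zipfile module.  `locator`, 'data.zip', and `dest` are
# placeholders.
#
#     import zipfile
#     from arvados.collection import Collection
#     with Collection(locator).open('data.zip', 'rb') as f:
#         with zipfile.ZipFile(f) as zf:
#             zf.extractall(dest)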

@_deprecated('3.0', 'arvados.collection.Collection')
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        # md5 requires bytes, not str
        collection_hash = hashlib.md5(collection.encode()).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
    lockfile.close()
    return path
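
# Replacement sketch (hedged, assumes a flat collection with only
# top-level files): the Collection class exposes collection contents as
# file-like objects, so files can be copied out directly.  `locator` and
# `dest` are placeholders.
#
#     import os, shutil
#     from arvados.collection import Collection
#     coll = Collection(locator)
#     for filename in coll:
#         with coll.open(filename, 'rb') as src, \
#              open(os.path.join(dest, filename), 'wb') as dst:
#             shutil.copyfileobj(src, dst)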

@_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
def mkdir_dash_p(path):
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
                pass
            else:
                raise
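
# Preferred replacement, per the deprecation notice above:
#
#     from pathlib import Path
#     Path(path).mkdir(parents=True, exist_ok=True)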

@_deprecated('3.0', 'arvados.collection.Collection')
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    """
    from arvados import current_job
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path

@_deprecated('3.0', 'os.walk')
def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    Return a list of file and directory names found under dirname.

    If base is not None, prepend "{base}/" to each returned name.

    If max_depth is None, descend into directories and return only the
    names of files found in the directory tree.

    If max_depth is a non-negative integer, stop descending into
    directories at the given depth, and at that point return directory
    names instead.

    If max_depth==0 (and base is None) this is equivalent to
    sorted(os.listdir(dirname)).
    """
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path) and max_depth != 0:
            allfiles += listdir_recursive(
                ent_path, base=ent_base,
                max_depth=(max_depth-1 if max_depth else None))
        else:
            allfiles += [ent_base]
    return allfiles
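
# Replacement sketch using os.walk, the preferred alternative; roughly
# equivalent to max_depth=None with base=None (the traversal order can
# differ slightly):
#
#     allfiles = sorted(
#         os.path.relpath(os.path.join(root, name), dirname)
#         for root, _, filenames in os.walk(dirname)
#         for name in filenames)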

def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        good_len = True
    return bool(good_len and HEX_RE.match(s))
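
# Examples:
#
#     >>> is_hex('89e92f6f')
#     True
#     >>> is_hex('89e92f6f', 4)
#     False
#     >>> is_hex('89e92f6f', 4, 16)
#     True
#     >>> is_hex('surely-not')
#     False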

@_deprecated('3.0', 'arvados.util.keyset_list_all')
def list_all(fn, num_retries=0, **kwargs):
    # Default limit to (effectively) api server's MAX_LIMIT
    kwargs.setdefault('limit', sys.maxsize)
    items = []
    offset = 0
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items += c['items']
        items_available = c['items_available']
        offset = c['offset'] + len(c['items'])
    return items
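
# Usage sketch (hedged): `api` is a client object from arvados.api().
#
#     api = arvados.api('v1')
#     collections = list_all(api.collections().list,
#                            filters=[['name', 'like', 'demo%']])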

def keyset_list_all(fn, order_key="created_at", num_retries=0, ascending=True, **kwargs):
    """Iterate all items from an Arvados API list call.

    This generator yields every matching item, paging through the results
    with keyset (cursor-based) paging on order_key and uuid rather than
    integer offsets.
    """
    pagesize = 1000
    kwargs["limit"] = pagesize
    kwargs["count"] = 'none'
    asc = "asc" if ascending else "desc"
    kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
    other_filters = kwargs.get("filters", [])

    try:
        select = set(kwargs['select'])
    except KeyError:
        pass
    else:
        # Paging depends on seeing the order key and uuid in every result.
        select.add(order_key)
        select.add('uuid')
        kwargs['select'] = list(select)

    nextpage = []
    seen_prevpage = set()
    seen_thispage = set()
    lastitem = None
    prev_page_all_same_order_key = False

    while True:
        kwargs["filters"] = nextpage+other_filters
        items = fn(**kwargs).execute(num_retries=num_retries)

        if len(items["items"]) == 0:
            if prev_page_all_same_order_key:
                nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
                prev_page_all_same_order_key = False
                continue
            else:
                return

        seen_prevpage = seen_thispage
        seen_thispage = set()

        for i in items["items"]:
            # In cases where there's more than one record with the
            # same order key, the result could include records we
            # already saw in the last page.  Skip them.
            if i["uuid"] in seen_prevpage:
                continue
            seen_thispage.add(i["uuid"])
            yield i

        firstitem = items["items"][0]
        lastitem = items["items"][-1]

        if firstitem[order_key] == lastitem[order_key]:
            # Got a page where every item has the same order key.
            # Switch to using uuid for paging.
            nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
            prev_page_all_same_order_key = True
        else:
            # Start from the last order key seen, but skip the last
            # known uuid to avoid retrieving the same row twice.  If
            # there are multiple rows with the same order key it is
            # still likely we'll end up retrieving duplicate rows.
            # That's handled by tracking the "seen" rows for each page
            # so they can be skipped if they show up on the next page.
            nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
            prev_page_all_same_order_key = False
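
# Usage sketch (hedged): stream every collection in a project without
# loading the whole result set into memory.  `project_uuid` is a
# placeholder.
#
#     api = arvados.api('v1')
#     for coll in keyset_list_all(
#             api.collections().list,
#             filters=[['owner_uuid', '=', project_uuid]]):
#         print(coll['uuid'], coll['name'])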

def ca_certs_path(fallback=httplib2.CA_CERTS):
    """Return the path of the best available CA certs source.

    This function searches for various distribution sources of CA
    certificates, and returns the first it finds.  If it doesn't find any,
    it returns the value of `fallback` (httplib2's CA certs by default).
    """
    for ca_certs_path in [
        # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
        # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
        os.environ.get('SSL_CERT_FILE'),
        # Arvados-specific:
        '/etc/arvados/ca-certificates.crt',
        # Debian-based distributions:
        '/etc/ssl/certs/ca-certificates.crt',
        # Red Hat-based distributions:
        '/etc/pki/tls/certs/ca-bundle.crt',
        ]:
        if ca_certs_path and os.path.exists(ca_certs_path):
            return ca_certs_path
    return fallback
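
# Usage sketch: pass the result to httplib2 so HTTPS connections are
# verified against the best available CA bundle.
#
#     http = httplib2.Http(ca_certs=ca_certs_path())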

def new_request_id():
    rid = "req-"
    # 2**104 > 36**20 > 2**103
    n = random.getrandbits(104)
    for _ in range(20):
        c = n % 36
        if c < 10:
            rid += chr(c+ord('0'))
        else:
            rid += chr(c+ord('a')-10)
        n = n // 36
    return rid
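
# Example: a request ID is "req-" plus 20 base-36 digits (output shown
# is illustrative; the function is random):
#
#     >>> new_request_id()
#     'req-b7f4matv5lh8ozmg3e1x'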

def get_config_once(svc):
    if not svc._rootDesc.get('resources').get('configs', False):
        # Old API server version, no config export endpoint
        return {}
    if not hasattr(svc, '_cached_config'):
        svc._cached_config = svc.configs().get().execute()
    return svc._cached_config

def get_vocabulary_once(svc):
    if not svc._rootDesc.get('resources').get('vocabularies', False):
        # Old API server version, no vocabulary export endpoint
        return {}
    if not hasattr(svc, '_cached_vocabulary'):
        svc._cached_vocabulary = svc.vocabularies().get().execute()
    return svc._cached_vocabulary
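
# Usage sketch (hedged): both getters take an API client object and
# memoize the server's response on it.
#
#     api = arvados.api('v1')
#     cluster_config = get_config_once(api)
#     vocabulary = get_vocabulary_once(api)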

def trim_name(collectionname):
    """
    trim_name takes a record name (collection name, project name, etc.)
    and trims it to fit the 255-character name limit, leaving room for
    the timestamp that ensure_unique_name appends, by removing excess
    characters from the middle and inserting an ellipsis.
    """

    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        over = len(collectionname) - max_name_len
        split = int(max_name_len/2)
        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]

    return collectionname
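
# Example: a 300-character name is trimmed to 226 characters plus the
# one-character ellipsis.
#
#     >>> len(trim_name('x' * 300))
#     227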