1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
6 This module provides functions and constants that are useful across a variety
7 of Arvados resource types, or extend the Arvados API client (see `arvados.api`).
HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
"""Regular expression to match a hexadecimal string (case-insensitive)"""
CR_UNCOMMITTED = 'Uncommitted'
"""Constant `state` value for uncommitted container requests"""
CR_COMMITTED = 'Committed'
"""Constant `state` value for committed container requests"""
CR_FINAL = 'Final'
"""Constant `state` value for finalized container requests"""

keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*')
"""Regular expression to match any Keep block locator"""
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*\+A\S+(\+\S+)*')
"""Regular expression to match any Keep block locator with an access token hint"""
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+')
"""Regular expression to match any collection portable data hash"""
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+[0-9]+)(\+\S+)*)+( +[0-9]+:[0-9]+:\S+)+$)+', flags=re.MULTILINE)
"""Regular expression to match an Arvados collection manifest text"""
keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+[0-9]+)/(.*)')
"""Regular expression to match a file path from a collection identified by portable data hash"""
keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+[0-9]+)/(.*)')
"""Regular expression to match a `keep:` URI with a collection identified by portable data hash"""

uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
"""Regular expression to match any Arvados object UUID"""
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
"""Regular expression to match any Arvados collection UUID"""
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
"""Regular expression to match any Arvados container UUID"""
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
"""Regular expression to match any Arvados group UUID"""
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
"""Regular expression to match any Arvados link UUID"""
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
"""Regular expression to match any Arvados user UUID"""
job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
"""Regular expression to match any Arvados job UUID

.. WARNING:: Deprecated
   Arvados job resources are deprecated and will be removed in a future
   release. Prefer the containers API instead.
"""
77 def _deprecated(version=None, preferred=None):
78 """Mark a callable as deprecated in the SDK
80 This will wrap the callable to emit as a DeprecationWarning
81 and add a deprecation notice to its docstring.
83 If the following arguments are given, they'll be included in the
86 * preferred: str | None --- The name of an alternative that users should
89 * version: str | None --- The version of Arvados when the callable is
90 scheduled to be removed.
95 version = f' and scheduled to be removed in Arvados {version}'
99 preferred = f' Prefer {preferred} instead.'
100 def deprecated_decorator(func):
101 fullname = f'{func.__module__}.{func.__qualname__}'
102 parent, _, name = fullname.rpartition('.')
103 if name == '__init__':
105 warning_msg = f'{fullname} is deprecated{version}.{preferred}'
106 @functools.wraps(func)
107 def deprecated_wrapper(*args, **kwargs):
108 warnings.warn(warning_msg, DeprecationWarning, 2)
109 return func(*args, **kwargs)
110 # Get func's docstring without any trailing newline or empty lines.
111 func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
112 match = re.search(r'\n([ \t]+)\S', func_doc)
113 indent = '' if match is None else match.group(1)
114 warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent} {warning_msg}'
115 # Make the deprecation notice the second "paragraph" of the
116 # docstring if possible. Otherwise append it.
117 docstring, count = re.subn(
118 rf'\n[ \t]*\n{indent}',
119 f'{warning_doc}\n\n{indent}',
124 docstring = f'{func_doc.lstrip()}{warning_doc}'
125 deprecated_wrapper.__doc__ = docstring
126 return deprecated_wrapper
127 return deprecated_decorator
def is_hex(s: str, *length_args: int) -> bool:
    """Indicate whether a string is a hexadecimal number

    This method returns true if all characters in the string are hexadecimal
    digits. It is case-insensitive.

    You can also pass optional length arguments to check that the string has
    the expected number of digits. If you pass one integer, the string must
    have that length exactly, otherwise the method returns False. If you
    pass two integers, the string's length must fall within that minimum and
    maximum (inclusive), otherwise the method returns False.

    Arguments:

    * s: str --- The string to check

    * length_args: int --- Optional length limit(s) for the string to check

    Raises `arvados.errors.ArgumentError` if more than two length arguments
    are given.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        # No length constraint given: any length (including empty after the
        # regex check fails it) is acceptable.
        good_len = True
    return bool(good_len and HEX_RE.match(s))
def keyset_list_all(
        fn: Callable[..., 'arvados.api_resources.ArvadosAPIRequest'],
        order_key: str="created_at",
        num_retries: int=0,
        ascending: bool=True,
        **kwargs: Any,
) -> Iterator[Dict[str, Any]]:
    """Iterate all Arvados resources from an API list call

    This method takes a method that represents an Arvados API list call, and
    iterates the objects returned by the API server. It can make multiple API
    calls to retrieve and iterate all objects available from the API server.

    Arguments:

    * fn: Callable[..., arvados.api_resources.ArvadosAPIRequest] --- A
      function that wraps an Arvados API method that returns a list of
      objects. If you have an Arvados API client named `arv`, examples
      include `arv.collections().list` and `arv.groups().contents`. Note
      that you should pass the function *without* calling it.

    * order_key: str --- The name of the primary object field that objects
      should be sorted by. This name is used to build an `order` argument
      for `fn`. Default `'created_at'`.

    * num_retries: int --- This argument is passed through to
      `arvados.api_resources.ArvadosAPIRequest.execute` for each API call. See
      that method's docstring for details. Default 0 (meaning API calls will
      use the `num_retries` value set when the Arvados API client was
      constructed).

    * ascending: bool --- Used to build an `order` argument for `fn`. If True,
      all fields will be sorted in `'asc'` (ascending) order. Otherwise, all
      fields will be sorted in `'desc'` (descending) order.

    Additional keyword arguments will be passed directly to `fn` for each API
    call. Note that this function sets `count`, `limit`, and `order` as part
    of its work.
    """
    pagesize = 1000
    kwargs["limit"] = pagesize
    # We page via filters on (order_key, uuid), so an expensive total count
    # is never needed.
    kwargs["count"] = 'none'
    asc = "asc" if ascending else "desc"
    kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
    other_filters = kwargs.get("filters", [])

    try:
        select = set(kwargs['select'])
    except KeyError:
        pass
    else:
        # Paging depends on reading order_key and uuid from every item, so
        # force them into any caller-provided select list.
        select.add(order_key)
        select.add('uuid')
        kwargs['select'] = list(select)

    nextpage = []
    seen_prevpage = set()
    seen_thispage = set()
    lastitem = None
    prev_page_all_same_order_key = False

    while True:
        kwargs["filters"] = nextpage+other_filters
        items = fn(**kwargs).execute(num_retries=num_retries)

        if len(items["items"]) == 0:
            if prev_page_all_same_order_key:
                # We were paging within a run of equal order keys by uuid;
                # that run is exhausted, so move past the order key itself.
                nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
                prev_page_all_same_order_key = False
                continue
            else:
                return

        seen_prevpage = seen_thispage
        seen_thispage = set()

        for i in items["items"]:
            # In cases where there's more than one record with the
            # same order key, the result could include records we
            # already saw in the last page. Skip them.
            if i["uuid"] in seen_prevpage:
                continue
            seen_thispage.add(i["uuid"])
            yield i

        firstitem = items["items"][0]
        lastitem = items["items"][-1]

        if firstitem[order_key] == lastitem[order_key]:
            # Got a page where every item has the same order key.
            # Switch to using uuid for paging.
            nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
            prev_page_all_same_order_key = True
        else:
            # Start from the last order key seen, but skip the last
            # known uuid to avoid retrieving the same row twice. If
            # there are multiple rows with the same order key it is
            # still likely we'll end up retrieving duplicate rows.
            # That's handled by tracking the "seen" rows for each page
            # so they can be skipped if they show up on the next page.
            nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
            prev_page_all_same_order_key = False
def ca_certs_path(fallback: T=httplib2.CA_CERTS) -> Union[str, T]:
    """Return the path of the best available source of CA certificates

    This function checks various known paths that provide trusted CA
    certificates, and returns the first one that exists. It checks:

    * the path in the `SSL_CERT_FILE` environment variable (used by OpenSSL)
    * `/etc/arvados/ca-certificates.crt`, respected by all Arvados software
    * `/etc/ssl/certs/ca-certificates.crt`, the default store on Debian-based
      distributions
    * `/etc/pki/tls/certs/ca-bundle.crt`, the default store on Red Hat-based
      distributions

    If none of these paths exist, this function returns the value of `fallback`.

    Arguments:

    * fallback: T --- The value to return if none of the known paths exist.
      The default value is the certificate store of Mozilla's trusted CAs
      included with the Python [certifi][] package.

    [certifi]: https://pypi.org/project/certifi/
    """
    for ca_certs_path in [
        # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
        # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
        os.environ.get('SSL_CERT_FILE'),
        # Arvados-specific
        '/etc/arvados/ca-certificates.crt',
        # Debian-based
        '/etc/ssl/certs/ca-certificates.crt',
        # Red Hat-based
        '/etc/pki/tls/certs/ca-bundle.crt',
    ]:
        if ca_certs_path and os.path.exists(ca_certs_path):
            return ca_certs_path
    return fallback
def new_request_id() -> str:
    """Return a random request ID

    This function generates and returns a random string suitable for use as a
    `X-Request-Id` header value in the Arvados API.
    """
    rid = "req-"
    # 2**104 > 36**20 > 2**103
    n = random.getrandbits(104)
    # Encode 20 base-36 digits (0-9, a-z), least significant first.
    for _ in range(20):
        c = n % 36
        if c < 10:
            # digits 0..9
            rid += chr(c+ord('0'))
        else:
            # letters a..z for values 10..35
            rid += chr(c+ord('a')-10)
        n = n // 36
    return rid
def get_config_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
    """Return an Arvados cluster's configuration, with caching

    This function gets and returns the Arvados configuration from the API
    server. It caches the result on the client object and reuses it on any
    future calls.

    Arguments:

    * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
      object to use to retrieve and cache the Arvados cluster configuration.
    """
    if not svc._rootDesc.get('resources').get('configs', False):
        # Old API server version, no config export endpoint
        return {}
    if not hasattr(svc, '_cached_config'):
        svc._cached_config = svc.configs().get().execute()
    return svc._cached_config
def get_vocabulary_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
    """Return an Arvados cluster's vocabulary, with caching

    This function gets and returns the Arvados vocabulary from the API
    server. It caches the result on the client object and reuses it on any
    future calls.

    .. HINT:: Low-level method
       This is a relatively low-level wrapper around the Arvados API. Most
       users will prefer to use `arvados.vocabulary.load_vocabulary`.

    Arguments:

    * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
      object to use to retrieve and cache the Arvados cluster vocabulary.
    """
    if not svc._rootDesc.get('resources').get('vocabularies', False):
        # Old API server version, no vocabulary export endpoint
        return {}
    if not hasattr(svc, '_cached_vocabulary'):
        svc._cached_vocabulary = svc.vocabularies().get().execute()
    return svc._cached_vocabulary
def trim_name(collectionname: str) -> str:
    """Limit the length of a name to fit within Arvados API limits

    This function ensures that a string is short enough to use as an object
    name in the Arvados API, leaving room for text that may be added by the
    `ensure_unique_name` argument. If the source name is short enough, it is
    returned unchanged. Otherwise, this function returns a string with excess
    characters removed from the middle of the source string and replaced with
    an ellipsis.

    Arguments:

    * collectionname: str --- The desired source name
    """
    # 254 is the API server's name length limit; reserve 28 characters for
    # a suffix that ensure_unique_name may append.
    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        over = len(collectionname) - max_name_len
        split = max_name_len // 2
        # Remove one extra source character so the result, including the
        # "…" marker, is exactly max_name_len characters long. (Removing
        # only `over` characters would leave the result one character over
        # the limit, because "…" takes a slot of its own.)
        collectionname = collectionname[:split] + "…" + collectionname[split+over+1:]

    return collectionname
@_deprecated('3.0', 'arvados.util.keyset_list_all')
def list_all(fn, num_retries=0, **kwargs):
    """Iterate all items from an Arvados API list call via offset paging.

    Calls `fn` repeatedly, advancing `offset` by the number of items
    received, until `items_available` is reached. Returns the accumulated
    list of items. Additional keyword arguments are passed through to `fn`.
    """
    # Default limit to (effectively) api server's MAX_LIMIT
    kwargs.setdefault('limit', sys.maxsize)
    items = []
    offset = 0
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items += c['items']
        items_available = c['items_available']
        offset = c['offset'] + len(c['items'])
    return items
400 def clear_tmpdir(path=None):
402 Ensure the given directory (or TASK_TMPDIR if none given)
405 from arvados import current_task
407 path = current_task().tmpdir
408 if os.path.exists(path):
409 p = subprocess.Popen(['rm', '-rf', path])
410 stdout, stderr = p.communicate(None)
411 if p.returncode != 0:
412 raise Exception('rm -rf %s: %s' % (path, stderr))
@_deprecated('3.0', 'subprocess.run')
def run_command(execargs, **kwargs):
    """Run an external command and return its (stdout, stderr) output.

    By default the child's stdin and stdout are pipes and stderr is
    inherited from this process. Keyword arguments override the defaults
    and are passed straight to `subprocess.Popen`. Raises
    `arvados.errors.CommandFailedError` if the command exits nonzero.
    """
    for option, default in (
            ('stdin', subprocess.PIPE),
            ('stdout', subprocess.PIPE),
            ('stderr', sys.stderr),
            ('close_fds', True),
            ('shell', False),
    ):
        kwargs.setdefault(option, default)
    proc = subprocess.Popen(execargs, **kwargs)
    out, err = proc.communicate(None)
    if proc.returncode == 0:
        return out, err
    raise arvados.errors.CommandFailedError(
        "run_command %s exit %d:\n%s" %
        (execargs, proc.returncode, err))
def git_checkout(url, version, path):
    """Clone a git repository and check out a specific version.

    If `path` is relative, it is resolved against the current job's
    temporary directory. The clone is skipped when `path` already exists.
    Returns the checkout path.
    """
    from arvados import current_job
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path
def tar_extractor(path, decompress_flag):
    """Start a `tar` child process extracting from stdin into `path`.

    `decompress_flag` is the single-letter tar compression flag ('j' for
    bzip2, 'z' for gzip, '' for none). Returns the `subprocess.Popen`
    object; the caller writes the archive to its stdin.
    """
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)
@_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory. Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same target directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # The '.locator' symlink records which collection was last extracted
    # here, so a repeat extraction of the same tarball is a no-op.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            f_name = f.name()
            if f_name.endswith(('.tbz', '.tar.bz2')):
                p = tar_extractor(path, 'j')
            elif f_name.endswith(('.tgz', '.tar.gz')):
                p = tar_extractor(path, 'z')
            elif f_name.endswith('.tar'):
                p = tar_extractor(path, '')
            else:
                raise arvados.errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            # Stream the archive from Keep into tar's stdin in 1 MiB chunks.
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
@_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory. Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same target directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # The '.locator' symlink records which collection was last extracted
    # here, so a repeat extraction of the same zipball is a no-op.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not f.name().endswith('.zip'):
                raise arvados.errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # unzip(1) cannot read from a pipe, so copy the archive from
            # Keep to a local file first.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
@_deprecated('3.0', 'arvados.collection.Collection')
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory. Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        # Not a bare locator; derive a stable directory marker from the
        # text itself. (Bug fix: hashlib.md5 requires bytes, so the
        # locator string must be encoded under Python 3.)
        collection_hash = hashlib.md5(collection.encode()).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same target directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # The '.locator' symlink records which collection was last extracted
    # here; NOTE(review): unlike tarball/zipball_extract, this function
    # appears to re-extract even when the marker matches — confirm intent.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            # Extract everything when `files` is empty; otherwise only the
            # requested names (matching either stored or decompressed name).
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
    lockfile.close()
    return path
@_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
def mkdir_dash_p(path):
    """Create directory `path` and any missing parents, like `mkdir -p`."""
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
                pass
            else:
                raise
@_deprecated('3.0', 'arvados.collection.Collection')
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory. Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    """
    from arvados import current_job
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same target directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        # Extract everything when `files` is empty; otherwise only the
        # requested names (matching either stored or decompressed name).
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path
707 @_deprecated('3.0', 'os.walk')
708 def listdir_recursive(dirname, base=None, max_depth=None):
709 """listdir_recursive(dirname, base, max_depth)
711 Return a list of file and directory names found under dirname.
713 If base is not None, prepend "{base}/" to each returned name.
715 If max_depth is None, descend into directories and return only the
716 names of files found in the directory tree.
718 If max_depth is a non-negative integer, stop descending into
719 directories at the given depth, and at that point return directory
722 If max_depth==0 (and base is None) this is equivalent to
723 sorted(os.listdir(dirname)).
726 for ent in sorted(os.listdir(dirname)):
727 ent_path = os.path.join(dirname, ent)
728 ent_base = os.path.join(base, ent) if base else ent
729 if os.path.isdir(ent_path) and max_depth != 0:
730 allfiles += listdir_recursive(
731 ent_path, base=ent_base,
732 max_depth=(max_depth-1 if max_depth else None))
734 allfiles += [ent_base]