# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from builtins import range

import errno
import fcntl
import functools
import hashlib
import httplib2
import os
import random
import re
import subprocess
import sys
import warnings

import arvados.errors

HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
CR_UNCOMMITTED = 'Uncommitted'
CR_COMMITTED = 'Committed'
CR_FINAL = 'Final'

keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*')
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*\+A\S+(\+\S+)*')
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+')
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+[0-9]+)(\+\S+)*)+( +[0-9]+:[0-9]+:\S+)+$)+', flags=re.MULTILINE)
keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+[0-9]+)/(.*)')
keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+[0-9]+)/(.*)')

def _deprecated(version=None, preferred=None):
    """Mark a callable as deprecated in the SDK

    This will wrap the callable to emit a DeprecationWarning
    and add a deprecation notice to its docstring.

    If the following arguments are given, they'll be included in the
    notices:

    preferred: str | None
    : The name of an alternative that users should use instead.

    version: str | None
    : The version of Arvados when the callable is scheduled to be
      removed.
    """
    if version is None:
        version = ''
    else:
        version = f' and scheduled to be removed in Arvados {version}'
    if preferred is None:
        preferred = ''
    else:
        preferred = f' Prefer {preferred} instead.'
    def deprecated_decorator(func):
        fullname = f'{func.__module__}.{func.__qualname__}'
        parent, _, name = fullname.rpartition('.')
        if name == '__init__':
            fullname = parent
        warning_msg = f'{fullname} is deprecated{version}.{preferred}'
        @functools.wraps(func)
        def deprecated_wrapper(*args, **kwargs):
            warnings.warn(warning_msg, DeprecationWarning, 2)
            return func(*args, **kwargs)
        # Get func's docstring without any trailing newline or empty lines.
        func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
        match = re.search(r'\n([ \t]+)\S', func_doc)
        indent = '' if match is None else match.group(1)
        warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent}   {warning_msg}'
        # Make the deprecation notice the second "paragraph" of the
        # docstring if possible. Otherwise append it.
        docstring, count = re.subn(
            rf'\n[ \t]*\n{indent}',
            f'{warning_doc}\n\n{indent}',
            func_doc,
            count=1,
        )
        if not count:
            docstring = f'{func_doc.lstrip()}{warning_doc}'
        deprecated_wrapper.__doc__ = docstring
        return deprecated_wrapper
    return deprecated_decorator
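
# Illustrative use of _deprecated() (the function and replacement named
# below are hypothetical, not part of this module):
#
#   @_deprecated('3.0', 'some.replacement.helper')
#   def frobnicate():
#       """Do a thing."""
#
# Calling frobnicate() then emits a DeprecationWarning naming the
# replacement, and its docstring gains a ".. WARNING:: Deprecated" note.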

@_deprecated('3.0')
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    from arvados import current_task
    if path is None:
        path = current_task().tmpdir
    if os.path.exists(path):
        p = subprocess.Popen(['rm', '-rf', path])
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)

@_deprecated('3.0', 'subprocess.run')
def run_command(execargs, **kwargs):
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise arvados.errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata
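
# A minimal sketch of the preferred subprocess.run equivalent (the
# command shown is only an example):
#
#   proc = subprocess.run(['ls', '-l'], stdin=subprocess.DEVNULL,
#                         capture_output=True, check=True)
#   stdout, stderr = proc.stdout, proc.stderr
#
# check=True raises subprocess.CalledProcessError on a nonzero exit,
# much like the CommandFailedError raised above.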

@_deprecated('3.0')
def git_checkout(url, version, path):
    from arvados import current_job
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path

@_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
def tar_extractor(path, decompress_flag):
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)

@_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted.  If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            f_name = f.name()
            if f_name.endswith(('.tbz', '.tar.bz2')):
                p = tar_extractor(path, 'j')
            elif f_name.endswith(('.tgz', '.tar.gz')):
                p = tar_extractor(path, 'z')
            elif f_name.endswith('.tar'):
                p = tar_extractor(path, '')
            else:
                raise arvados.errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
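
# A sketch of the preferred replacement, assuming `locator` names a
# collection containing foo.tar (both names are illustrative):
#
#   import tarfile
#   from arvados.collection import Collection
#   with Collection(locator).open('foo.tar', 'rb') as f:
#       with tarfile.open(fileobj=f, mode='r|*') as archive:
#           archive.extractall(path)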

@_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted.  If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not f.name().endswith('.zip'):
                raise arvados.errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
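
# A sketch of the preferred replacement, assuming `locator` names a
# collection containing foo.zip (both names are illustrative). zipfile
# needs a seekable file object, which Collection.open provides:
#
#   import zipfile
#   from arvados.collection import Collection
#   with Collection(locator).open('foo.zip', 'rb') as f:
#       with zipfile.ZipFile(f) as archive:
#           archive.extractall(path)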

@_deprecated('3.0', 'arvados.collection.Collection')
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        collection_hash = hashlib.md5(collection).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
    lockfile.close()
    return path
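
# A sketch of the preferred replacement: read files straight out of a
# Collection instead of staging them on disk (`locator` and the file
# name are illustrative):
#
#   from arvados.collection import Collection
#   coll = Collection(locator)
#   with coll.open('data.txt', 'rb') as f:
#       data = f.read()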

@_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
def mkdir_dash_p(path):
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
                pass
            else:
                raise
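
# The preferred one-liner named in the decorator above:
#
#   import pathlib
#   pathlib.Path(path).mkdir(parents=True, exist_ok=True)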

@_deprecated('3.0', 'arvados.collection.Collection')
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    """
    from arvados import current_job
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path

@_deprecated('3.0', 'os.walk')
def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    Return a list of file and directory names found under dirname.

    If base is not None, prepend "{base}/" to each returned name.

    If max_depth is None, descend into directories and return only the
    names of files found in the directory tree.

    If max_depth is a non-negative integer, stop descending into
    directories at the given depth, and at that point return directory
    names instead.

    If max_depth==0 (and base is None) this is equivalent to
    sorted(os.listdir(dirname)).
    """
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path) and max_depth != 0:
            allfiles += listdir_recursive(
                ent_path, base=ent_base,
                max_depth=(max_depth-1 if max_depth else None))
        else:
            allfiles += [ent_base]
    return allfiles
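
# A sketch of an os.walk-based equivalent for the max_depth=None case
# (relative file names under dirname; ordering can differ slightly from
# listdir_recursive's per-directory sort):
#
#   names = sorted(
#       os.path.relpath(os.path.join(root, name), dirname)
#       for root, dirs, filenames in os.walk(dirname)
#       for name in filenames)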

def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        good_len = True
    return bool(good_len and HEX_RE.match(s))
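
# Examples:
#
#   is_hex('89e0')        # True
#   is_hex('89e0', 4)     # True (exactly 4 digits)
#   is_hex('89e0', 5, 8)  # False (too short)
#   is_hex('z')           # False (not hexadecimal)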

@_deprecated('3.0', 'arvados.util.keyset_list_all')
def list_all(fn, num_retries=0, **kwargs):
    # Default limit to (effectively) api server's MAX_LIMIT
    kwargs.setdefault('limit', sys.maxsize)
    items = []
    offset = 0
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items += c['items']
        items_available = c['items_available']
        offset = c['offset'] + len(c['items'])
    return items

def keyset_list_all(fn, order_key="created_at", num_retries=0, ascending=True, **kwargs):
    """Iterate all items returned by a paginated API list method.

    Pages through the results using a stable sort on (order_key, uuid)
    rather than offsets, so items created or deleted while iterating
    are neither skipped nor double-counted.
    """
    pagesize = 1000
    kwargs["limit"] = pagesize
    kwargs["count"] = 'none'
    asc = "asc" if ascending else "desc"
    kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
    other_filters = kwargs.get("filters", [])

    try:
        select = set(kwargs['select'])
    except KeyError:
        pass
    else:
        # Paging requires order_key and uuid in the response.
        select.add(order_key)
        select.add('uuid')
        kwargs['select'] = list(select)

    nextpage = []
    expect_full_page = True
    seen_prevpage = set()
    seen_thispage = set()
    lastitem = None
    prev_page_all_same_order_key = False

    while True:
        kwargs["filters"] = nextpage+other_filters
        items = fn(**kwargs).execute(num_retries=num_retries)

        if len(items["items"]) == 0:
            if prev_page_all_same_order_key:
                nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
                prev_page_all_same_order_key = False
                continue
            else:
                return

        seen_prevpage = seen_thispage
        seen_thispage = set()

        for i in items["items"]:
            # In cases where there's more than one record with the
            # same order key, the result could include records we
            # already saw in the last page.  Skip them.
            if i["uuid"] in seen_prevpage:
                continue
            seen_thispage.add(i["uuid"])
            yield i

        firstitem = items["items"][0]
        lastitem = items["items"][-1]

        if firstitem[order_key] == lastitem[order_key]:
            # Got a page where every item has the same order key.
            # Switch to using uuid for paging.
            nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
            prev_page_all_same_order_key = True
        else:
            # Start from the last order key seen, but skip the last
            # known uuid to avoid retrieving the same row twice.  If
            # there are multiple rows with the same order key it is
            # still likely we'll end up retrieving duplicate rows.
            # That's handled by tracking the "seen" rows for each page
            # so they can be skipped if they show up on the next page.
            nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
            prev_page_all_same_order_key = False
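
# Typical use, with an API client from arvados.api() (the filter shown
# is only an example):
#
#   import arvados
#   api = arvados.api('v1')
#   for coll in keyset_list_all(
#           api.collections().list,
#           filters=[['name', 'like', 'Example%']]):
#       print(coll['uuid'], coll['name'])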

def ca_certs_path(fallback=httplib2.CA_CERTS):
    """Return the path of the best available CA certs source.

    This function searches for various distribution sources of CA
    certificates, and returns the first it finds.  If it doesn't find any,
    it returns the value of `fallback` (httplib2's CA certs by default).
    """
    for ca_certs_path in [
        # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
        # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
        os.environ.get('SSL_CERT_FILE'),
        # Arvados-specific:
        '/etc/arvados/ca-certificates.crt',
        # Debian:
        '/etc/ssl/certs/ca-certificates.crt',
        # Red Hat:
        '/etc/pki/tls/certs/ca-bundle.crt',
        ]:
        if ca_certs_path and os.path.exists(ca_certs_path):
            return ca_certs_path
    return fallback
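
# Example: build an httplib2 client that verifies TLS against the best
# available CA bundle:
#
#   http = httplib2.Http(ca_certs=ca_certs_path())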

def new_request_id():
    rid = "req-"
    # 2**104 > 36**20 > 2**103
    n = random.getrandbits(104)
    while n > 0:
        c = n % 36
        if c < 10:
            rid += chr(c+ord('0'))
        else:
            rid += chr(c+ord('a')-10)
        n = n // 36
    return rid
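
# new_request_id() returns "req-" plus a 104-bit random number encoded
# in base 36, usually 20 characters, e.g. (illustrative output):
#
#   new_request_id()  # -> 'req-d1zs3k8fwq8cdi0xwmzd'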

def get_config_once(svc):
    if not svc._rootDesc.get('resources').get('configs', False):
        # Old API server version, no config export endpoint
        return {}
    if not hasattr(svc, '_cached_config'):
        svc._cached_config = svc.configs().get().execute()
    return svc._cached_config

def get_vocabulary_once(svc):
    if not svc._rootDesc.get('resources').get('vocabularies', False):
        # Old API server version, no vocabulary export endpoint
        return {}
    if not hasattr(svc, '_cached_vocabulary'):
        svc._cached_vocabulary = svc.vocabularies().get().execute()
    return svc._cached_vocabulary

def trim_name(collectionname):
    """
    trim_name takes a record name (collection name, project name, etc)
    and trims it to fit the 255 character name limit, with additional
    space for the timestamp added by ensure_unique_name, by removing
    excess characters from the middle and inserting an ellipsis.
    """

    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        over = len(collectionname) - max_name_len
        split = int(max_name_len/2)
        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]

    return collectionname
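
# Worked example: max_name_len is 254 - 28 = 226.  Trimming a
# 300-character name removes the 74 excess characters from the middle:
#
#   trim_name('x' * 300)
#   # -> 'x'*113 + '…' + 'x'*113 (the ellipsis itself adds one char)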