1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import division
6 from builtins import range
# Matches strings consisting entirely of hexadecimal digits (either case).
HEX_RE = re.compile(r'^[0-9a-fA-F]+$')

# Container request state names as reported by the API server.
CR_UNCOMMITTED = 'Uncommitted'
CR_COMMITTED = 'Committed'

# Keep block locator: 32 hex digits, '+' and a decimal size, then any
# number of additional '+hint' segments.
keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
# Like keep_locator_pattern, but requires at least one '+A...' hint
# (per the name, a permission signature — confirm against Keep docs).
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
# Portable data hash: 32 hex digits plus '+' and the manifest size.
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
# Arvados UUIDs: 5-char cluster prefix, 5-char type infix, 15-char tail.
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
# Type-specific UUID patterns; the middle segment identifies the object
# type (names below follow the variable names).
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
# One manifest stream per line: a stream name, one or more block
# locators, then one or more pos:size:filename tokens.
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
39 def _deprecated(version=None, preferred=None):
40 """Mark a callable as deprecated in the SDK
42 This will wrap the callable to emit as a DeprecationWarning
43 and add a deprecation notice to its docstring.
45 If the following arguments are given, they'll be included in the
49 : The name of an alternative that users should use instead.
52 : The version of Arvados when the callable is scheduled to be
58 version = f' and scheduled to be removed in Arvados {version}'
62 preferred = f' Prefer {preferred} instead.'
63 def deprecated_decorator(func):
64 fullname = f'{func.__module__}.{func.__qualname__}'
65 parent, _, name = fullname.rpartition('.')
66 if name == '__init__':
68 warning_msg = f'{fullname} is deprecated{version}.{preferred}'
69 @functools.wraps(func)
70 def deprecated_wrapper(*args, **kwargs):
71 warnings.warn(warning_msg, DeprecationWarning, 2)
72 return func(*args, **kwargs)
73 # Get func's docstring without any trailing newline or empty lines.
74 func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
75 match = re.search(r'\n([ \t]+)\S', func_doc)
76 indent = '' if match is None else match.group(1)
77 warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent} {warning_msg}'
78 # Make the deprecation notice the second "paragraph" of the
79 # docstring if possible. Otherwise append it.
80 docstring, count = re.subn(
81 rf'\n[ \t]*\n{indent}',
82 f'{warning_doc}\n\n{indent}',
87 docstring = f'{func_doc.lstrip()}{warning_doc}'
88 deprecated_wrapper.__doc__ = docstring
89 return deprecated_wrapper
90 return deprecated_decorator
93 def clear_tmpdir(path=None):
95 Ensure the given directory (or TASK_TMPDIR if none given)
98 from arvados import current_task
100 path = current_task().tmpdir
101 if os.path.exists(path):
102 p = subprocess.Popen(['rm', '-rf', path])
103 stdout, stderr = p.communicate(None)
104 if p.returncode != 0:
105 raise Exception('rm -rf %s: %s' % (path, stderr))
@_deprecated('3.0', 'subprocess.run')
def run_command(execargs, **kwargs):
    """Run an external command and return its (stdout, stderr) output.

    Keyword arguments are passed through to subprocess.Popen, with
    piped stdin/stdout, inherited stderr, close_fds, and shell=False
    filled in for any the caller did not set.  Raises
    arvados.errors.CommandFailedError if the command exits nonzero.
    """
    defaults = {
        'stdin': subprocess.PIPE,
        'stdout': subprocess.PIPE,
        'stderr': sys.stderr,
        'close_fds': True,
        'shell': False,
    }
    for opt, value in defaults.items():
        kwargs.setdefault(opt, value)
    proc = subprocess.Popen(execargs, **kwargs)
    out, err = proc.communicate(None)
    if proc.returncode == 0:
        return out, err
    raise arvados.errors.CommandFailedError(
        "run_command %s exit %d:\n%s" %
        (execargs, proc.returncode, err))
def git_checkout(url, version, path):
    """Clone a git repository (if not already present) and check out
    a version.  Returns the working directory path.

    If path is relative it is taken relative to the current job's
    temporary directory.
    """
    from arvados import current_job
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    if not os.path.exists(path):
        # Clone into path; run the clone from its parent directory.
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path
def tar_extractor(path, decompress_flag):
    """Start a tar process that extracts from its stdin into `path`.

    decompress_flag is tar's single-letter compression flag ('j' for
    bzip2, 'z' for gzip, or '' for an uncompressed archive).  Returns
    the subprocess.Popen object; the caller streams archive bytes to
    its stdin.
    """
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)
@_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory. Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    # Relative paths are rooted at the current job's scratch directory.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions into the same target directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    # The '.locator' symlink records which collection was extracted
    # here last, so an unchanged tarball is not re-extracted.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            f_name = f.name()
            # Pick tar's decompression flag from the file extension.
            if f_name.endswith(('.tbz', '.tar.bz2')):
                p = tar_extractor(path, 'j')
            elif f_name.endswith(('.tgz', '.tar.gz')):
                p = tar_extractor(path, 'z')
            elif f_name.endswith('.tar'):
                p = tar_extractor(path, '')
            else:
                raise arvados.errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            # Stream the archive from Keep into tar's stdin, 1 MiB at a time.
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])

    return path
@_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory. Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    # Relative paths are rooted at the current job's scratch directory.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions into the same target directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    # The '.locator' symlink records which collection was extracted
    # here last, so an unchanged archive is not re-extracted.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not f.name().endswith('.zip'):
                raise arvados.errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # unzip cannot read from a pipe, so materialize the archive
            # locally before extracting, then delete it afterwards.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])

    return path
@_deprecated('3.0', 'arvados.collection.Collection')
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory. Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        # Not a locator: record a digest of the name instead.
        # encode() is required: hashlib needs bytes on Python 3.
        collection_hash = hashlib.md5(collection.encode()).hexdigest()
    # Relative paths are rooted at the current job's scratch directory.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions into the same target directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    # NOTE(review): already_have_it is computed but, unlike
    # tarball_extract, does not appear to gate re-extraction here.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            # Extract everything when files==[], otherwise only the
            # requested names (or their decompressed equivalents).
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
    lockfile.close()
    return path
@_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
def mkdir_dash_p(path):
    """Create directory `path` and any missing parents, like `mkdir -p`.

    It is not an error if `path` already exists as a directory.
    """
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
                pass
            else:
                raise
@_deprecated('3.0', 'arvados.collection.Collection')
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory. Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    """
    from arvados import current_job
    # Relative paths are rooted at the current job's scratch directory.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions into the same target directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        # Extract everything when files==[], otherwise only the
        # requested names (or their decompressed equivalents).
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path
@_deprecated('3.0', 'os.walk')
def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    Return a list of file and directory names found under dirname.

    If base is not None, prepend "{base}/" to each returned name.

    If max_depth is None, descend into directories and return only the
    names of files found in the directory tree.

    If max_depth is a non-negative integer, stop descending into
    directories at the given depth, and at that point return directory
    names instead.

    If max_depth==0 (and base is None) this is equivalent to
    sorted(os.listdir(dirname)).
    """
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path) and max_depth != 0:
            # Recurse, decrementing the depth budget (None = unlimited).
            allfiles += listdir_recursive(
                ent_path, base=ent_base,
                max_depth=(max_depth-1 if max_depth else None))
        else:
            allfiles += [ent_base]
    return allfiles
def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        # No length constraint: any nonzero number of hex digits is fine.
        good_len = True
    return bool(good_len and HEX_RE.match(s))
@_deprecated('3.0', 'arvados.util.keyset_list_all')
def list_all(fn, num_retries=0, **kwargs):
    """Call an API list method repeatedly and return all items.

    fn is a resource list method.  Pages are fetched with increasing
    offsets until items_available is reached, and the concatenated
    'items' lists are returned as a single list.
    """
    # Default limit to (effectively) api server's MAX_LIMIT
    kwargs.setdefault('limit', sys.maxsize)
    items = []
    offset = 0
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items += c['items']
        items_available = c['items_available']
        offset = c['offset'] + len(c['items'])
    return items
def keyset_list_all(fn, order_key="created_at", num_retries=0, ascending=True, **kwargs):
    """Iterate all items of an Arvados list method using keyset paging.

    Pages through the resource list method `fn` ordered by
    (order_key, uuid), yielding one item at a time.  Unlike
    offset-based paging this is stable when rows are inserted or
    deleted between page fetches; duplicate rows that can appear on
    page boundaries are filtered out.
    """
    pagesize = 1000
    kwargs["limit"] = pagesize
    kwargs["count"] = 'none'
    asc = "asc" if ascending else "desc"
    kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
    other_filters = kwargs.get("filters", [])

    # If the caller restricted the returned columns, make sure the two
    # paging keys are still included.
    try:
        select = set(kwargs['select'])
    except KeyError:
        pass
    else:
        select.add(order_key)
        select.add('uuid')
        kwargs['select'] = list(select)

    nextpage = []
    expect_full_page = True
    seen_prevpage = set()
    seen_thispage = set()
    lastitem = None
    prev_page_all_same_order_key = False

    while True:
        kwargs["filters"] = nextpage+other_filters
        items = fn(**kwargs).execute(num_retries=num_retries)

        if len(items["items"]) == 0:
            if prev_page_all_same_order_key:
                # We were paging by uuid within one order-key value;
                # move past that value and resume normal paging.
                nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
                prev_page_all_same_order_key = False
                continue
            else:
                return

        seen_prevpage = seen_thispage
        seen_thispage = set()

        for i in items["items"]:
            # In cases where there's more than one record with the
            # same order key, the result could include records we
            # already saw in the last page. Skip them.
            if i["uuid"] in seen_prevpage:
                continue
            seen_thispage.add(i["uuid"])
            yield i

        firstitem = items["items"][0]
        lastitem = items["items"][-1]

        if firstitem[order_key] == lastitem[order_key]:
            # Got a page where every item has the same order key.
            # Switch to using uuid for paging.
            nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
            prev_page_all_same_order_key = True
        else:
            # Start from the last order key seen, but skip the last
            # known uuid to avoid retrieving the same row twice. If
            # there are multiple rows with the same order key it is
            # still likely we'll end up retrieving duplicate rows.
            # That's handled by tracking the "seen" rows for each page
            # so they can be skipped if they show up on the next page.
            nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
            prev_page_all_same_order_key = False
def ca_certs_path(fallback=httplib2.CA_CERTS):
    """Return the path of the best available CA certs source.

    This function searches for various distribution sources of CA
    certificates, and returns the first it finds. If it doesn't find any,
    it returns the value of `fallback` (httplib2's CA certs by default).
    """
    for ca_certs_path in [
        # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
        # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
        os.environ.get('SSL_CERT_FILE'),
        # Arvados-specific bundle:
        '/etc/arvados/ca-certificates.crt',
        # Debian/Ubuntu:
        '/etc/ssl/certs/ca-certificates.crt',
        # Red Hat/Fedora:
        '/etc/pki/tls/certs/ca-bundle.crt',
        ]:
        if ca_certs_path and os.path.exists(ca_certs_path):
            return ca_certs_path
    return fallback
def new_request_id():
    """Return a fresh request id: 'req-' plus 20 random base-36 digits."""
    rid = "req-"
    # 2**104 > 36**20 > 2**103
    n = random.getrandbits(104)
    for _ in range(20):
        # Peel off one base-36 digit per iteration.
        c = n % 36
        if c < 10:
            rid += chr(c+ord('0'))
        else:
            rid += chr(c+ord('a')-10)
        n = n // 36
    return rid
def get_config_once(svc):
    """Fetch and cache the cluster config exported by the API server.

    Returns an empty dict when the server does not expose a config
    endpoint.  The result is cached on the service object so repeated
    calls do not hit the API again.
    """
    if not svc._rootDesc.get('resources').get('configs', False):
        # Old API server version, no config export endpoint
        return {}
    if not hasattr(svc, '_cached_config'):
        svc._cached_config = svc.configs().get().execute()
    return svc._cached_config
def get_vocabulary_once(svc):
    """Fetch and cache the vocabulary exported by the API server.

    Returns an empty dict when the server does not expose a vocabulary
    endpoint.  The result is cached on the service object so repeated
    calls do not hit the API again.
    """
    if not svc._rootDesc.get('resources').get('vocabularies', False):
        # Old API server version, no vocabulary export endpoint
        return {}
    if not hasattr(svc, '_cached_vocabulary'):
        svc._cached_vocabulary = svc.vocabularies().get().execute()
    return svc._cached_vocabulary
def trim_name(collectionname):
    """
    trim_name takes a record name (collection name, project name, etc)
    and trims it to fit the 255 character name limit, with additional
    space for the timestamp added by ensure_unique_name, by removing
    excess characters from the middle and inserting an ellipse
    """
    # 254 minus room reserved for a uniquifying timestamp suffix.
    limit = 254 - 28
    excess = len(collectionname) - limit
    if excess > 0:
        # Keep the head and tail; drop the overflow from the middle.
        head = limit // 2
        collectionname = collectionname[:head] + "…" + collectionname[head + excess:]
    return collectionname