# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
"""Arvados utilities

This module provides functions and constants that are useful across a variety
of Arvados resource types, or extend the Arvados API client (see `arvados.api`).
"""
28 from pathlib import Path, PurePath
HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
"""Regular expression to match a hexadecimal string (case-insensitive)"""
CR_UNCOMMITTED = 'Uncommitted'
"""Constant `state` value for uncommitted container requests"""
CR_COMMITTED = 'Committed'
"""Constant `state` value for committed container requests"""
CR_FINAL = 'Final'
"""Constant `state` value for finalized container requests"""

keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*')
"""Regular expression to match any Keep block locator"""
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*\+A\S+(\+\S+)*')
"""Regular expression to match any Keep block locator with an access token hint"""
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+')
"""Regular expression to match any collection portable data hash"""
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+[0-9]+)(\+\S+)*)+( +[0-9]+:[0-9]+:\S+)+$)+', flags=re.MULTILINE)
"""Regular expression to match an Arvados collection manifest text"""
keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+[0-9]+)/(.*)')
"""Regular expression to match a file path from a collection identified by portable data hash"""
keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+[0-9]+)/(.*)')
"""Regular expression to match a `keep:` URI with a collection identified by portable data hash"""

uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
"""Regular expression to match any Arvados object UUID"""
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
"""Regular expression to match any Arvados collection UUID"""
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
"""Regular expression to match any Arvados container UUID"""
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
"""Regular expression to match any Arvados group UUID"""
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
"""Regular expression to match any Arvados link UUID"""
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
"""Regular expression to match any Arvados user UUID"""
job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
"""Regular expression to match any Arvados job UUID

.. WARNING:: Deprecated
   Arvados job resources are deprecated and will be removed in a future
   release. Prefer the containers API instead.
"""
84 def _deprecated(version=None, preferred=None):
85 """Mark a callable as deprecated in the SDK
87 This will wrap the callable to emit as a DeprecationWarning
88 and add a deprecation notice to its docstring.
90 If the following arguments are given, they'll be included in the
93 * preferred: str | None --- The name of an alternative that users should
96 * version: str | None --- The version of Arvados when the callable is
97 scheduled to be removed.
102 version = f' and scheduled to be removed in Arvados {version}'
103 if preferred is None:
106 preferred = f' Prefer {preferred} instead.'
107 def deprecated_decorator(func):
108 fullname = f'{func.__module__}.{func.__qualname__}'
109 parent, _, name = fullname.rpartition('.')
110 if name == '__init__':
112 warning_msg = f'{fullname} is deprecated{version}.{preferred}'
113 @functools.wraps(func)
114 def deprecated_wrapper(*args, **kwargs):
115 warnings.warn(warning_msg, DeprecationWarning, 2)
116 return func(*args, **kwargs)
117 # Get func's docstring without any trailing newline or empty lines.
118 func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
119 match = re.search(r'\n([ \t]+)\S', func_doc)
120 indent = '' if match is None else match.group(1)
121 warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent} {warning_msg}'
122 # Make the deprecation notice the second "paragraph" of the
123 # docstring if possible. Otherwise append it.
124 docstring, count = re.subn(
125 rf'\n[ \t]*\n{indent}',
126 f'{warning_doc}\n\n{indent}',
131 docstring = f'{func_doc.lstrip()}{warning_doc}'
132 deprecated_wrapper.__doc__ = docstring
133 return deprecated_wrapper
134 return deprecated_decorator
136 @dataclasses.dataclass
137 class _BaseDirectorySpec:
138 """Parse base directories
140 A _BaseDirectorySpec defines all the environment variable keys and defaults
141 related to a set of base directories (cache, config, state, etc.). It
142 provides pure methods to parse environment settings into valid paths.
146 xdg_home_default: PurePath
147 xdg_dirs_key: Optional[str] = None
148 xdg_dirs_default: str = ''
151 def _abspath_from_env(env: Mapping[str, str], key: str) -> Optional[Path]:
153 path = Path(env[key])
154 except (KeyError, ValueError):
157 ok = path.is_absolute()
158 return path if ok else None
161 def _iter_abspaths(value: str) -> Iterator[Path]:
162 for path_s in value.split(':'):
164 if path.is_absolute():
167 def iter_systemd(self, env: Mapping[str, str]) -> Iterator[Path]:
168 return self._iter_abspaths(env.get(self.systemd_key, ''))
170 def iter_xdg(self, env: Mapping[str, str], subdir: PurePath) -> Iterator[Path]:
171 yield self.xdg_home(env, subdir)
172 if self.xdg_dirs_key is not None:
173 for path in self._iter_abspaths(env.get(self.xdg_dirs_key) or self.xdg_dirs_default):
176 def xdg_home(self, env: Mapping[str, str], subdir: PurePath) -> Path:
177 home_path = self._abspath_from_env(env, self.xdg_home_key)
178 if home_path is None:
179 home_path = self._abspath_from_env(env, 'HOME') or Path.home()
180 home_path /= self.xdg_home_default
181 return home_path / subdir
class _BaseDirectorySpecs(enum.Enum):
    """Base directory specifications

    This enum provides easy access to the standard base directory settings.
    """
    # NOTE(review): systemd key / XDG variable pairs follow the XDG Base
    # Directory Specification and systemd's *_DIRECTORY conventions.
    CACHE = _BaseDirectorySpec(
        'CACHE_DIRECTORY',
        'XDG_CACHE_HOME',
        PurePath('.cache'),
    )
    CONFIG = _BaseDirectorySpec(
        'CONFIGURATION_DIRECTORY',
        'XDG_CONFIG_HOME',
        PurePath('.config'),
        'XDG_CONFIG_DIRS',
        '/etc/xdg',
    )
    STATE = _BaseDirectorySpec(
        'STATE_DIRECTORY',
        'XDG_STATE_HOME',
        PurePath('.local', 'state'),
    )
class _BaseDirectories:
    """Resolve paths from a base directory spec

    Given a _BaseDirectorySpec, this class provides stateful methods to find
    existing files and return the most-preferred directory for writing.
    """
    # A usable storage directory must be a directory writable by the user.
    _STORE_MODE = stat.S_IFDIR | stat.S_IWUSR

    def __init__(
            self,
            spec: Union[_BaseDirectorySpec, _BaseDirectorySpecs, str],
            env: Mapping[str, str]=os.environ,
            xdg_subdir: Union[os.PathLike, str]='arvados',
    ) -> None:
        # Accept a spec object, an enum member, or an enum member name.
        if isinstance(spec, str):
            spec = _BaseDirectorySpecs[spec].value
        elif isinstance(spec, _BaseDirectorySpecs):
            spec = spec.value
        self._spec = spec
        self._env = env
        self._xdg_subdir = PurePath(xdg_subdir)

    def search(self, name: str) -> Iterator[Path]:
        """Yield existing paths named `name` under each base directory,
        most-preferred (systemd, then XDG) first."""
        for search_path in itertools.chain(
                self._spec.iter_systemd(self._env),
                self._spec.iter_xdg(self._env, self._xdg_subdir),
        ):
            path = search_path / name
            if path.exists():
                yield path

    def storage_path(self) -> Path:
        """Return the most-preferred writable base directory, creating the
        XDG fallback if needed."""
        for path in self._spec.iter_systemd(self._env):
            try:
                mode = path.stat().st_mode
            except OSError:
                # Unreadable/missing systemd dir: try the next candidate.
                continue
            if (mode & self._STORE_MODE) == self._STORE_MODE:
                return path
        path = self._spec.xdg_home(self._env, self._xdg_subdir)
        path.mkdir(parents=True, exist_ok=True)
        return path
def is_hex(s: str, *length_args: int) -> bool:
    """Indicate whether a string is a hexadecimal number

    This method returns true if all characters in the string are hexadecimal
    digits. It is case-insensitive.

    You can also pass optional length arguments to check that the string has
    the expected number of digits. If you pass one integer, the string must
    have that length exactly, otherwise the method returns False. If you
    pass two integers, the string's length must fall within that minimum and
    maximum (inclusive), otherwise the method returns False.

    Arguments:

    * s: str --- The string to check

    * length_args: int --- Optional length limit(s) for the string to check
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        # No length constraint given: any length is acceptable.
        good_len = True
    return bool(good_len and HEX_RE.match(s))
def keyset_list_all(
        fn: Callable[..., 'arvados.api_resources.ArvadosAPIRequest'],
        order_key: str="created_at",
        num_retries: int=0,
        ascending: bool=True,
        **kwargs: Any,
) -> Iterator[Dict[str, Any]]:
    """Iterate all Arvados resources from an API list call

    This method takes a method that represents an Arvados API list call, and
    iterates the objects returned by the API server. It can make multiple API
    calls to retrieve and iterate all objects available from the API server.

    Arguments:

    * fn: Callable[..., arvados.api_resources.ArvadosAPIRequest] --- A
      function that wraps an Arvados API method that returns a list of
      objects. If you have an Arvados API client named `arv`, examples
      include `arv.collections().list` and `arv.groups().contents`. Note
      that you should pass the function *without* calling it.

    * order_key: str --- The name of the primary object field that objects
      should be sorted by. This name is used to build an `order` argument
      for `fn`. Default `'created_at'`.

    * num_retries: int --- This argument is passed through to
      `arvados.api_resources.ArvadosAPIRequest.execute` for each API call. See
      that method's docstring for details. Default 0 (meaning API calls will
      use the `num_retries` value set when the Arvados API client was
      constructed).

    * ascending: bool --- Used to build an `order` argument for `fn`. If True,
      all fields will be sorted in `'asc'` (ascending) order. Otherwise, all
      fields will be sorted in `'desc'` (descending) order.

    Additional keyword arguments will be passed directly to `fn` for each API
    call. Note that this function sets `count`, `limit`, and `order` as part of
    its work.
    """
    pagesize = 1000
    kwargs["limit"] = pagesize
    kwargs["count"] = 'none'
    asc = "asc" if ascending else "desc"
    # Secondary sort on uuid makes paging deterministic when order_key ties.
    kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
    other_filters = kwargs.get("filters", [])

    try:
        select = set(kwargs['select'])
    except KeyError:
        pass
    else:
        # Paging depends on reading these fields back from each item.
        select.add(order_key)
        select.add('uuid')
        kwargs['select'] = list(select)

    nextpage = []
    seen_prevpage = set()
    seen_thispage = set()
    lastitem = None
    prev_page_all_same_order_key = False

    while True:
        kwargs["filters"] = nextpage+other_filters
        items = fn(**kwargs).execute(num_retries=num_retries)

        if len(items["items"]) == 0:
            if prev_page_all_same_order_key:
                # We were paging by uuid within one order_key value; resume
                # paging by order_key past that value.
                nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
                prev_page_all_same_order_key = False
                continue
            else:
                return

        seen_prevpage = seen_thispage
        seen_thispage = set()

        for i in items["items"]:
            # In cases where there's more than one record with the
            # same order key, the result could include records we
            # already saw in the last page. Skip them.
            if i["uuid"] in seen_prevpage:
                continue
            seen_thispage.add(i["uuid"])
            yield i

        firstitem = items["items"][0]
        lastitem = items["items"][-1]

        if firstitem[order_key] == lastitem[order_key]:
            # Got a page where every item has the same order key.
            # Switch to using uuid for paging.
            nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
            prev_page_all_same_order_key = True
        else:
            # Start from the last order key seen, but skip the last
            # known uuid to avoid retrieving the same row twice. If
            # there are multiple rows with the same order key it is
            # still likely we'll end up retrieving duplicate rows.
            # That's handled by tracking the "seen" rows for each page
            # so they can be skipped if they show up on the next page.
            nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
            prev_page_all_same_order_key = False
def ca_certs_path(fallback: T=httplib2.CA_CERTS) -> Union[str, T]:
    """Return the path of the best available source of CA certificates

    This function checks various known paths that provide trusted CA
    certificates, and returns the first one that exists. It checks:

    * the path in the `SSL_CERT_FILE` environment variable (used by OpenSSL)
    * `/etc/arvados/ca-certificates.crt`, respected by all Arvados software
    * `/etc/ssl/certs/ca-certificates.crt`, the default store on Debian-based
      distributions
    * `/etc/pki/tls/certs/ca-bundle.crt`, the default store on Red Hat-based
      distributions

    If none of these paths exist, this function returns the value of `fallback`.

    Arguments:

    * fallback: T --- The value to return if none of the known paths exist.
      The default value is the certificate store of Mozilla's trusted CAs
      included with the Python [certifi][] package.

    [certifi]: https://pypi.org/project/certifi/
    """
    for ca_certs_path in [
        # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
        # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
        os.environ.get('SSL_CERT_FILE'),
        # Arvados specific:
        '/etc/arvados/ca-certificates.crt',
        # Debian:
        '/etc/ssl/certs/ca-certificates.crt',
        # Red Hat:
        '/etc/pki/tls/certs/ca-bundle.crt',
    ]:
        if ca_certs_path and os.path.exists(ca_certs_path):
            return ca_certs_path
    return fallback
def new_request_id() -> str:
    """Return a random request ID

    This function generates and returns a random string suitable for use as a
    `X-Request-Id` header value in the Arvados API.
    """
    rid = "req-"
    # 2**104 > 36**20 > 2**103
    n = random.getrandbits(104)
    # Encode 20 base-36 digits (0-9, a-z), least significant first.
    for _ in range(20):
        c = n % 36
        if c < 10:
            rid += chr(c+ord('0'))
        else:
            rid += chr(c+ord('a')-10)
        n = n // 36
    return rid
443 def get_config_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
444 """Return an Arvados cluster's configuration, with caching
446 This function gets and returns the Arvados configuration from the API
447 server. It caches the result on the client object and reuses it on any
452 * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
453 object to use to retrieve and cache the Arvados cluster configuration.
455 if not svc._rootDesc.get('resources').get('configs', False):
456 # Old API server version, no config export endpoint
458 if not hasattr(svc, '_cached_config'):
459 svc._cached_config = svc.configs().get().execute()
460 return svc._cached_config
462 def get_vocabulary_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
463 """Return an Arvados cluster's vocabulary, with caching
465 This function gets and returns the Arvados vocabulary from the API
466 server. It caches the result on the client object and reuses it on any
469 .. HINT:: Low-level method
470 This is a relatively low-level wrapper around the Arvados API. Most
471 users will prefer to use `arvados.vocabulary.load_vocabulary`.
475 * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
476 object to use to retrieve and cache the Arvados cluster vocabulary.
478 if not svc._rootDesc.get('resources').get('vocabularies', False):
479 # Old API server version, no vocabulary export endpoint
481 if not hasattr(svc, '_cached_vocabulary'):
482 svc._cached_vocabulary = svc.vocabularies().get().execute()
483 return svc._cached_vocabulary
def trim_name(collectionname: str) -> str:
    """Limit the length of a name to fit within Arvados API limits

    This function ensures that a string is short enough to use as an object
    name in the Arvados API, leaving room for text that may be added by the
    `ensure_unique_name` argument. If the source name is short enough, it is
    returned unchanged. Otherwise, this function returns a string with excess
    characters removed from the middle of the source string and replaced with
    an ellipsis.

    Arguments:

    * collectionname: str --- The desired source name
    """
    # 254 is the API's name length limit; reserve 28 characters for the
    # suffix that `ensure_unique_name` may append.
    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        over = len(collectionname) - max_name_len
        split = int(max_name_len/2)
        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]

    return collectionname
@_deprecated('3.0', 'arvados.util.keyset_list_all')
def list_all(fn, num_retries=0, **kwargs):
    """Iterate all items from an API list call using offset-based paging.

    Deprecated: offset paging is slow and can return duplicate/missed rows
    when the underlying table changes; use `keyset_list_all` instead.
    """
    # Default limit to (effectively) api server's MAX_LIMIT
    kwargs.setdefault('limit', sys.maxsize)
    items = []
    offset = 0
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items += c['items']
        items_available = c['items_available']
        offset = c['offset'] + len(c['items'])
    return items
@_deprecated('3.0', 'tempfile.TemporaryDirectory')
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    from arvados import current_task
    if path is None:
        path = current_task().tmpdir
    if os.path.exists(path):
        p = subprocess.Popen(['rm', '-rf', path])
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)
@_deprecated('3.0', 'subprocess.run')
def run_command(execargs, **kwargs):
    """Run a subprocess, capture stdout, and raise on nonzero exit.

    Returns a `(stdoutdata, stderrdata)` tuple from `Popen.communicate`.
    Raises `arvados.errors.CommandFailedError` if the command exits nonzero.
    """
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise arvados.errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata
@_deprecated('3.0')
def git_checkout(url, version, path):
    """Clone (if needed) and check out `version` of a Git repository.

    `path` may be absolute or relative to the current job's tmpdir.
    Returns the absolute checkout path.
    """
    from arvados import current_job
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path
@_deprecated('3.0', 'tarfile.open')
def tar_extractor(path, decompress_flag):
    """Return a `tar -x` subprocess extracting stdin into `path`.

    `decompress_flag` is 'j' (bzip2), 'z' (gzip), or '' (no compression);
    the caller writes archive bytes to the returned process's stdin.
    """
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)
@_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory. Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same collection with a lockfile.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # The '.locator' symlink records which tarball was extracted here.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            f_name = f.name()
            if f_name.endswith(('.tbz', '.tar.bz2')):
                p = tar_extractor(path, 'j')
            elif f_name.endswith(('.tgz', '.tar.gz')):
                p = tar_extractor(path, 'z')
            elif f_name.endswith('.tar'):
                p = tar_extractor(path, '')
            else:
                raise arvados.errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            # Stream the archive from Keep into tar's stdin in 1 MiB chunks.
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
@_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory. Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same collection with a lockfile.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # The '.locator' symlink records which archive was extracted here.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not f.name().endswith('.zip'):
                raise arvados.errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # unzip cannot read from a pipe, so copy the archive to disk first.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
@_deprecated('3.0', 'arvados.collection.Collection')
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory. Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    """
    # NOTE: `files=[]` (mutable default) is kept for interface compatibility;
    # it is only read, never mutated.
    from arvados import current_job
    from arvados.collection import CollectionReader
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        # Fix: hashlib.md5 requires bytes; encode the locator text first.
        collection_hash = hashlib.md5(collection.encode()).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same collection with a lockfile.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # The '.locator' symlink records which collection was extracted here.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            # Extract everything when `files` is empty; otherwise only the
            # requested names (matching compressed or decompressed names).
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
    lockfile.close()
    return path
@_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
def mkdir_dash_p(path):
    """Create `path` and any missing parents, like `mkdir -p`."""
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
                pass
            else:
                raise
@_deprecated('3.0', 'arvados.collection.Collection')
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory. Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    """
    # NOTE: `files=[]` (mutable default) is kept for interface compatibility;
    # it is only read, never mutated.
    from arvados import current_job
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions into the same directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        # Extract everything when `files` is empty; otherwise only the
        # requested names (matching compressed or decompressed names).
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path
830 @_deprecated('3.0', 'os.walk')
831 def listdir_recursive(dirname, base=None, max_depth=None):
832 """listdir_recursive(dirname, base, max_depth)
834 Return a list of file and directory names found under dirname.
836 If base is not None, prepend "{base}/" to each returned name.
838 If max_depth is None, descend into directories and return only the
839 names of files found in the directory tree.
841 If max_depth is a non-negative integer, stop descending into
842 directories at the given depth, and at that point return directory
845 If max_depth==0 (and base is None) this is equivalent to
846 sorted(os.listdir(dirname)).
849 for ent in sorted(os.listdir(dirname)):
850 ent_path = os.path.join(dirname, ent)
851 ent_base = os.path.join(base, ent) if base else ent
852 if os.path.isdir(ent_path) and max_depth != 0:
853 allfiles += listdir_recursive(
854 ent_path, base=ent_base,
855 max_depth=(max_depth-1 if max_depth else None))
857 allfiles += [ent_base]