1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
6 This module provides functions and constants that are useful across a variety
7 of Arvados resource types, or extend the Arvados API client (see `arvados.api`).
28 from pathlib import Path, PurePath
42 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
43 """Regular expression to match a hexadecimal string (case-insensitive)"""
44 CR_UNCOMMITTED = 'Uncommitted'
45 """Constant `state` value for uncommitted container requests"""
46 CR_COMMITTED = 'Committed'
47 """Constant `state` value for committed container requests"""
49 """Constant `state` value for finalized container requests"""
51 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*')
52 """Regular expression to match any Keep block locator"""
53 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*\+A\S+(\+\S+)*')
54 """Regular expression to match any Keep block locator with an access token hint"""
55 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+')
56 """Regular expression to match any collection portable data hash"""
57 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+[0-9]+)(\+\S+)*)+( +[0-9]+:[0-9]+:\S+)+$)+', flags=re.MULTILINE)
58 """Regular expression to match an Arvados collection manifest text"""
59 keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+[0-9]+)/(.*)')
60 """Regular expression to match a file path from a collection identified by portable data hash"""
61 keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+[0-9]+)/(.*)')
62 """Regular expression to match a `keep:` URI with a collection identified by portable data hash"""
64 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
65 """Regular expression to match any Arvados object UUID"""
66 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
67 """Regular expression to match any Arvados collection UUID"""
68 container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
69 """Regular expression to match any Arvados container UUID"""
70 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
71 """Regular expression to match any Arvados group UUID"""
72 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
73 """Regular expression to match any Arvados link UUID"""
74 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
75 """Regular expression to match any Arvados user UUID"""
76 job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
77 """Regular expression to match any Arvados job UUID
79 .. WARNING:: Deprecated
80 Arvados job resources are deprecated and will be removed in a future
81 release. Prefer the containers API instead.
84 def _deprecated(version=None, preferred=None):
85 """Mark a callable as deprecated in the SDK
87 This will wrap the callable to emit as a DeprecationWarning
88 and add a deprecation notice to its docstring.
90 If the following arguments are given, they'll be included in the
93 * preferred: str | None --- The name of an alternative that users should
96 * version: str | None --- The version of Arvados when the callable is
97 scheduled to be removed.
# NOTE(review): lines are elided from this listing here; presumably they
# close the docstring and default `version` to '' when None — confirm
# against the full source.
102 version = f' and scheduled to be removed in Arvados {version}'
103 if preferred is None:
# When an alternative was named, it is folded into the warning message.
106 preferred = f' Prefer {preferred} instead.'
107 def deprecated_decorator(func):
# Dotted path of the wrapped callable, used in the warning text.
108 fullname = f'{func.__module__}.{func.__qualname__}'
109 parent, _, name = fullname.rpartition('.')
110 if name == '__init__':
# NOTE(review): the branch body for __init__ is elided — presumably it
# reports the class name instead of __init__; confirm.
112 warning_msg = f'{fullname} is deprecated{version}.{preferred}'
113 @functools.wraps(func)
114 def deprecated_wrapper(*args, **kwargs):
# stacklevel=2 attributes the warning to the deprecated call site.
115 warnings.warn(warning_msg, DeprecationWarning, 2)
116 return func(*args, **kwargs)
117 # Get func's docstring without any trailing newline or empty lines.
118 func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
# Detect the docstring body's indentation so the inserted notice lines up.
119 match = re.search(r'\n([ \t]+)\S', func_doc)
120 indent = '' if match is None else match.group(1)
121 warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent} {warning_msg}'
122 # Make the deprecation notice the second "paragraph" of the
123 # docstring if possible. Otherwise append it.
124 docstring, count = re.subn(
125 rf'\n[ \t]*\n{indent}',
126 f'{warning_doc}\n\n{indent}',
# NOTE(review): remaining re.subn arguments and the `count == 0` fallback
# test are elided; the line below is the append-instead fallback.
131 docstring = f'{func_doc.lstrip()}{warning_doc}'
132 deprecated_wrapper.__doc__ = docstring
133 return deprecated_wrapper
134 return deprecated_decorator
136 @dataclasses.dataclass
137 class _BaseDirectorySpec:
138 """Parse base directories
140 A _BaseDirectorySpec defines all the environment variable keys and defaults
141 related to a set of base directories (cache, config, state, etc.). It
142 provides pure methods to parse environment settings into valid paths.
# NOTE(review): leading dataclass fields (systemd_key, xdg_home_key) are
# elided from this listing — confirm against full source.
146 xdg_home_default: PurePath
147 xdg_dirs_key: Optional[str] = None
148 xdg_dirs_default: str = ''
# Return env[key] as a Path only when it is set and absolute; None otherwise.
151 def _abspath_from_env(env: Mapping[str, str], key: str) -> Optional[Path]:
153 path = Path(env[key])
154 except (KeyError, ValueError):
157 ok = path.is_absolute()
158 return path if ok else None
# Yield only the absolute entries from a colon-separated path list.
161 def _iter_abspaths(value: str) -> Iterator[Path]:
162 for path_s in value.split(':'):
164 if path.is_absolute():
167 def iter_systemd(self, env: Mapping[str, str]) -> Iterator[Path]:
168 return self._iter_abspaths(env.get(self.systemd_key, ''))
# Yield XDG search paths: the home directory first, then any configured dirs.
170 def iter_xdg(self, env: Mapping[str, str], subdir: PurePath) -> Iterator[Path]:
171 yield self.xdg_home(env, subdir)
172 if self.xdg_dirs_key is not None:
173 for path in self._iter_abspaths(env.get(self.xdg_dirs_key) or self.xdg_dirs_default):
# Resolve the XDG home base dir, falling back to $HOME (or the OS home)
# plus the spec's default subpath when the XDG variable is unset/relative.
176 def xdg_home(self, env: Mapping[str, str], subdir: PurePath) -> Path:
177 home_path = self._abspath_from_env(env, self.xdg_home_key)
178 if home_path is None:
179 home_path = self._abspath_from_env(env, 'HOME') or Path.home()
180 home_path /= self.xdg_home_default
181 return home_path / subdir
184 class _BaseDirectorySpecs(enum.Enum):
185 """Base directory specifications
187 This enum provides easy access to the standard base directory settings.
# Each member wraps a _BaseDirectorySpec; most constructor arguments
# (env var keys, defaults) are elided from this listing.
189 CACHE = _BaseDirectorySpec(
194 CONFIG = _BaseDirectorySpec(
195 'CONFIGURATION_DIRECTORY',
201 STATE = _BaseDirectorySpec(
204 PurePath('.local', 'state'),
208 class _BaseDirectories:
209 """Resolve paths from a base directory spec
211 Given a _BaseDirectorySpec, this class provides stateful methods to find
212 existing files and return the most-preferred directory for writing.
# Bit pattern a usable storage directory must carry: a directory that the
# owner can write.
214 _STORE_MODE = stat.S_IFDIR | stat.S_IWUSR
# Constructor: accepts a spec object, an enum member, or an enum member name.
218 spec: Union[_BaseDirectorySpec, _BaseDirectorySpecs, str],
219 env: Mapping[str, str]=os.environ,
220 xdg_subdir: Union[os.PathLike, str]='arvados',
222 if isinstance(spec, str):
223 spec = _BaseDirectorySpecs[spec].value
224 elif isinstance(spec, _BaseDirectorySpecs):
228 self._xdg_subdir = PurePath(xdg_subdir)
# Yield existing files named `name` across all candidate directories,
# systemd-provided paths first, then XDG paths.
230 def search(self, name: str) -> Iterator[Path]:
231 for search_path in itertools.chain(
232 self._spec.iter_systemd(self._env),
233 self._spec.iter_xdg(self._env, self._xdg_subdir),
235 path = search_path / name
# Return the most-preferred writable directory, preferring a suitable
# systemd-provided path before falling back to the XDG home directory.
241 subdir: Union[str, os.PathLike]=PurePath(),
244 for path in self._spec.iter_systemd(self._env):
246 mode = path.stat().st_mode
249 if (mode & self._STORE_MODE) == self._STORE_MODE:
252 path = self._spec.xdg_home(self._env, self._xdg_subdir)
254 path.mkdir(parents=True, exist_ok=True, mode=mode)
def is_hex(s: str, *length_args: int) -> bool:
    """Indicate whether a string is a hexadecimal number

    This method returns true if all characters in the string are hexadecimal
    digits. It is case-insensitive.

    You can also pass optional length arguments to check that the string has
    the expected number of digits. If you pass one integer, the string must
    have that length exactly, otherwise the method returns False. If you
    pass two integers, the string's length must fall within that minimum and
    maximum (inclusive), otherwise the method returns False.

    Arguments:

    * s: str --- The string to check

    * length_args: int --- Optional length limit(s) for the string to check

    Raises `arvados.errors.ArgumentError` if more than two length arguments
    are given.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        # Inclusive minimum/maximum length bounds.
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        # Exact length required.
        good_len = (len(s) == length_args[0])
    else:
        # No length constraint given; the visible original left good_len
        # unset on this path, so make the default explicit.
        good_len = True
    # fullmatch of one-or-more hex digits is equivalent to the anchored
    # module-level HEX_RE pattern; the empty string is not hex.
    return bool(good_len and re.fullmatch(r'[0-9a-fA-F]+', s))
289 fn: Callable[..., 'arvados.api_resources.ArvadosAPIRequest'],
290 order_key: str="created_at",
292 ascending: bool=True,
294 ) -> Iterator[Dict[str, Any]]:
295 """Iterate all Arvados resources from an API list call
297 This method takes a method that represents an Arvados API list call, and
298 iterates the objects returned by the API server. It can make multiple API
299 calls to retrieve and iterate all objects available from the API server.
303 * fn: Callable[..., arvados.api_resources.ArvadosAPIRequest] --- A
304 function that wraps an Arvados API method that returns a list of
305 objects. If you have an Arvados API client named `arv`, examples
306 include `arv.collections().list` and `arv.groups().contents`. Note
307 that you should pass the function *without* calling it.
309 * order_key: str --- The name of the primary object field that objects
310 should be sorted by. This name is used to build an `order` argument
311 for `fn`. Default `'created_at'`.
313 * num_retries: int --- This argument is passed through to
314 `arvados.api_resources.ArvadosAPIRequest.execute` for each API call. See
315 that method's docstring for details. Default 0 (meaning API calls will
316 use the `num_retries` value set when the Arvados API client was
319 * ascending: bool --- Used to build an `order` argument for `fn`. If True,
320 all fields will be sorted in `'asc'` (ascending) order. Otherwise, all
321 fields will be sorted in `'desc'` (descending) order.
323 Additional keyword arguments will be passed directly to `fn` for each API
324 call. Note that this function sets `count`, `limit`, and `order` as part of
# Keyset pagination setup: order by order_key with uuid as tiebreaker, and
# skip the expensive items_available count.
328 kwargs["limit"] = pagesize
329 kwargs["count"] = 'none'
330 asc = "asc" if ascending else "desc"
331 kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
332 other_filters = kwargs.get("filters", [])
# If the caller used `select`, the paging keys must be included in it for
# the filters below to work.
335 select = set(kwargs['select'])
339 select.add(order_key)
341 kwargs['select'] = list(select)
345 expect_full_page = True
346 seen_prevpage = set()
347 seen_thispage = set()
349 prev_page_all_same_order_key = False
# Main paging loop: request the next page with keyset filters prepended to
# the caller's own filters.
352 kwargs["filters"] = nextpage+other_filters
353 items = fn(**kwargs).execute(num_retries=num_retries)
355 if len(items["items"]) == 0:
356 if prev_page_all_same_order_key:
# Exhausted the run of equal order keys; resume strictly past it.
357 nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
358 prev_page_all_same_order_key = False
# Roll the dedup window forward by one page.
363 seen_prevpage = seen_thispage
364 seen_thispage = set()
366 for i in items["items"]:
367 # In cases where there's more than one record with the
368 # same order key, the result could include records we
369 # already saw in the last page. Skip them.
370 if i["uuid"] in seen_prevpage:
372 seen_thispage.add(i["uuid"])
375 firstitem = items["items"][0]
376 lastitem = items["items"][-1]
378 if firstitem[order_key] == lastitem[order_key]:
379 # Got a page where every item has the same order key.
380 # Switch to using uuid for paging.
381 nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
382 prev_page_all_same_order_key = True
384 # Start from the last order key seen, but skip the last
385 # known uuid to avoid retrieving the same row twice. If
386 # there are multiple rows with the same order key it is
387 # still likely we'll end up retrieving duplicate rows.
388 # That's handled by tracking the "seen" rows for each page
389 # so they can be skipped if they show up on the next page.
390 nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
391 prev_page_all_same_order_key = False
393 def ca_certs_path(fallback: T=httplib2.CA_CERTS) -> Union[str, T]:
394 """Return the path of the best available source of CA certificates
396 This function checks various known paths that provide trusted CA
397 certificates, and returns the first one that exists. It checks:
399 * the path in the `SSL_CERT_FILE` environment variable (used by OpenSSL)
400 * `/etc/arvados/ca-certificates.crt`, respected by all Arvados software
401 * `/etc/ssl/certs/ca-certificates.crt`, the default store on Debian-based
403 * `/etc/pki/tls/certs/ca-bundle.crt`, the default store on Red Hat-based
406 If none of these paths exist, this function returns the value of `fallback`.
410 * fallback: T --- The value to return if none of the known paths exist.
411 The default value is the certificate store of Mozilla's trusted CAs
412 included with the Python [certifi][] package.
414 [certifi]: https://pypi.org/project/certifi/
# Candidates are checked in priority order, most specific first.
416 for ca_certs_path in [
417 # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
418 # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
419 os.environ.get('SSL_CERT_FILE'),
421 '/etc/arvados/ca-certificates.crt',
423 '/etc/ssl/certs/ca-certificates.crt',
425 '/etc/pki/tls/certs/ca-bundle.crt',
427 if ca_certs_path and os.path.exists(ca_certs_path):
# NOTE(review): the return statements (first existing path, else
# `fallback`) are elided from this listing — confirm against full source.
431 def new_request_id() -> str:
432 """Return a random request ID
434 This function generates and returns a random string suitable for use as a
435 `X-Request-Id` header value in the Arvados API.
# 104 random bits are enough to fill 20 base-36 digits (see comment below).
438 # 2**104 > 36**20 > 2**103
439 n = random.getrandbits(104)
# Encode n in base 36: digits 0-9 for values < 10 ...
443 rid += chr(c+ord('0'))
# ... and letters a-z for values 10-35. (The surrounding divmod loop is
# elided from this listing.)
445 rid += chr(c+ord('a')-10)
449 def get_config_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
450 """Return an Arvados cluster's configuration, with caching
452 This function gets and returns the Arvados configuration from the API
453 server. It caches the result on the client object and reuses it on any
458 * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
459 object to use to retrieve and cache the Arvados cluster configuration.
461 if not svc._rootDesc.get('resources').get('configs', False):
462 # Old API server version, no config export endpoint
# NOTE(review): the early-return value for old servers (presumably an
# empty dict) is elided from this listing — confirm.
# Fetch once, then memoize on the client object itself.
464 if not hasattr(svc, '_cached_config'):
465 svc._cached_config = svc.configs().get().execute()
466 return svc._cached_config
468 def get_vocabulary_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
469 """Return an Arvados cluster's vocabulary, with caching
471 This function gets and returns the Arvados vocabulary from the API
472 server. It caches the result on the client object and reuses it on any
475 .. HINT:: Low-level method
476 This is a relatively low-level wrapper around the Arvados API. Most
477 users will prefer to use `arvados.vocabulary.load_vocabulary`.
481 * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
482 object to use to retrieve and cache the Arvados cluster vocabulary.
484 if not svc._rootDesc.get('resources').get('vocabularies', False):
485 # Old API server version, no vocabulary export endpoint
# NOTE(review): the early-return value for old servers (presumably an
# empty dict) is elided from this listing — confirm.
# Fetch once, then memoize on the client object itself.
487 if not hasattr(svc, '_cached_vocabulary'):
488 svc._cached_vocabulary = svc.vocabularies().get().execute()
489 return svc._cached_vocabulary
def trim_name(collectionname: str) -> str:
    """Limit the length of a name to fit within Arvados API limits

    This function ensures that a string is short enough to use as an object
    name in the Arvados API, leaving room for text that may be added by the
    `ensure_unique_name` argument. If the source name is short enough, it is
    returned unchanged. Otherwise, this function returns a string with excess
    characters removed from the middle of the source string and replaced with
    an ellipsis.

    Arguments:

    * collectionname: str --- The desired source name
    """
    # 254 is the API server's name limit; reserve 28 characters for the
    # suffix that `ensure_unique_name` may append.
    max_name_len = 254 - 28
    excess = len(collectionname) - max_name_len
    if excess > 0:
        # Drop characters from the middle so both the start and the end of
        # the original name stay recognizable.
        head = max_name_len // 2
        collectionname = collectionname[:head] + "…" + collectionname[head + excess:]
    return collectionname
514 @_deprecated('3.0', 'arvados.util.keyset_list_all')
# Offset-based pagination over an Arvados list call; superseded by
# keyset_list_all above.
515 def list_all(fn, num_retries=0, **kwargs):
516 # Default limit to (effectively) api server's MAX_LIMIT
517 kwargs.setdefault('limit', sys.maxsize)
# NOTE(review): initialization of the accumulator/offset and the final
# return are elided from this listing — confirm against full source.
520 items_available = sys.maxsize
521 while len(items) < items_available:
522 c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
524 items_available = c['items_available']
525 offset = c['offset'] + len(c['items'])
529 def clear_tmpdir(path=None):
531 Ensure the given directory (or TASK_TMPDIR if none given)
# Deferred import: avoids a circular import at module load time.
534 from arvados import current_task
536 path = current_task().tmpdir
# Remove the directory tree by shelling out to rm -rf, then (in an elided
# line) presumably recreate it empty.
537 if os.path.exists(path):
538 p = subprocess.Popen(['rm', '-rf', path])
539 stdout, stderr = p.communicate(None)
540 if p.returncode != 0:
541 raise Exception('rm -rf %s: %s' % (path, stderr))
@_deprecated('3.0', 'subprocess.run')
def run_command(execargs, **kwargs):
    """Run a child process and return its output

    Launches `execargs` with `subprocess.Popen`, waits for it to finish,
    and returns a `(stdout, stderr)` tuple. Raises
    `arvados.errors.CommandFailedError` if the process exits nonzero.
    Extra keyword arguments are forwarded to `Popen`.
    """
    # Fill in Popen defaults only where the caller did not override them.
    popen_defaults = {
        'stdin': subprocess.PIPE,
        'stdout': subprocess.PIPE,
        'stderr': sys.stderr,
        'close_fds': True,
        'shell': False,
    }
    for option, value in popen_defaults.items():
        kwargs.setdefault(option, value)
    proc = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = proc.communicate(None)
    if proc.returncode != 0:
        raise arvados.errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, proc.returncode, stderrdata))
    return stdoutdata, stderrdata
560 def git_checkout(url, version, path):
# Deferred import: avoids a circular import at module load time.
561 from arvados import current_job
# Relative paths are resolved against the current job's temp directory.
562 if not re.search('^/', path):
563 path = os.path.join(current_job().tmpdir, path)
564 if not os.path.exists(path):
565 run_command(["git", "clone", url, path],
566 cwd=os.path.dirname(path))
# NOTE(review): the remaining call arguments (cwd) and the return value
# are elided from this listing — confirm against full source.
567 run_command(["git", "checkout", version],
572 def tar_extractor(path, decompress_flag):
# Launch tar extracting from stdin; decompress_flag is '', 'z', or 'j'
# (as chosen by tarball_extract below) and is spliced into -x<flag>f.
573 return subprocess.Popen(["tar",
575 ("-x%sf" % decompress_flag),
578 stdin=subprocess.PIPE, stderr=sys.stderr,
579 shell=False, close_fds=True)
581 @_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
582 def tarball_extract(tarball, path):
583 """Retrieve a tarball from Keep and extract it to a local
584 directory. Return the absolute path where the tarball was
585 extracted. If the top level of the tarball contained just one
586 file or directory, return the absolute path of that single
589 tarball -- collection locator
590 path -- where to extract the tarball: absolute, or relative to job tmp
592 from arvados import current_job
593 from arvados.collection import CollectionReader
# Relative paths are resolved against the current job's temp directory.
594 if not re.search('^/', path):
595 path = os.path.join(current_job().tmpdir, path)
# Serialize concurrent extractions of the same path via a lock file.
596 lockfile = open(path + '.lock', 'w')
597 fcntl.flock(lockfile, fcntl.LOCK_EX)
# A '.locator' symlink inside the target records which tarball was last
# extracted there; if it matches, reuse the existing extraction.
602 already_have_it = False
604 if os.readlink(os.path.join(path, '.locator')) == tarball:
605 already_have_it = True
608 if not already_have_it:
610 # emulate "rm -f" (i.e., if the file does not exist, we win)
612 os.unlink(os.path.join(path, '.locator'))
614 if os.path.exists(os.path.join(path, '.locator')):
615 os.unlink(os.path.join(path, '.locator'))
# Stream every file in the collection into a tar child process, choosing
# the decompression flag from the file extension.
617 for f in CollectionReader(tarball).all_files():
619 if f_name.endswith(('.tbz', '.tar.bz2')):
620 p = tar_extractor(path, 'j')
621 elif f_name.endswith(('.tgz', '.tar.gz')):
622 p = tar_extractor(path, 'z')
623 elif f_name.endswith('.tar'):
624 p = tar_extractor(path, '')
626 raise arvados.errors.AssertionError(
627 "tarball_extract cannot handle filename %s" % f.name())
635 if p.returncode != 0:
637 raise arvados.errors.CommandFailedError(
638 "tar exited %d" % p.returncode)
# Record the extracted locator for the reuse check above.
639 os.symlink(tarball, os.path.join(path, '.locator'))
# If the tarball held a single top-level entry, return its path directly.
640 tld_extracts = [f for f in os.listdir(path) if f != '.locator']
642 if len(tld_extracts) == 1:
643 return os.path.join(path, tld_extracts[0])
646 @_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
647 def zipball_extract(zipball, path):
648 """Retrieve a zip archive from Keep and extract it to a local
649 directory. Return the absolute path where the archive was
650 extracted. If the top level of the archive contained just one
651 file or directory, return the absolute path of that single
654 zipball -- collection locator
655 path -- where to extract the archive: absolute, or relative to job tmp
657 from arvados import current_job
658 from arvados.collection import CollectionReader
# Relative paths are resolved against the current job's temp directory.
659 if not re.search('^/', path):
660 path = os.path.join(current_job().tmpdir, path)
# Serialize concurrent extractions of the same path via a lock file.
661 lockfile = open(path + '.lock', 'w')
662 fcntl.flock(lockfile, fcntl.LOCK_EX)
# A '.locator' symlink records which archive was last extracted here; if
# it matches, reuse the existing extraction.
667 already_have_it = False
669 if os.readlink(os.path.join(path, '.locator')) == zipball:
670 already_have_it = True
673 if not already_have_it:
675 # emulate "rm -f" (i.e., if the file does not exist, we win)
677 os.unlink(os.path.join(path, '.locator'))
679 if os.path.exists(os.path.join(path, '.locator')):
680 os.unlink(os.path.join(path, '.locator'))
# unzip cannot read from a pipe, so copy each .zip to disk first, then
# run unzip on the temporary copy.
682 for f in CollectionReader(zipball).all_files():
683 if not f.name().endswith('.zip'):
684 raise arvados.errors.NotImplementedError(
685 "zipball_extract cannot handle filename %s" % f.name())
686 zip_filename = os.path.join(path, os.path.basename(f.name()))
687 zip_file = open(zip_filename, 'wb')
695 p = subprocess.Popen(["unzip",
700 stdin=None, stderr=sys.stderr,
701 shell=False, close_fds=True)
703 if p.returncode != 0:
705 raise arvados.errors.CommandFailedError(
706 "unzip exited %d" % p.returncode)
707 os.unlink(zip_filename)
# Record the extracted locator for the reuse check above.
708 os.symlink(zipball, os.path.join(path, '.locator'))
# If the archive held a single top-level entry, return its path directly.
709 tld_extracts = [f for f in os.listdir(path) if f != '.locator']
711 if len(tld_extracts) == 1:
712 return os.path.join(path, tld_extracts[0])
715 @_deprecated('3.0', 'arvados.collection.Collection')
# NOTE(review): mutable default argument `files=[]` is shared across calls;
# the conventional fix is a None sentinel — left as-is pending full source.
716 def collection_extract(collection, path, files=[], decompress=True):
717 """Retrieve a collection from Keep and extract it to a local
718 directory. Return the absolute path where the collection was
721 collection -- collection locator
722 path -- where to extract: absolute, or relative to job tmp
724 from arvados import current_job
725 from arvados.collection import CollectionReader
# Use the bare hash from a locator when given one; otherwise derive an
# md5 identifier from the argument itself.
726 matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
728 collection_hash = matches.group(1)
730 collection_hash = hashlib.md5(collection).hexdigest()
# Relative paths are resolved against the current job's temp directory.
731 if not re.search('^/', path):
732 path = os.path.join(current_job().tmpdir, path)
# Serialize concurrent extractions of the same path via a lock file.
733 lockfile = open(path + '.lock', 'w')
734 fcntl.flock(lockfile, fcntl.LOCK_EX)
# A '.locator' symlink records which collection was last extracted here;
# if it matches, reuse the existing extraction.
739 already_have_it = False
741 if os.readlink(os.path.join(path, '.locator')) == collection_hash:
742 already_have_it = True
746 # emulate "rm -f" (i.e., if the file does not exist, we win)
748 os.unlink(os.path.join(path, '.locator'))
750 if os.path.exists(os.path.join(path, '.locator')):
751 os.unlink(os.path.join(path, '.locator'))
# Copy out each requested file (or all files, per the elided condition),
# optionally decompressing, preserving the stream directory layout.
754 for s in CollectionReader(collection).all_streams():
755 stream_name = s.name()
756 for f in s.all_files():
758 ((f.name() not in files_got) and
759 (f.name() in files or
760 (decompress and f.decompressed_name() in files)))):
761 outname = f.decompressed_name() if decompress else f.name()
762 files_got += [outname]
763 if os.path.exists(os.path.join(path, stream_name, outname)):
765 mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
766 outfile = open(os.path.join(path, stream_name, outname), 'wb')
767 for buf in (f.readall_decompressed() if decompress
# Fail loudly if any explicitly requested file was not found.
771 if len(files_got) < len(files):
772 raise arvados.errors.AssertionError(
773 "Wanted files %s but only got %s from %s" %
775 [z.name() for z in CollectionReader(collection).all_files()]))
776 os.symlink(collection_hash, os.path.join(path, '.locator'))
781 @_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
# Recursive mkdir that tolerates the directory already existing, like
# `mkdir -p`. The makedirs call and except frame are elided here.
782 def mkdir_dash_p(path):
783 if not os.path.isdir(path):
787 if e.errno == errno.EEXIST and os.path.isdir(path):
788 # It is not an error if someone else creates the
789 # directory between our exists() and makedirs() calls.
794 @_deprecated('3.0', 'arvados.collection.Collection')
# NOTE(review): mutable default argument `files=[]` is shared across calls;
# the conventional fix is a None sentinel — left as-is pending full source.
795 def stream_extract(stream, path, files=[], decompress=True):
796 """Retrieve a stream from Keep and extract it to a local
797 directory. Return the absolute path where the stream was
800 stream -- StreamReader object
801 path -- where to extract: absolute, or relative to job tmp
803 from arvados import current_job
# Relative paths are resolved against the current job's temp directory.
804 if not re.search('^/', path):
805 path = os.path.join(current_job().tmpdir, path)
# Serialize concurrent extractions of the same path via a lock file.
806 lockfile = open(path + '.lock', 'w')
807 fcntl.flock(lockfile, fcntl.LOCK_EX)
# Copy out each requested file (or all files, per the elided condition),
# optionally decompressing.
814 for f in stream.all_files():
816 ((f.name() not in files_got) and
817 (f.name() in files or
818 (decompress and f.decompressed_name() in files)))):
819 outname = f.decompressed_name() if decompress else f.name()
820 files_got += [outname]
821 if os.path.exists(os.path.join(path, outname)):
822 os.unlink(os.path.join(path, outname))
823 mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
824 outfile = open(os.path.join(path, outname), 'wb')
825 for buf in (f.readall_decompressed() if decompress
# Fail loudly if any explicitly requested file was not found.
829 if len(files_got) < len(files):
830 raise arvados.errors.AssertionError(
831 "Wanted files %s but only got %s from %s" %
832 (files, files_got, [z.name() for z in stream.all_files()]))
836 @_deprecated('3.0', 'os.walk')
837 def listdir_recursive(dirname, base=None, max_depth=None):
838 """listdir_recursive(dirname, base, max_depth)
840 Return a list of file and directory names found under dirname.
842 If base is not None, prepend "{base}/" to each returned name.
844 If max_depth is None, descend into directories and return only the
845 names of files found in the directory tree.
847 If max_depth is a non-negative integer, stop descending into
848 directories at the given depth, and at that point return directory
851 If max_depth==0 (and base is None) this is equivalent to
852 sorted(os.listdir(dirname)).
# NOTE(review): the accumulator initialization and final return are elided
# from this listing — confirm against full source.
855 for ent in sorted(os.listdir(dirname)):
856 ent_path = os.path.join(dirname, ent)
857 ent_base = os.path.join(base, ent) if base else ent
# Recurse into subdirectories until max_depth is exhausted; max_depth of
# None propagates unchanged (unlimited depth).
858 if os.path.isdir(ent_path) and max_depth != 0:
859 allfiles += listdir_recursive(
860 ent_path, base=ent_base,
861 max_depth=(max_depth-1 if max_depth else None))
863 allfiles += [ent_base]