1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
6 This module provides functions and constants that are useful across a variety
7 of Arvados resource types, or extend the Arvados API client (see `arvados.api`).
30 from pathlib import Path, PurePath
# Shared regular expressions and constants used throughout the Arvados SDK.

HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
"""Regular expression to match a hexadecimal string (case-insensitive)"""
CR_UNCOMMITTED = 'Uncommitted'
"""Constant `state` value for uncommitted container requests"""
CR_COMMITTED = 'Committed'
"""Constant `state` value for committed container requests"""
# NOTE(review): the assignment this docstring describes (CR_FINAL?) is not
# visible in this listing -- confirm against the full source.
"""Constant `state` value for finalized container requests"""

keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*')
"""Regular expression to match any Keep block locator"""
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*\+A\S+(\+\S+)*')
"""Regular expression to match any Keep block locator with an access token hint"""
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+')
"""Regular expression to match any collection portable data hash"""
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+[0-9]+)(\+\S+)*)+( +[0-9]+:[0-9]+:\S+)+$)+', flags=re.MULTILINE)
"""Regular expression to match an Arvados collection manifest text"""
keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+[0-9]+)/(.*)')
"""Regular expression to match a file path from a collection identified by portable data hash"""
keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+[0-9]+)/(.*)')
"""Regular expression to match a `keep:` URI with a collection identified by portable data hash"""

uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
"""Regular expression to match any Arvados object UUID"""
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
"""Regular expression to match any Arvados collection UUID"""
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
"""Regular expression to match any Arvados container UUID"""
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
"""Regular expression to match any Arvados group UUID"""
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
"""Regular expression to match any Arvados link UUID"""
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
"""Regular expression to match any Arvados user UUID"""
job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
"""Regular expression to match any Arvados job UUID

.. WARNING:: Deprecated
   Arvados job resources are deprecated and will be removed in a future
   release. Prefer the containers API instead.
"""

# Module-level logger shared by SDK code.
logger = logging.getLogger('arvados')
def _deprecated(version=None, preferred=None):
    """Mark a callable as deprecated in the SDK

    This will wrap the callable to emit as a DeprecationWarning
    and add a deprecation notice to its docstring.

    If the following arguments are given, they'll be included in the
    notices:

    * preferred: str | None --- The name of an alternative that users should
      use instead.

    * version: str | None --- The version of Arvados when the callable is
      scheduled to be removed.
    """
    # [source lines elided from this listing: the branches handling the
    #  None defaults for `version` and `preferred` are incomplete below]
    version = f' and scheduled to be removed in Arvados {version}'
    if preferred is None:
        # [elided line(s): the None case; the assignment below appears to
        #  belong to an elided else branch]
        preferred = f' Prefer {preferred} instead.'
    def deprecated_decorator(func):
        # Fully qualified name, used in the warning text and to detect
        # constructors below.
        fullname = f'{func.__module__}.{func.__qualname__}'
        parent, _, name = fullname.rpartition('.')
        if name == '__init__':
            # [elided line(s): presumably substitutes the class name]
        warning_msg = f'{fullname} is deprecated{version}.{preferred}'
        @functools.wraps(func)
        def deprecated_wrapper(*args, **kwargs):
            # stacklevel=2 so the warning points at the caller, not here.
            warnings.warn(warning_msg, DeprecationWarning, 2)
            return func(*args, **kwargs)
        # Get func's docstring without any trailing newline or empty lines.
        func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
        match = re.search(r'\n([ \t]+)\S', func_doc)
        indent = '' if match is None else match.group(1)
        warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent} {warning_msg}'
        # Make the deprecation notice the second "paragraph" of the
        # docstring if possible. Otherwise append it.
        docstring, count = re.subn(
            rf'\n[ \t]*\n{indent}',
            f'{warning_doc}\n\n{indent}',
            # [elided lines: the remaining re.subn arguments and the
            #  count==0 fallback leading to the append below]
        docstring = f'{func_doc.lstrip()}{warning_doc}'
        deprecated_wrapper.__doc__ = docstring
        return deprecated_wrapper
    return deprecated_decorator
@dataclasses.dataclass
class _BaseDirectorySpec:
    """Parse base directories

    A _BaseDirectorySpec defines all the environment variable keys and defaults
    related to a set of base directories (cache, config, state, etc.). It
    provides pure methods to parse environment settings into valid paths.
    """
    # [source lines elided from this listing: earlier field declarations
    #  (e.g. the systemd and XDG home keys referenced by methods below)]
    xdg_home_default: PurePath
    xdg_dirs_key: Optional[str] = None
    xdg_dirs_default: str = ''

    # [elided line: presumably a @staticmethod decorator]
    def _abspath_from_env(env: Mapping[str, str], key: str) -> Optional[Path]:
        # Return env[key] as an absolute Path, or None when the variable is
        # unset or not an absolute path.
        # [elided line: `try:`]
            path = Path(env[key])
        except (KeyError, ValueError):
            # [elided line(s): the failure result]
        ok = path.is_absolute()
        return path if ok else None

    # [elided line: presumably a @staticmethod decorator]
    def _iter_abspaths(value: str) -> Iterator[Path]:
        # Yield absolute paths from a colon-separated path list, skipping
        # relative entries.
        for path_s in value.split(':'):
            # [elided line: builds a Path from path_s]
            if path.is_absolute():
                # [elided line: yields the path]

    def iter_systemd(self, env: Mapping[str, str]) -> Iterator[Path]:
        # Directories provided via the spec's systemd environment key.
        return self._iter_abspaths(env.get(self.systemd_key, ''))

    def iter_xdg(self, env: Mapping[str, str], subdir: PurePath) -> Iterator[Path]:
        # XDG home first, then each configured XDG search directory.
        yield self.xdg_home(env, subdir)
        if self.xdg_dirs_key is not None:
            for path in self._iter_abspaths(env.get(self.xdg_dirs_key) or self.xdg_dirs_default):
                # [elided line: yields the path (presumably with subdir)]

    def xdg_home(self, env: Mapping[str, str], subdir: PurePath) -> Path:
        # [elided line(s): wraps the expression below, presumably
        #  `return (...) / subdir` -- confirm against full source]
            self._abspath_from_env(env, self.xdg_home_key)
            or self.xdg_home_default_path(env)

    def xdg_home_default_path(self, env: Mapping[str, str]) -> Path:
        # $HOME (or the OS account home) plus the spec's default subpath.
        return (self._abspath_from_env(env, 'HOME') or Path.home()) / self.xdg_home_default

    def xdg_home_is_customized(self, env: Mapping[str, str]) -> bool:
        # True when the user set the XDG home variable to a non-default path.
        xdg_home = self._abspath_from_env(env, self.xdg_home_key)
        return xdg_home is not None and xdg_home != self.xdg_home_default_path(env)
class _BaseDirectorySpecs(enum.Enum):
    """Base directory specifications

    This enum provides easy access to the standard base directory settings.
    """
    # [source lines elided from this listing: most constructor arguments of
    #  the members below are not visible]
    CACHE = _BaseDirectorySpec(
    CONFIG = _BaseDirectorySpec(
        'CONFIGURATION_DIRECTORY',
    STATE = _BaseDirectorySpec(
        PurePath('.local', 'state'),
class _BaseDirectories:
    """Resolve paths from a base directory spec

    Given a _BaseDirectorySpec, this class provides stateful methods to find
    existing files and return the most-preferred directory for writing.
    """
    # Mode bits a systemd-provided path must have to be usable for storage:
    # a directory writable by the owner.
    _STORE_MODE = stat.S_IFDIR | stat.S_IWUSR

    # [source lines elided from this listing: the __init__ header]
        spec: Union[_BaseDirectorySpec, _BaseDirectorySpecs, str],
        env: Mapping[str, str]=os.environ,
        xdg_subdir: Union[os.PathLike, str]='arvados',
        # Accept the spec by enum-member name, enum member, or directly.
        if isinstance(spec, str):
            spec = _BaseDirectorySpecs[spec].value
        elif isinstance(spec, _BaseDirectorySpecs):
            # [elided lines: unwraps the enum member and stores spec/env]
        self._xdg_subdir = PurePath(xdg_subdir)

    def search(self, name: str) -> Iterator[Path]:
        # Yield existing paths named `name` under the configured base dirs.
        for search_path in itertools.chain(
                self._spec.iter_systemd(self._env),
                self._spec.iter_xdg(self._env, self._xdg_subdir),
            path = search_path / name
            # [elided lines: yields `path` when it exists and records that
            #  at least one match was found]
        # The rest of this function is dedicated to warning the user if they
        # have a custom XDG_*_HOME value that prevented the search from
        # succeeding. This should be rare.
        if any_found or not self._spec.xdg_home_is_customized(self._env):
            # [elided line: early return]
        default_home = self._spec.xdg_home_default_path(self._env)
        default_path = Path(self._xdg_subdir / name)
        if not (default_home / default_path).exists():
            # [elided line: early return]
        # Suggest either the home key or the search-dirs key, whichever the
        # spec supports.
        if self._spec.xdg_dirs_key is None:
            suggest_key = self._spec.xdg_home_key
            suggest_value = default_home
            # [elided line: else branch]
            suggest_key = self._spec.xdg_dirs_key
            cur_value = self._env.get(suggest_key, '')
            value_sep = ':' if cur_value else ''
            suggest_value = f'{cur_value}{value_sep}{default_home}'
        # [elided lines: the logger.warning call wrapping the message below]
%s was not found under your configured $%s (%s), \
but does exist at the default location (%s) - \
consider running this program with the environment setting %s=%s\
            self._spec.xdg_home_key,
            self._spec.xdg_home(self._env, ''),
            shlex.quote(suggest_value),

    # [elided lines: the storage_path method header]
        subdir: Union[str, os.PathLike]=PurePath(),
        # Prefer a systemd-provided directory that is a writable directory.
        for path in self._spec.iter_systemd(self._env):
            # [elided line: `try:` guarding the stat call]
            mode = path.stat().st_mode
            if (mode & self._STORE_MODE) == self._STORE_MODE:
                # [elided lines: returns this path]
        # Fall back to the XDG home directory, creating it if necessary.
        path = self._spec.xdg_home(self._env, self._xdg_subdir)
        path.mkdir(parents=True, exist_ok=True, mode=mode)
def is_hex(s: str, *length_args: int) -> bool:
    """Indicate whether a string is a hexadecimal number

    This method returns true if all characters in the string are hexadecimal
    digits. It is case-insensitive.

    You can also pass optional length arguments to check that the string has
    the expected number of digits. If you pass one integer, the string must
    have that length exactly, otherwise the method returns False. If you
    pass two integers, the string's length must fall within that minimum and
    maximum (inclusive), otherwise the method returns False.

    Arguments:

    * s: str --- The string to check

    * length_args: int --- Optional length limit(s) for the string to check

    Raises arvados.errors.ArgumentError when more than two length arguments
    are given.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        # No length constraints: only the character check applies.
        # (Restored branch: without it, `good_len` is unbound and a
        # zero-length-arg call raises NameError.)
        good_len = True
    return bool(good_len and HEX_RE.match(s))
# [source lines elided from this listing: the `def keyset_list_all(` line and
#  the `pagesize` and `num_retries` parameters of the signature below]
    fn: Callable[..., 'arvados.api_resources.ArvadosAPIRequest'],
    order_key: str="created_at",
    ascending: bool=True,
) -> Iterator[Dict[str, Any]]:
    """Iterate all Arvados resources from an API list call

    This method takes a method that represents an Arvados API list call, and
    iterates the objects returned by the API server. It can make multiple API
    calls to retrieve and iterate all objects available from the API server.

    Arguments:

    * fn: Callable[..., arvados.api_resources.ArvadosAPIRequest] --- A
      function that wraps an Arvados API method that returns a list of
      objects. If you have an Arvados API client named `arv`, examples
      include `arv.collections().list` and `arv.groups().contents`. Note
      that you should pass the function *without* calling it.

    * order_key: str --- The name of the primary object field that objects
      should be sorted by. This name is used to build an `order` argument
      for `fn`. Default `'created_at'`.

    * num_retries: int --- This argument is passed through to
      `arvados.api_resources.ArvadosAPIRequest.execute` for each API call. See
      that method's docstring for details. Default 0 (meaning API calls will
      use the `num_retries` value set when the Arvados API client was
      constructed).

    * ascending: bool --- Used to build an `order` argument for `fn`. If True,
      all fields will be sorted in `'asc'` (ascending) order. Otherwise, all
      fields will be sorted in `'desc'` (descending) order.

    Additional keyword arguments will be passed directly to `fn` for each API
    call. Note that this function sets `count`, `limit`, and `order` as part of
    its work.
    """
    # Keyset paging: the server never computes counts, and ordering is made
    # total by a secondary sort on uuid.
    kwargs["limit"] = pagesize
    kwargs["count"] = 'none'
    asc = "asc" if ascending else "desc"
    kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
    other_filters = kwargs.get("filters", [])

    # If the caller restricted the returned fields, make sure the fields
    # needed for paging stay included.
    # [elided lines: the guard around this select handling]
        select = set(kwargs['select'])
        select.add(order_key)
        # [elided line(s): presumably also adds 'uuid']
        kwargs['select'] = list(select)

    expect_full_page = True
    seen_prevpage = set()
    seen_thispage = set()
    prev_page_all_same_order_key = False
    # [elided lines: lastitem/nextpage initialization and the loop header]
        kwargs["filters"] = nextpage+other_filters
        items = fn(**kwargs).execute(num_retries=num_retries)

        if len(items["items"]) == 0:
            if prev_page_all_same_order_key:
                # Finished the run of equal order keys; resume strict
                # order-key paging.
                nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
                prev_page_all_same_order_key = False
                # [elided lines: continue / loop-termination handling]

        seen_prevpage = seen_thispage
        seen_thispage = set()

        for i in items["items"]:
            # In cases where there's more than one record with the
            # same order key, the result could include records we
            # already saw in the last page. Skip them.
            if i["uuid"] in seen_prevpage:
                # [elided line: continue]
            seen_thispage.add(i["uuid"])
            # [elided line: yields the item]

        firstitem = items["items"][0]
        lastitem = items["items"][-1]

        if firstitem[order_key] == lastitem[order_key]:
            # Got a page where every item has the same order key.
            # Switch to using uuid for paging.
            nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
            prev_page_all_same_order_key = True
            # [elided line: else branch]
            # Start from the last order key seen, but skip the last
            # known uuid to avoid retrieving the same row twice. If
            # there are multiple rows with the same order key it is
            # still likely we'll end up retrieving duplicate rows.
            # That's handled by tracking the "seen" rows for each page
            # so they can be skipped if they show up on the next page.
            nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
            prev_page_all_same_order_key = False
def ca_certs_path(fallback: T=httplib2.CA_CERTS) -> Union[str, T]:
    """Return the path of the best available source of CA certificates

    This function checks various known paths that provide trusted CA
    certificates, and returns the first one that exists. It checks:

    * the path in the `SSL_CERT_FILE` environment variable (used by OpenSSL)
    * `/etc/arvados/ca-certificates.crt`, respected by all Arvados software
    * `/etc/ssl/certs/ca-certificates.crt`, the default store on Debian-based
      distributions
    * `/etc/pki/tls/certs/ca-bundle.crt`, the default store on Red Hat-based
      distributions

    If none of these paths exist, this function returns the value of `fallback`.

    Arguments:

    * fallback: T --- The value to return if none of the known paths exist.
      The default value is the certificate store of Mozilla's trusted CAs
      included with the Python [certifi][] package.

    [certifi]: https://pypi.org/project/certifi/
    """
    for ca_certs_path in [
        # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
        # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
        os.environ.get('SSL_CERT_FILE'),
        # Arvados-specific path, respected by all Arvados software.
        '/etc/arvados/ca-certificates.crt',
        # Default store on Debian-based distributions.
        '/etc/ssl/certs/ca-certificates.crt',
        # Default store on Red Hat-based distributions.
        '/etc/pki/tls/certs/ca-bundle.crt',
    ]:
        if ca_certs_path and os.path.exists(ca_certs_path):
            # Restored: return the first existing path. (Both return
            # statements were missing in the reviewed listing, so the
            # function always returned None.)
            return ca_certs_path
    return fallback
def new_request_id() -> str:
    """Return a random request ID

    This function generates and returns a random string suitable for use as a
    `X-Request-Id` header value in the Arvados API.
    """
    # Draw enough random bits to cover 20 base-36 digits.
    # 2**104 > 36**20 > 2**103
    n = random.getrandbits(104)
    # [source lines elided from this listing: the loop that repeatedly
    #  divides `n` by 36 and appends each base-36 digit `c`]
            rid += chr(c+ord('0'))
            # [elided line: else branch for digit values >= 10]
            rid += chr(c+ord('a')-10)
    # [elided lines: presumably returns the accumulated string]
def get_config_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
    """Return an Arvados cluster's configuration, with caching

    This function gets and returns the Arvados configuration from the API
    server. It caches the result on the client object and reuses it on any
    later calls.

    Arguments:

    * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
      object to use to retrieve and cache the Arvados cluster configuration.
    """
    if not svc._rootDesc.get('resources').get('configs', False):
        # Old API server version, no config export endpoint
        # (Restored: return an empty config instead of falling through and
        # calling an endpoint the server does not provide.)
        return {}
    if not hasattr(svc, '_cached_config'):
        svc._cached_config = svc.configs().get().execute()
    return svc._cached_config
def get_vocabulary_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
    """Return an Arvados cluster's vocabulary, with caching

    This function gets and returns the Arvados vocabulary from the API
    server. It caches the result on the client object and reuses it on any
    later calls.

    .. HINT:: Low-level method
       This is a relatively low-level wrapper around the Arvados API. Most
       users will prefer to use `arvados.vocabulary.load_vocabulary`.

    Arguments:

    * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
      object to use to retrieve and cache the Arvados cluster vocabulary.
    """
    if not svc._rootDesc.get('resources').get('vocabularies', False):
        # Old API server version, no vocabulary export endpoint
        # (Restored: return an empty vocabulary instead of falling through
        # and calling an endpoint the server does not provide.)
        return {}
    if not hasattr(svc, '_cached_vocabulary'):
        svc._cached_vocabulary = svc.vocabularies().get().execute()
    return svc._cached_vocabulary
def trim_name(collectionname: str) -> str:
    """Limit the length of a name to fit within Arvados API limits

    This function ensures that a string is short enough to use as an object
    name in the Arvados API, leaving room for text that may be added by the
    `ensure_unique_name` argument. If the source name is short enough, it is
    returned unchanged. Otherwise, this function returns a string with excess
    characters removed from the middle of the source string and replaced with
    an ellipsis.

    Arguments:

    * collectionname: str --- The desired source name
    """
    # 254 is the API server's name length limit; reserve 28 characters for
    # the suffix `ensure_unique_name` may append.
    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        # Remove one extra character to make room for the ellipsis itself,
        # so the result is exactly max_name_len characters long. (The
        # previous code left the result one character over the limit.)
        over = len(collectionname) - max_name_len + 1
        split = max_name_len // 2
        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]

    return collectionname
@_deprecated('3.0', 'arvados.util.keyset_list_all')
def list_all(fn, num_retries=0, **kwargs):
    """Fetch every page of an offset-paged API list call.

    Repeatedly calls `fn` with increasing offsets until `items_available`
    items have been collected.
    """
    # Default limit to (effectively) api server's MAX_LIMIT
    kwargs.setdefault('limit', sys.maxsize)
    # [source lines elided from this listing: initialization of the result
    #  list and the starting offset]
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        # [elided line: accumulates c['items'] into the result list]
        items_available = c['items_available']
        offset = c['offset'] + len(c['items'])
    # [elided line: presumably returns the accumulated items]
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    is empty, deleting its contents if it exists.

    NOTE(review): parts of this function's original docstring and body are
    elided in the reviewed listing -- presumably the directory is recreated
    afterwards; confirm against the full source.
    """
    from arvados import current_task
    # [elided line: guard applying the default only when path is None]
        path = current_task().tmpdir
    if os.path.exists(path):
        # Delegate recursive deletion to `rm -rf`.
        p = subprocess.Popen(['rm', '-rf', path])
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    # [elided line(s): presumably recreates the directory -- confirm]
@_deprecated('3.0', 'subprocess.run')
def run_command(execargs, **kwargs):
    """Run an external command and return its `(stdout, stderr)` data.

    Keyword arguments are passed through to `subprocess.Popen`, with
    defaults capturing stdout, piping stdin, and inheriting this process's
    stderr. Raises `arvados.errors.CommandFailedError` if the command
    exits with a nonzero status.
    """
    # Apply the historical defaults without clobbering caller overrides.
    default_opts = {
        'stdin': subprocess.PIPE,
        'stdout': subprocess.PIPE,
        'stderr': sys.stderr,
        'close_fds': True,
        'shell': False,
    }
    for opt_name, opt_value in default_opts.items():
        kwargs.setdefault(opt_name, opt_value)
    proc = subprocess.Popen(execargs, **kwargs)
    out_data, err_data = proc.communicate(None)
    if proc.returncode == 0:
        return out_data, err_data
    raise arvados.errors.CommandFailedError(
        "run_command %s exit %d:\n%s" %
        (execargs, proc.returncode, err_data))
def git_checkout(url, version, path):
    """Clone `url` into `path` (if not already present) and check out `version`.

    NOTE(review): the deprecation decorator above this function and its
    final lines (the `cwd` argument and return value) are elided in the
    reviewed listing.
    """
    from arvados import current_job
    # Relative paths are resolved against the job's temporary directory.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
    # [elided lines: the remaining arguments of this call and the return]
def tar_extractor(path, decompress_flag):
    """Return a `tar` extraction subprocess reading from its stdin pipe.

    `decompress_flag` is interpolated into the `-x<flag>f` option ('j', 'z',
    or ''). `path` is presumably passed via the elided arguments (likely a
    `-C` option) -- confirm against the full source.
    """
    return subprocess.Popen(["tar",
                             # [elided line(s): additional tar arguments]
                             ("-x%sf" % decompress_flag),
                             # [elided line(s): remaining list items]
                             stdin=subprocess.PIPE, stderr=sys.stderr,
                             shell=False, close_fds=True)
@_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory. Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    # Relative paths are resolved against the job's temporary directory.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same target with a lock file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    # [source lines elided from this listing: presumably ensures `path`
    #  exists before the `.locator` check below]
    already_have_it = False
    # [elided line: `try:` guarding the readlink]
    if os.readlink(os.path.join(path, '.locator')) == tarball:
        already_have_it = True
    # [elided lines: the exception handler]
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        # [elided line: `try:`]
        os.unlink(os.path.join(path, '.locator'))
        # [elided line: exception handler]
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            # [elided line: binds f.name() to f_name]
            # Pick the tar compression flag from the file extension.
            if f_name.endswith(('.tbz', '.tar.bz2')):
                p = tar_extractor(path, 'j')
            elif f_name.endswith(('.tgz', '.tar.gz')):
                p = tar_extractor(path, 'z')
            elif f_name.endswith('.tar'):
                p = tar_extractor(path, '')
                # [elided line: else branch]
                raise arvados.errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            # [elided lines: streaming the file data into the tar process
            #  and waiting for it to finish]
            if p.returncode != 0:
                # [elided line(s): cleanup before raising]
                raise arvados.errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        # Record which tarball this directory now contains.
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    # [elided line(s): lock release]
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    # [elided lines: presumably returns `path` otherwise]
@_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory. Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    # Relative paths are resolved against the job's temporary directory.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same target with a lock file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    # [source lines elided from this listing: presumably ensures `path`
    #  exists before the `.locator` check below]
    already_have_it = False
    # [elided line: `try:` guarding the readlink]
    if os.readlink(os.path.join(path, '.locator')) == zipball:
        already_have_it = True
    # [elided lines: the exception handler]
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        # [elided line: `try:`]
        os.unlink(os.path.join(path, '.locator'))
        # [elided line: exception handler]
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not f.name().endswith('.zip'):
                raise arvados.errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # Copy the archive out of Keep before invoking unzip on it.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            # [elided lines: writes the file data and closes zip_file]
            p = subprocess.Popen(["unzip",
                                  # [elided lines: unzip options/arguments]
                                  stdin=None, stderr=sys.stderr,
                                  shell=False, close_fds=True)
            # [elided line: waits for the process]
            if p.returncode != 0:
                # [elided line(s): cleanup before raising]
                raise arvados.errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        # Record which archive this directory now contains.
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    # [elided line(s): lock release]
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    # [elided lines: presumably returns `path` otherwise]
@_deprecated('3.0', 'arvados.collection.Collection')
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory. Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    # Use the bare hash of the locator as the cache key for `.locator`.
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    # [elided line: guard choosing between the two assignments below]
    collection_hash = matches.group(1)
    # [elided line: else branch]
    # NOTE(review): hashlib.md5 requires bytes; passing `collection` (used
    # as a str above) would raise TypeError on Python 3 -- confirm.
    collection_hash = hashlib.md5(collection).hexdigest()
    # Relative paths are resolved against the job's temporary directory.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same target with a lock file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    # [source lines elided from this listing: presumably ensures `path`
    #  exists before the `.locator` check below]
    already_have_it = False
    # [elided line: `try:` guarding the readlink]
    if os.readlink(os.path.join(path, '.locator')) == collection_hash:
        already_have_it = True
    # [elided lines: exception handler and the `if not already_have_it:`
    #  guard around the block below]
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        # [elided line: `try:`]
        os.unlink(os.path.join(path, '.locator'))
        # [elided line: exception handler]
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))
    # [elided lines: initialization of `files_got`]
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            # Extract everything when `files` is empty; otherwise only the
            # requested names (compressed or decompressed form).
            # [elided line: the opening of this condition]
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    # [elided line: skip already-extracted file]
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                # [elided lines: the else reader, the write, and close]
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            # [elided line: the first tuple elements]
             [z.name() for z in CollectionReader(collection).all_files()]))
    # Record which collection this directory now contains.
    os.symlink(collection_hash, os.path.join(path, '.locator'))
    # [elided lines: lock release and return value]
@_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
def mkdir_dash_p(path):
    """Equivalent of `mkdir -p`: create `path` and any missing parents,
    tolerating a directory that already exists."""
    if not os.path.isdir(path):
        # [source lines elided from this listing: try/os.makedirs(path)/
        #  except OSError as e:]
        if e.errno == errno.EEXIST and os.path.isdir(path):
            # It is not an error if someone else creates the
            # directory between our exists() and makedirs() calls.
            # [elided lines: pass here, re-raise otherwise -- confirm]
@_deprecated('3.0', 'arvados.collection.Collection')
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory. Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    """
    from arvados import current_job
    # Relative paths are resolved against the job's temporary directory.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same target with a lock file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    # [source lines elided from this listing: presumably ensures `path`
    #  exists and initializes `files_got`]
    for f in stream.all_files():
        # Extract everything when `files` is empty; otherwise only the
        # requested names (compressed or decompressed form).
        # [elided line: the opening of this condition]
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
            # [elided lines: the else reader, the write, and close]
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    # [elided lines: lock release and return value]
@_deprecated('3.0', 'os.walk')
def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    Return a list of file and directory names found under dirname.

    If base is not None, prepend "{base}/" to each returned name.

    If max_depth is None, descend into directories and return only the
    names of files found in the directory tree.

    If max_depth is a non-negative integer, stop descending into
    directories at the given depth, and return the names found at that
    point.  # NOTE(review): tail of this paragraph elided in listing

    If max_depth==0 (and base is None) this is equivalent to
    sorted(os.listdir(dirname)).
    """
    # [elided line: initialization of the accumulator list `allfiles`]
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path) and max_depth != 0:
            # Recurse, decrementing the depth budget when one was given.
            allfiles += listdir_recursive(
                ent_path, base=ent_base,
                max_depth=(max_depth-1 if max_depth else None))
            # [elided line: else branch]
            allfiles += [ent_base]
    # [elided line: presumably returns `allfiles`]