Merge branch '21020-pysdk-env-paths'
[arvados.git] / sdk / python / arvados / util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """Arvados utilities
5
6 This module provides functions and constants that are useful across a variety
7 of Arvados resource types, or extend the Arvados API client (see `arvados.api`).
8 """
9
10 import dataclasses
11 import enum
12 import errno
13 import fcntl
14 import functools
15 import hashlib
16 import httplib2
17 import itertools
18 import logging
19 import os
20 import random
21 import re
22 import shlex
23 import stat
24 import subprocess
25 import sys
26 import warnings
27
28 import arvados.errors
29
30 from pathlib import Path, PurePath
31 from typing import (
32     Any,
33     Callable,
34     Dict,
35     Iterator,
36     Mapping,
37     Optional,
38     TypeVar,
39     Union,
40 )
41
42 T = TypeVar('T')
43
44 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
45 """Regular expression to match a hexadecimal string (case-insensitive)"""
46 CR_UNCOMMITTED = 'Uncommitted'
47 """Constant `state` value for uncommited container requests"""
48 CR_COMMITTED = 'Committed'
49 """Constant `state` value for committed container requests"""
50 CR_FINAL = 'Final'
51 """Constant `state` value for finalized container requests"""
52
53 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*')
54 """Regular expression to match any Keep block locator"""
55 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*\+A\S+(\+\S+)*')
56 """Regular expression to match any Keep block locator with an access token hint"""
57 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+')
58 """Regular expression to match any collection portable data hash"""
59 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+[0-9]+)(\+\S+)*)+( +[0-9]+:[0-9]+:\S+)+$)+', flags=re.MULTILINE)
60 """Regular expression to match an Arvados collection manifest text"""
61 keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+[0-9]+)/(.*)')
62 """Regular expression to match a file path from a collection identified by portable data hash"""
63 keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+[0-9]+)/(.*)')
64 """Regular expression to match a `keep:` URI with a collection identified by portable data hash"""
65
66 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
67 """Regular expression to match any Arvados object UUID"""
68 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
69 """Regular expression to match any Arvados collection UUID"""
70 container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
71 """Regular expression to match any Arvados container UUID"""
72 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
73 """Regular expression to match any Arvados group UUID"""
74 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
75 """Regular expression to match any Arvados link UUID"""
76 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
77 """Regular expression to match any Arvados user UUID"""
78 job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
79 """Regular expression to match any Arvados job UUID
80
81 .. WARNING:: Deprecated
82    Arvados job resources are deprecated and will be removed in a future
83    release. Prefer the containers API instead.
84 """
85
86 logger = logging.getLogger('arvados')
87
88 def _deprecated(version=None, preferred=None):
89     """Mark a callable as deprecated in the SDK
90
91     This will wrap the callable to emit as a DeprecationWarning
92     and add a deprecation notice to its docstring.
93
94     If the following arguments are given, they'll be included in the
95     notices:
96
97     * preferred: str | None --- The name of an alternative that users should
98       use instead.
99
100     * version: str | None --- The version of Arvados when the callable is
101       scheduled to be removed.
102     """
103     if version is None:
104         version = ''
105     else:
106         version = f' and scheduled to be removed in Arvados {version}'
107     if preferred is None:
108         preferred = ''
109     else:
110         preferred = f' Prefer {preferred} instead.'
111     def deprecated_decorator(func):
112         fullname = f'{func.__module__}.{func.__qualname__}'
113         parent, _, name = fullname.rpartition('.')
114         if name == '__init__':
115             fullname = parent
116         warning_msg = f'{fullname} is deprecated{version}.{preferred}'
117         @functools.wraps(func)
118         def deprecated_wrapper(*args, **kwargs):
119             warnings.warn(warning_msg, DeprecationWarning, 2)
120             return func(*args, **kwargs)
121         # Get func's docstring without any trailing newline or empty lines.
122         func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
123         match = re.search(r'\n([ \t]+)\S', func_doc)
124         indent = '' if match is None else match.group(1)
125         warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent}   {warning_msg}'
126         # Make the deprecation notice the second "paragraph" of the
127         # docstring if possible. Otherwise append it.
128         docstring, count = re.subn(
129             rf'\n[ \t]*\n{indent}',
130             f'{warning_doc}\n\n{indent}',
131             func_doc,
132             count=1,
133         )
134         if not count:
135             docstring = f'{func_doc.lstrip()}{warning_doc}'
136         deprecated_wrapper.__doc__ = docstring
137         return deprecated_wrapper
138     return deprecated_decorator
139
140 @dataclasses.dataclass
141 class _BaseDirectorySpec:
142     """Parse base directories
143
144     A _BaseDirectorySpec defines all the environment variable keys and defaults
145     related to a set of base directories (cache, config, state, etc.). It
146     provides pure methods to parse environment settings into valid paths.
147     """
148     systemd_key: str
149     xdg_home_key: str
150     xdg_home_default: PurePath
151     xdg_dirs_key: Optional[str] = None
152     xdg_dirs_default: str = ''
153
154     @staticmethod
155     def _abspath_from_env(env: Mapping[str, str], key: str) -> Optional[Path]:
156         try:
157             path = Path(env[key])
158         except (KeyError, ValueError):
159             ok = False
160         else:
161             ok = path.is_absolute()
162         return path if ok else None
163
164     @staticmethod
165     def _iter_abspaths(value: str) -> Iterator[Path]:
166         for path_s in value.split(':'):
167             path = Path(path_s)
168             if path.is_absolute():
169                 yield path
170
171     def iter_systemd(self, env: Mapping[str, str]) -> Iterator[Path]:
172         return self._iter_abspaths(env.get(self.systemd_key, ''))
173
174     def iter_xdg(self, env: Mapping[str, str], subdir: PurePath) -> Iterator[Path]:
175         yield self.xdg_home(env, subdir)
176         if self.xdg_dirs_key is not None:
177             for path in self._iter_abspaths(env.get(self.xdg_dirs_key) or self.xdg_dirs_default):
178                 yield path / subdir
179
180     def xdg_home(self, env: Mapping[str, str], subdir: PurePath) -> Path:
181         return (
182             self._abspath_from_env(env, self.xdg_home_key)
183             or self.xdg_home_default_path(env)
184         ) / subdir
185
186     def xdg_home_default_path(self, env: Mapping[str, str]) -> Path:
187         return (self._abspath_from_env(env, 'HOME') or Path.home()) / self.xdg_home_default
188
189     def xdg_home_is_customized(self, env: Mapping[str, str]) -> bool:
190         xdg_home = self._abspath_from_env(env, self.xdg_home_key)
191         return xdg_home is not None and xdg_home != self.xdg_home_default_path(env)
192
193
194 class _BaseDirectorySpecs(enum.Enum):
195     """Base directory specifications
196
197     This enum provides easy access to the standard base directory settings.
198     """
199     CACHE = _BaseDirectorySpec(
200         'CACHE_DIRECTORY',
201         'XDG_CACHE_HOME',
202         PurePath('.cache'),
203     )
204     CONFIG = _BaseDirectorySpec(
205         'CONFIGURATION_DIRECTORY',
206         'XDG_CONFIG_HOME',
207         PurePath('.config'),
208         'XDG_CONFIG_DIRS',
209         '/etc/xdg',
210     )
211     STATE = _BaseDirectorySpec(
212         'STATE_DIRECTORY',
213         'XDG_STATE_HOME',
214         PurePath('.local', 'state'),
215     )
216
217
218 class _BaseDirectories:
219     """Resolve paths from a base directory spec
220
221     Given a _BaseDirectorySpec, this class provides stateful methods to find
222     existing files and return the most-preferred directory for writing.
223     """
224     _STORE_MODE = stat.S_IFDIR | stat.S_IWUSR
225
226     def __init__(
227             self,
228             spec: Union[_BaseDirectorySpec, _BaseDirectorySpecs, str],
229             env: Mapping[str, str]=os.environ,
230             xdg_subdir: Union[os.PathLike, str]='arvados',
231     ) -> None:
232         if isinstance(spec, str):
233             spec = _BaseDirectorySpecs[spec].value
234         elif isinstance(spec, _BaseDirectorySpecs):
235             spec = spec.value
236         self._spec = spec
237         self._env = env
238         self._xdg_subdir = PurePath(xdg_subdir)
239
240     def search(self, name: str) -> Iterator[Path]:
241         any_found = False
242         for search_path in itertools.chain(
243                 self._spec.iter_systemd(self._env),
244                 self._spec.iter_xdg(self._env, self._xdg_subdir),
245         ):
246             path = search_path / name
247             if path.exists():
248                 yield path
249                 any_found = True
250         # The rest of this function is dedicated to warning the user if they
251         # have a custom XDG_*_HOME value that prevented the search from
252         # succeeding. This should be rare.
253         if any_found or not self._spec.xdg_home_is_customized(self._env):
254             return
255         default_home = self._spec.xdg_home_default_path(self._env)
256         default_path = Path(self._xdg_subdir / name)
257         if not (default_home / default_path).exists():
258             return
259         if self._spec.xdg_dirs_key is None:
260             suggest_key = self._spec.xdg_home_key
261             suggest_value = default_home
262         else:
263             suggest_key = self._spec.xdg_dirs_key
264             cur_value = self._env.get(suggest_key, '')
265             value_sep = ':' if cur_value else ''
266             suggest_value = f'{cur_value}{value_sep}{default_home}'
267         logger.warning(
268             "\
269 %s was not found under your configured $%s (%s), \
270 but does exist at the default location (%s) - \
271 consider running this program with the environment setting %s=%s\
272 ",
273             default_path,
274             self._spec.xdg_home_key,
275             self._spec.xdg_home(self._env, ''),
276             default_home,
277             suggest_key,
278             shlex.quote(suggest_value),
279         )
280
281     def storage_path(
282             self,
283             subdir: Union[str, os.PathLike]=PurePath(),
284             mode: int=0o700,
285     ) -> Path:
286         for path in self._spec.iter_systemd(self._env):
287             try:
288                 mode = path.stat().st_mode
289             except OSError:
290                 continue
291             if (mode & self._STORE_MODE) == self._STORE_MODE:
292                 break
293         else:
294             path = self._spec.xdg_home(self._env, self._xdg_subdir)
295         path /= subdir
296         path.mkdir(parents=True, exist_ok=True, mode=mode)
297         return path
298
299
300 def is_hex(s: str, *length_args: int) -> bool:
301     """Indicate whether a string is a hexadecimal number
302
303     This method returns true if all characters in the string are hexadecimal
304     digits. It is case-insensitive.
305
306     You can also pass optional length arguments to check that the string has
307     the expected number of digits. If you pass one integer, the string must
308     have that length exactly, otherwise the method returns False. If you
309     pass two integers, the string's length must fall within that minimum and
310     maximum (inclusive), otherwise the method returns False.
311
312     Arguments:
313
314     * s: str --- The string to check
315
316     * length_args: int --- Optional length limit(s) for the string to check
317     """
318     num_length_args = len(length_args)
319     if num_length_args > 2:
320         raise arvados.errors.ArgumentError(
321             "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
322     elif num_length_args == 2:
323         good_len = (length_args[0] <= len(s) <= length_args[1])
324     elif num_length_args == 1:
325         good_len = (len(s) == length_args[0])
326     else:
327         good_len = True
328     return bool(good_len and HEX_RE.match(s))
329
330 def keyset_list_all(
331         fn: Callable[..., 'arvados.api_resources.ArvadosAPIRequest'],
332         order_key: str="created_at",
333         num_retries: int=0,
334         ascending: bool=True,
335         **kwargs: Any,
336 ) -> Iterator[Dict[str, Any]]:
337     """Iterate all Arvados resources from an API list call
338
339     This method takes a method that represents an Arvados API list call, and
340     iterates the objects returned by the API server. It can make multiple API
341     calls to retrieve and iterate all objects available from the API server.
342
343     Arguments:
344
345     * fn: Callable[..., arvados.api_resources.ArvadosAPIRequest] --- A
346       function that wraps an Arvados API method that returns a list of
347       objects. If you have an Arvados API client named `arv`, examples
348       include `arv.collections().list` and `arv.groups().contents`. Note
349       that you should pass the function *without* calling it.
350
351     * order_key: str --- The name of the primary object field that objects
352       should be sorted by. This name is used to build an `order` argument
353       for `fn`. Default `'created_at'`.
354
355     * num_retries: int --- This argument is passed through to
356       `arvados.api_resources.ArvadosAPIRequest.execute` for each API call. See
357       that method's docstring for details. Default 0 (meaning API calls will
358       use the `num_retries` value set when the Arvados API client was
359       constructed).
360
361     * ascending: bool --- Used to build an `order` argument for `fn`. If True,
362       all fields will be sorted in `'asc'` (ascending) order. Otherwise, all
363       fields will be sorted in `'desc'` (descending) order.
364
365     Additional keyword arguments will be passed directly to `fn` for each API
366     call. Note that this function sets `count`, `limit`, and `order` as part of
367     its work.
368     """
369     pagesize = 1000
370     kwargs["limit"] = pagesize
371     kwargs["count"] = 'none'
372     asc = "asc" if ascending else "desc"
373     kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
374     other_filters = kwargs.get("filters", [])
375
376     try:
377         select = set(kwargs['select'])
378     except KeyError:
379         pass
380     else:
381         select.add(order_key)
382         select.add('uuid')
383         kwargs['select'] = list(select)
384
385     nextpage = []
386     tot = 0
387     expect_full_page = True
388     seen_prevpage = set()
389     seen_thispage = set()
390     lastitem = None
391     prev_page_all_same_order_key = False
392
393     while True:
394         kwargs["filters"] = nextpage+other_filters
395         items = fn(**kwargs).execute(num_retries=num_retries)
396
397         if len(items["items"]) == 0:
398             if prev_page_all_same_order_key:
399                 nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
400                 prev_page_all_same_order_key = False
401                 continue
402             else:
403                 return
404
405         seen_prevpage = seen_thispage
406         seen_thispage = set()
407
408         for i in items["items"]:
409             # In cases where there's more than one record with the
410             # same order key, the result could include records we
411             # already saw in the last page.  Skip them.
412             if i["uuid"] in seen_prevpage:
413                 continue
414             seen_thispage.add(i["uuid"])
415             yield i
416
417         firstitem = items["items"][0]
418         lastitem = items["items"][-1]
419
420         if firstitem[order_key] == lastitem[order_key]:
421             # Got a page where every item has the same order key.
422             # Switch to using uuid for paging.
423             nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
424             prev_page_all_same_order_key = True
425         else:
426             # Start from the last order key seen, but skip the last
427             # known uuid to avoid retrieving the same row twice.  If
428             # there are multiple rows with the same order key it is
429             # still likely we'll end up retrieving duplicate rows.
430             # That's handled by tracking the "seen" rows for each page
431             # so they can be skipped if they show up on the next page.
432             nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
433             prev_page_all_same_order_key = False
434
435 def ca_certs_path(fallback: T=httplib2.CA_CERTS) -> Union[str, T]:
436     """Return the path of the best available source of CA certificates
437
438     This function checks various known paths that provide trusted CA
439     certificates, and returns the first one that exists. It checks:
440
441     * the path in the `SSL_CERT_FILE` environment variable (used by OpenSSL)
442     * `/etc/arvados/ca-certificates.crt`, respected by all Arvados software
443     * `/etc/ssl/certs/ca-certificates.crt`, the default store on Debian-based
444       distributions
445     * `/etc/pki/tls/certs/ca-bundle.crt`, the default store on Red Hat-based
446       distributions
447
448     If none of these paths exist, this function returns the value of `fallback`.
449
450     Arguments:
451
452     * fallback: T --- The value to return if none of the known paths exist.
453       The default value is the certificate store of Mozilla's trusted CAs
454       included with the Python [certifi][] package.
455
456     [certifi]: https://pypi.org/project/certifi/
457     """
458     for ca_certs_path in [
459         # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
460         # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
461         os.environ.get('SSL_CERT_FILE'),
462         # Arvados specific:
463         '/etc/arvados/ca-certificates.crt',
464         # Debian:
465         '/etc/ssl/certs/ca-certificates.crt',
466         # Red Hat:
467         '/etc/pki/tls/certs/ca-bundle.crt',
468         ]:
469         if ca_certs_path and os.path.exists(ca_certs_path):
470             return ca_certs_path
471     return fallback
472
473 def new_request_id() -> str:
474     """Return a random request ID
475
476     This function generates and returns a random string suitable for use as a
477     `X-Request-Id` header value in the Arvados API.
478     """
479     rid = "req-"
480     # 2**104 > 36**20 > 2**103
481     n = random.getrandbits(104)
482     for _ in range(20):
483         c = n % 36
484         if c < 10:
485             rid += chr(c+ord('0'))
486         else:
487             rid += chr(c+ord('a')-10)
488         n = n // 36
489     return rid
490
491 def get_config_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
492     """Return an Arvados cluster's configuration, with caching
493
494     This function gets and returns the Arvados configuration from the API
495     server. It caches the result on the client object and reuses it on any
496     future calls.
497
498     Arguments:
499
500     * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
501       object to use to retrieve and cache the Arvados cluster configuration.
502     """
503     if not svc._rootDesc.get('resources').get('configs', False):
504         # Old API server version, no config export endpoint
505         return {}
506     if not hasattr(svc, '_cached_config'):
507         svc._cached_config = svc.configs().get().execute()
508     return svc._cached_config
509
510 def get_vocabulary_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
511     """Return an Arvados cluster's vocabulary, with caching
512
513     This function gets and returns the Arvados vocabulary from the API
514     server. It caches the result on the client object and reuses it on any
515     future calls.
516
517     .. HINT:: Low-level method
518        This is a relatively low-level wrapper around the Arvados API. Most
519        users will prefer to use `arvados.vocabulary.load_vocabulary`.
520
521     Arguments:
522
523     * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
524       object to use to retrieve and cache the Arvados cluster vocabulary.
525     """
526     if not svc._rootDesc.get('resources').get('vocabularies', False):
527         # Old API server version, no vocabulary export endpoint
528         return {}
529     if not hasattr(svc, '_cached_vocabulary'):
530         svc._cached_vocabulary = svc.vocabularies().get().execute()
531     return svc._cached_vocabulary
532
533 def trim_name(collectionname: str) -> str:
534     """Limit the length of a name to fit within Arvados API limits
535
536     This function ensures that a string is short enough to use as an object
537     name in the Arvados API, leaving room for text that may be added by the
538     `ensure_unique_name` argument. If the source name is short enough, it is
539     returned unchanged. Otherwise, this function returns a string with excess
540     characters removed from the middle of the source string and replaced with
541     an ellipsis.
542
543     Arguments:
544
545     * collectionname: str --- The desired source name
546     """
547     max_name_len = 254 - 28
548
549     if len(collectionname) > max_name_len:
550         over = len(collectionname) - max_name_len
551         split = int(max_name_len/2)
552         collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
553
554     return collectionname
555
556 @_deprecated('3.0', 'arvados.util.keyset_list_all')
557 def list_all(fn, num_retries=0, **kwargs):
558     # Default limit to (effectively) api server's MAX_LIMIT
559     kwargs.setdefault('limit', sys.maxsize)
560     items = []
561     offset = 0
562     items_available = sys.maxsize
563     while len(items) < items_available:
564         c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
565         items += c['items']
566         items_available = c['items_available']
567         offset = c['offset'] + len(c['items'])
568     return items
569
570 @_deprecated('3.0')
571 def clear_tmpdir(path=None):
572     """
573     Ensure the given directory (or TASK_TMPDIR if none given)
574     exists and is empty.
575     """
576     from arvados import current_task
577     if path is None:
578         path = current_task().tmpdir
579     if os.path.exists(path):
580         p = subprocess.Popen(['rm', '-rf', path])
581         stdout, stderr = p.communicate(None)
582         if p.returncode != 0:
583             raise Exception('rm -rf %s: %s' % (path, stderr))
584     os.mkdir(path)
585
586 @_deprecated('3.0', 'subprocess.run')
587 def run_command(execargs, **kwargs):
588     kwargs.setdefault('stdin', subprocess.PIPE)
589     kwargs.setdefault('stdout', subprocess.PIPE)
590     kwargs.setdefault('stderr', sys.stderr)
591     kwargs.setdefault('close_fds', True)
592     kwargs.setdefault('shell', False)
593     p = subprocess.Popen(execargs, **kwargs)
594     stdoutdata, stderrdata = p.communicate(None)
595     if p.returncode != 0:
596         raise arvados.errors.CommandFailedError(
597             "run_command %s exit %d:\n%s" %
598             (execargs, p.returncode, stderrdata))
599     return stdoutdata, stderrdata
600
601 @_deprecated('3.0')
602 def git_checkout(url, version, path):
603     from arvados import current_job
604     if not re.search('^/', path):
605         path = os.path.join(current_job().tmpdir, path)
606     if not os.path.exists(path):
607         run_command(["git", "clone", url, path],
608                     cwd=os.path.dirname(path))
609     run_command(["git", "checkout", version],
610                 cwd=path)
611     return path
612
613 @_deprecated('3.0')
614 def tar_extractor(path, decompress_flag):
615     return subprocess.Popen(["tar",
616                              "-C", path,
617                              ("-x%sf" % decompress_flag),
618                              "-"],
619                             stdout=None,
620                             stdin=subprocess.PIPE, stderr=sys.stderr,
621                             shell=False, close_fds=True)
622
623 @_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
624 def tarball_extract(tarball, path):
625     """Retrieve a tarball from Keep and extract it to a local
626     directory.  Return the absolute path where the tarball was
627     extracted. If the top level of the tarball contained just one
628     file or directory, return the absolute path of that single
629     item.
630
631     tarball -- collection locator
632     path -- where to extract the tarball: absolute, or relative to job tmp
633     """
634     from arvados import current_job
635     from arvados.collection import CollectionReader
636     if not re.search('^/', path):
637         path = os.path.join(current_job().tmpdir, path)
638     lockfile = open(path + '.lock', 'w')
639     fcntl.flock(lockfile, fcntl.LOCK_EX)
640     try:
641         os.stat(path)
642     except OSError:
643         os.mkdir(path)
644     already_have_it = False
645     try:
646         if os.readlink(os.path.join(path, '.locator')) == tarball:
647             already_have_it = True
648     except OSError:
649         pass
650     if not already_have_it:
651
652         # emulate "rm -f" (i.e., if the file does not exist, we win)
653         try:
654             os.unlink(os.path.join(path, '.locator'))
655         except OSError:
656             if os.path.exists(os.path.join(path, '.locator')):
657                 os.unlink(os.path.join(path, '.locator'))
658
659         for f in CollectionReader(tarball).all_files():
660             f_name = f.name()
661             if f_name.endswith(('.tbz', '.tar.bz2')):
662                 p = tar_extractor(path, 'j')
663             elif f_name.endswith(('.tgz', '.tar.gz')):
664                 p = tar_extractor(path, 'z')
665             elif f_name.endswith('.tar'):
666                 p = tar_extractor(path, '')
667             else:
668                 raise arvados.errors.AssertionError(
669                     "tarball_extract cannot handle filename %s" % f.name())
670             while True:
671                 buf = f.read(2**20)
672                 if len(buf) == 0:
673                     break
674                 p.stdin.write(buf)
675             p.stdin.close()
676             p.wait()
677             if p.returncode != 0:
678                 lockfile.close()
679                 raise arvados.errors.CommandFailedError(
680                     "tar exited %d" % p.returncode)
681         os.symlink(tarball, os.path.join(path, '.locator'))
682     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
683     lockfile.close()
684     if len(tld_extracts) == 1:
685         return os.path.join(path, tld_extracts[0])
686     return path
687
688 @_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
689 def zipball_extract(zipball, path):
690     """Retrieve a zip archive from Keep and extract it to a local
691     directory.  Return the absolute path where the archive was
692     extracted. If the top level of the archive contained just one
693     file or directory, return the absolute path of that single
694     item.
695
696     zipball -- collection locator
697     path -- where to extract the archive: absolute, or relative to job tmp
698     """
699     from arvados import current_job
700     from arvados.collection import CollectionReader
701     if not re.search('^/', path):
702         path = os.path.join(current_job().tmpdir, path)
703     lockfile = open(path + '.lock', 'w')
704     fcntl.flock(lockfile, fcntl.LOCK_EX)
705     try:
706         os.stat(path)
707     except OSError:
708         os.mkdir(path)
709     already_have_it = False
710     try:
711         if os.readlink(os.path.join(path, '.locator')) == zipball:
712             already_have_it = True
713     except OSError:
714         pass
715     if not already_have_it:
716
717         # emulate "rm -f" (i.e., if the file does not exist, we win)
718         try:
719             os.unlink(os.path.join(path, '.locator'))
720         except OSError:
721             if os.path.exists(os.path.join(path, '.locator')):
722                 os.unlink(os.path.join(path, '.locator'))
723
724         for f in CollectionReader(zipball).all_files():
725             if not f.name().endswith('.zip'):
726                 raise arvados.errors.NotImplementedError(
727                     "zipball_extract cannot handle filename %s" % f.name())
728             zip_filename = os.path.join(path, os.path.basename(f.name()))
729             zip_file = open(zip_filename, 'wb')
730             while True:
731                 buf = f.read(2**20)
732                 if len(buf) == 0:
733                     break
734                 zip_file.write(buf)
735             zip_file.close()
736
737             p = subprocess.Popen(["unzip",
738                                   "-q", "-o",
739                                   "-d", path,
740                                   zip_filename],
741                                  stdout=None,
742                                  stdin=None, stderr=sys.stderr,
743                                  shell=False, close_fds=True)
744             p.wait()
745             if p.returncode != 0:
746                 lockfile.close()
747                 raise arvados.errors.CommandFailedError(
748                     "unzip exited %d" % p.returncode)
749             os.unlink(zip_filename)
750         os.symlink(zipball, os.path.join(path, '.locator'))
751     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
752     lockfile.close()
753     if len(tld_extracts) == 1:
754         return os.path.join(path, tld_extracts[0])
755     return path
756
757 @_deprecated('3.0', 'arvados.collection.Collection')
758 def collection_extract(collection, path, files=[], decompress=True):
759     """Retrieve a collection from Keep and extract it to a local
760     directory.  Return the absolute path where the collection was
761     extracted.
762
763     collection -- collection locator
764     path -- where to extract: absolute, or relative to job tmp
765     """
766     from arvados import current_job
767     from arvados.collection import CollectionReader
768     matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
769     if matches:
770         collection_hash = matches.group(1)
771     else:
772         collection_hash = hashlib.md5(collection).hexdigest()
773     if not re.search('^/', path):
774         path = os.path.join(current_job().tmpdir, path)
775     lockfile = open(path + '.lock', 'w')
776     fcntl.flock(lockfile, fcntl.LOCK_EX)
777     try:
778         os.stat(path)
779     except OSError:
780         os.mkdir(path)
781     already_have_it = False
782     try:
783         if os.readlink(os.path.join(path, '.locator')) == collection_hash:
784             already_have_it = True
785     except OSError:
786         pass
787
788     # emulate "rm -f" (i.e., if the file does not exist, we win)
789     try:
790         os.unlink(os.path.join(path, '.locator'))
791     except OSError:
792         if os.path.exists(os.path.join(path, '.locator')):
793             os.unlink(os.path.join(path, '.locator'))
794
795     files_got = []
796     for s in CollectionReader(collection).all_streams():
797         stream_name = s.name()
798         for f in s.all_files():
799             if (files == [] or
800                 ((f.name() not in files_got) and
801                  (f.name() in files or
802                   (decompress and f.decompressed_name() in files)))):
803                 outname = f.decompressed_name() if decompress else f.name()
804                 files_got += [outname]
805                 if os.path.exists(os.path.join(path, stream_name, outname)):
806                     continue
807                 mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
808                 outfile = open(os.path.join(path, stream_name, outname), 'wb')
809                 for buf in (f.readall_decompressed() if decompress
810                             else f.readall()):
811                     outfile.write(buf)
812                 outfile.close()
813     if len(files_got) < len(files):
814         raise arvados.errors.AssertionError(
815             "Wanted files %s but only got %s from %s" %
816             (files, files_got,
817              [z.name() for z in CollectionReader(collection).all_files()]))
818     os.symlink(collection_hash, os.path.join(path, '.locator'))
819
820     lockfile.close()
821     return path
822
823 @_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
824 def mkdir_dash_p(path):
825     if not os.path.isdir(path):
826         try:
827             os.makedirs(path)
828         except OSError as e:
829             if e.errno == errno.EEXIST and os.path.isdir(path):
830                 # It is not an error if someone else creates the
831                 # directory between our exists() and makedirs() calls.
832                 pass
833             else:
834                 raise
835
836 @_deprecated('3.0', 'arvados.collection.Collection')
837 def stream_extract(stream, path, files=[], decompress=True):
838     """Retrieve a stream from Keep and extract it to a local
839     directory.  Return the absolute path where the stream was
840     extracted.
841
842     stream -- StreamReader object
843     path -- where to extract: absolute, or relative to job tmp
844     """
845     from arvados import current_job
846     if not re.search('^/', path):
847         path = os.path.join(current_job().tmpdir, path)
848     lockfile = open(path + '.lock', 'w')
849     fcntl.flock(lockfile, fcntl.LOCK_EX)
850     try:
851         os.stat(path)
852     except OSError:
853         os.mkdir(path)
854
855     files_got = []
856     for f in stream.all_files():
857         if (files == [] or
858             ((f.name() not in files_got) and
859              (f.name() in files or
860               (decompress and f.decompressed_name() in files)))):
861             outname = f.decompressed_name() if decompress else f.name()
862             files_got += [outname]
863             if os.path.exists(os.path.join(path, outname)):
864                 os.unlink(os.path.join(path, outname))
865             mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
866             outfile = open(os.path.join(path, outname), 'wb')
867             for buf in (f.readall_decompressed() if decompress
868                         else f.readall()):
869                 outfile.write(buf)
870             outfile.close()
871     if len(files_got) < len(files):
872         raise arvados.errors.AssertionError(
873             "Wanted files %s but only got %s from %s" %
874             (files, files_got, [z.name() for z in stream.all_files()]))
875     lockfile.close()
876     return path
877
878 @_deprecated('3.0', 'os.walk')
879 def listdir_recursive(dirname, base=None, max_depth=None):
880     """listdir_recursive(dirname, base, max_depth)
881
882     Return a list of file and directory names found under dirname.
883
884     If base is not None, prepend "{base}/" to each returned name.
885
886     If max_depth is None, descend into directories and return only the
887     names of files found in the directory tree.
888
889     If max_depth is a non-negative integer, stop descending into
890     directories at the given depth, and at that point return directory
891     names instead.
892
893     If max_depth==0 (and base is None) this is equivalent to
894     sorted(os.listdir(dirname)).
895     """
896     allfiles = []
897     for ent in sorted(os.listdir(dirname)):
898         ent_path = os.path.join(dirname, ent)
899         ent_base = os.path.join(base, ent) if base else ent
900         if os.path.isdir(ent_path) and max_depth != 0:
901             allfiles += listdir_recursive(
902                 ent_path, base=ent_base,
903                 max_depth=(max_depth-1 if max_depth else None))
904         else:
905             allfiles += [ent_base]
906     return allfiles