21020: Update arv-put to use cache directories from environment
[arvados.git] / sdk / python / arvados / util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """Arvados utilities
5
6 This module provides functions and constants that are useful across a variety
7 of Arvados resource types, or extend the Arvados API client (see `arvados.api`).
8 """
9
10 import dataclasses
11 import enum
12 import errno
13 import fcntl
14 import functools
15 import hashlib
16 import httplib2
17 import itertools
18 import os
19 import random
20 import re
21 import stat
22 import subprocess
23 import sys
24 import warnings
25
26 import arvados.errors
27
28 from pathlib import Path, PurePath
29 from typing import (
30     Any,
31     Callable,
32     Dict,
33     Iterator,
34     Mapping,
35     Optional,
36     TypeVar,
37     Union,
38 )
39
40 T = TypeVar('T')
41
42 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
43 """Regular expression to match a hexadecimal string (case-insensitive)"""
44 CR_UNCOMMITTED = 'Uncommitted'
45 """Constant `state` value for uncommited container requests"""
46 CR_COMMITTED = 'Committed'
47 """Constant `state` value for committed container requests"""
48 CR_FINAL = 'Final'
49 """Constant `state` value for finalized container requests"""
50
51 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*')
52 """Regular expression to match any Keep block locator"""
53 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*\+A\S+(\+\S+)*')
54 """Regular expression to match any Keep block locator with an access token hint"""
55 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+')
56 """Regular expression to match any collection portable data hash"""
57 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+[0-9]+)(\+\S+)*)+( +[0-9]+:[0-9]+:\S+)+$)+', flags=re.MULTILINE)
58 """Regular expression to match an Arvados collection manifest text"""
59 keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+[0-9]+)/(.*)')
60 """Regular expression to match a file path from a collection identified by portable data hash"""
61 keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+[0-9]+)/(.*)')
62 """Regular expression to match a `keep:` URI with a collection identified by portable data hash"""
63
64 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
65 """Regular expression to match any Arvados object UUID"""
66 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
67 """Regular expression to match any Arvados collection UUID"""
68 container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
69 """Regular expression to match any Arvados container UUID"""
70 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
71 """Regular expression to match any Arvados group UUID"""
72 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
73 """Regular expression to match any Arvados link UUID"""
74 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
75 """Regular expression to match any Arvados user UUID"""
76 job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
77 """Regular expression to match any Arvados job UUID
78
79 .. WARNING:: Deprecated
80    Arvados job resources are deprecated and will be removed in a future
81    release. Prefer the containers API instead.
82 """
83
84 def _deprecated(version=None, preferred=None):
85     """Mark a callable as deprecated in the SDK
86
87     This will wrap the callable to emit as a DeprecationWarning
88     and add a deprecation notice to its docstring.
89
90     If the following arguments are given, they'll be included in the
91     notices:
92
93     * preferred: str | None --- The name of an alternative that users should
94       use instead.
95
96     * version: str | None --- The version of Arvados when the callable is
97       scheduled to be removed.
98     """
99     if version is None:
100         version = ''
101     else:
102         version = f' and scheduled to be removed in Arvados {version}'
103     if preferred is None:
104         preferred = ''
105     else:
106         preferred = f' Prefer {preferred} instead.'
107     def deprecated_decorator(func):
108         fullname = f'{func.__module__}.{func.__qualname__}'
109         parent, _, name = fullname.rpartition('.')
110         if name == '__init__':
111             fullname = parent
112         warning_msg = f'{fullname} is deprecated{version}.{preferred}'
113         @functools.wraps(func)
114         def deprecated_wrapper(*args, **kwargs):
115             warnings.warn(warning_msg, DeprecationWarning, 2)
116             return func(*args, **kwargs)
117         # Get func's docstring without any trailing newline or empty lines.
118         func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
119         match = re.search(r'\n([ \t]+)\S', func_doc)
120         indent = '' if match is None else match.group(1)
121         warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent}   {warning_msg}'
122         # Make the deprecation notice the second "paragraph" of the
123         # docstring if possible. Otherwise append it.
124         docstring, count = re.subn(
125             rf'\n[ \t]*\n{indent}',
126             f'{warning_doc}\n\n{indent}',
127             func_doc,
128             count=1,
129         )
130         if not count:
131             docstring = f'{func_doc.lstrip()}{warning_doc}'
132         deprecated_wrapper.__doc__ = docstring
133         return deprecated_wrapper
134     return deprecated_decorator
135
136 @dataclasses.dataclass
137 class _BaseDirectorySpec:
138     """Parse base directories
139
140     A _BaseDirectorySpec defines all the environment variable keys and defaults
141     related to a set of base directories (cache, config, state, etc.). It
142     provides pure methods to parse environment settings into valid paths.
143     """
144     systemd_key: str
145     xdg_home_key: str
146     xdg_home_default: PurePath
147     xdg_dirs_key: Optional[str] = None
148     xdg_dirs_default: str = ''
149
150     @staticmethod
151     def _abspath_from_env(env: Mapping[str, str], key: str) -> Optional[Path]:
152         try:
153             path = Path(env[key])
154         except (KeyError, ValueError):
155             ok = False
156         else:
157             ok = path.is_absolute()
158         return path if ok else None
159
160     @staticmethod
161     def _iter_abspaths(value: str) -> Iterator[Path]:
162         for path_s in value.split(':'):
163             path = Path(path_s)
164             if path.is_absolute():
165                 yield path
166
167     def iter_systemd(self, env: Mapping[str, str]) -> Iterator[Path]:
168         return self._iter_abspaths(env.get(self.systemd_key, ''))
169
170     def iter_xdg(self, env: Mapping[str, str], subdir: PurePath) -> Iterator[Path]:
171         yield self.xdg_home(env, subdir)
172         if self.xdg_dirs_key is not None:
173             for path in self._iter_abspaths(env.get(self.xdg_dirs_key) or self.xdg_dirs_default):
174                 yield path / subdir
175
176     def xdg_home(self, env: Mapping[str, str], subdir: PurePath) -> Path:
177         home_path = self._abspath_from_env(env, self.xdg_home_key)
178         if home_path is None:
179             home_path = self._abspath_from_env(env, 'HOME') or Path.home()
180             home_path /= self.xdg_home_default
181         return home_path / subdir
182
183
184 class _BaseDirectorySpecs(enum.Enum):
185     """Base directory specifications
186
187     This enum provides easy access to the standard base directory settings.
188     """
189     CACHE = _BaseDirectorySpec(
190         'CACHE_DIRECTORY',
191         'XDG_CACHE_HOME',
192         PurePath('.cache'),
193     )
194     CONFIG = _BaseDirectorySpec(
195         'CONFIGURATION_DIRECTORY',
196         'XDG_CONFIG_HOME',
197         PurePath('.config'),
198         'XDG_CONFIG_DIRS',
199         '/etc/xdg',
200     )
201     STATE = _BaseDirectorySpec(
202         'STATE_DIRECTORY',
203         'XDG_STATE_HOME',
204         PurePath('.local', 'state'),
205     )
206
207
208 class _BaseDirectories:
209     """Resolve paths from a base directory spec
210
211     Given a _BaseDirectorySpec, this class provides stateful methods to find
212     existing files and return the most-preferred directory for writing.
213     """
214     _STORE_MODE = stat.S_IFDIR | stat.S_IWUSR
215
216     def __init__(
217             self,
218             spec: Union[_BaseDirectorySpec, _BaseDirectorySpecs, str],
219             env: Mapping[str, str]=os.environ,
220             xdg_subdir: Union[os.PathLike, str]='arvados',
221     ) -> None:
222         if isinstance(spec, str):
223             spec = _BaseDirectorySpecs[spec].value
224         elif isinstance(spec, _BaseDirectorySpecs):
225             spec = spec.value
226         self._spec = spec
227         self._env = env
228         self._xdg_subdir = PurePath(xdg_subdir)
229
230     def search(self, name: str) -> Iterator[Path]:
231         for search_path in itertools.chain(
232                 self._spec.iter_systemd(self._env),
233                 self._spec.iter_xdg(self._env, self._xdg_subdir),
234         ):
235             path = search_path / name
236             if path.exists():
237                 yield path
238
239     def storage_path(
240             self,
241             subdir: Union[str, os.PathLike]=PurePath(),
242             mode: int=0o700,
243     ) -> Path:
244         for path in self._spec.iter_systemd(self._env):
245             try:
246                 mode = path.stat().st_mode
247             except OSError:
248                 continue
249             if (mode & self._STORE_MODE) == self._STORE_MODE:
250                 break
251         else:
252             path = self._spec.xdg_home(self._env, self._xdg_subdir)
253         path /= subdir
254         path.mkdir(parents=True, exist_ok=True, mode=mode)
255         return path
256
257
258 def is_hex(s: str, *length_args: int) -> bool:
259     """Indicate whether a string is a hexadecimal number
260
261     This method returns true if all characters in the string are hexadecimal
262     digits. It is case-insensitive.
263
264     You can also pass optional length arguments to check that the string has
265     the expected number of digits. If you pass one integer, the string must
266     have that length exactly, otherwise the method returns False. If you
267     pass two integers, the string's length must fall within that minimum and
268     maximum (inclusive), otherwise the method returns False.
269
270     Arguments:
271
272     * s: str --- The string to check
273
274     * length_args: int --- Optional length limit(s) for the string to check
275     """
276     num_length_args = len(length_args)
277     if num_length_args > 2:
278         raise arvados.errors.ArgumentError(
279             "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
280     elif num_length_args == 2:
281         good_len = (length_args[0] <= len(s) <= length_args[1])
282     elif num_length_args == 1:
283         good_len = (len(s) == length_args[0])
284     else:
285         good_len = True
286     return bool(good_len and HEX_RE.match(s))
287
288 def keyset_list_all(
289         fn: Callable[..., 'arvados.api_resources.ArvadosAPIRequest'],
290         order_key: str="created_at",
291         num_retries: int=0,
292         ascending: bool=True,
293         **kwargs: Any,
294 ) -> Iterator[Dict[str, Any]]:
295     """Iterate all Arvados resources from an API list call
296
297     This method takes a method that represents an Arvados API list call, and
298     iterates the objects returned by the API server. It can make multiple API
299     calls to retrieve and iterate all objects available from the API server.
300
301     Arguments:
302
303     * fn: Callable[..., arvados.api_resources.ArvadosAPIRequest] --- A
304       function that wraps an Arvados API method that returns a list of
305       objects. If you have an Arvados API client named `arv`, examples
306       include `arv.collections().list` and `arv.groups().contents`. Note
307       that you should pass the function *without* calling it.
308
309     * order_key: str --- The name of the primary object field that objects
310       should be sorted by. This name is used to build an `order` argument
311       for `fn`. Default `'created_at'`.
312
313     * num_retries: int --- This argument is passed through to
314       `arvados.api_resources.ArvadosAPIRequest.execute` for each API call. See
315       that method's docstring for details. Default 0 (meaning API calls will
316       use the `num_retries` value set when the Arvados API client was
317       constructed).
318
319     * ascending: bool --- Used to build an `order` argument for `fn`. If True,
320       all fields will be sorted in `'asc'` (ascending) order. Otherwise, all
321       fields will be sorted in `'desc'` (descending) order.
322
323     Additional keyword arguments will be passed directly to `fn` for each API
324     call. Note that this function sets `count`, `limit`, and `order` as part of
325     its work.
326     """
327     pagesize = 1000
328     kwargs["limit"] = pagesize
329     kwargs["count"] = 'none'
330     asc = "asc" if ascending else "desc"
331     kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
332     other_filters = kwargs.get("filters", [])
333
334     try:
335         select = set(kwargs['select'])
336     except KeyError:
337         pass
338     else:
339         select.add(order_key)
340         select.add('uuid')
341         kwargs['select'] = list(select)
342
343     nextpage = []
344     tot = 0
345     expect_full_page = True
346     seen_prevpage = set()
347     seen_thispage = set()
348     lastitem = None
349     prev_page_all_same_order_key = False
350
351     while True:
352         kwargs["filters"] = nextpage+other_filters
353         items = fn(**kwargs).execute(num_retries=num_retries)
354
355         if len(items["items"]) == 0:
356             if prev_page_all_same_order_key:
357                 nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
358                 prev_page_all_same_order_key = False
359                 continue
360             else:
361                 return
362
363         seen_prevpage = seen_thispage
364         seen_thispage = set()
365
366         for i in items["items"]:
367             # In cases where there's more than one record with the
368             # same order key, the result could include records we
369             # already saw in the last page.  Skip them.
370             if i["uuid"] in seen_prevpage:
371                 continue
372             seen_thispage.add(i["uuid"])
373             yield i
374
375         firstitem = items["items"][0]
376         lastitem = items["items"][-1]
377
378         if firstitem[order_key] == lastitem[order_key]:
379             # Got a page where every item has the same order key.
380             # Switch to using uuid for paging.
381             nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
382             prev_page_all_same_order_key = True
383         else:
384             # Start from the last order key seen, but skip the last
385             # known uuid to avoid retrieving the same row twice.  If
386             # there are multiple rows with the same order key it is
387             # still likely we'll end up retrieving duplicate rows.
388             # That's handled by tracking the "seen" rows for each page
389             # so they can be skipped if they show up on the next page.
390             nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
391             prev_page_all_same_order_key = False
392
393 def ca_certs_path(fallback: T=httplib2.CA_CERTS) -> Union[str, T]:
394     """Return the path of the best available source of CA certificates
395
396     This function checks various known paths that provide trusted CA
397     certificates, and returns the first one that exists. It checks:
398
399     * the path in the `SSL_CERT_FILE` environment variable (used by OpenSSL)
400     * `/etc/arvados/ca-certificates.crt`, respected by all Arvados software
401     * `/etc/ssl/certs/ca-certificates.crt`, the default store on Debian-based
402       distributions
403     * `/etc/pki/tls/certs/ca-bundle.crt`, the default store on Red Hat-based
404       distributions
405
406     If none of these paths exist, this function returns the value of `fallback`.
407
408     Arguments:
409
410     * fallback: T --- The value to return if none of the known paths exist.
411       The default value is the certificate store of Mozilla's trusted CAs
412       included with the Python [certifi][] package.
413
414     [certifi]: https://pypi.org/project/certifi/
415     """
416     for ca_certs_path in [
417         # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
418         # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
419         os.environ.get('SSL_CERT_FILE'),
420         # Arvados specific:
421         '/etc/arvados/ca-certificates.crt',
422         # Debian:
423         '/etc/ssl/certs/ca-certificates.crt',
424         # Red Hat:
425         '/etc/pki/tls/certs/ca-bundle.crt',
426         ]:
427         if ca_certs_path and os.path.exists(ca_certs_path):
428             return ca_certs_path
429     return fallback
430
431 def new_request_id() -> str:
432     """Return a random request ID
433
434     This function generates and returns a random string suitable for use as a
435     `X-Request-Id` header value in the Arvados API.
436     """
437     rid = "req-"
438     # 2**104 > 36**20 > 2**103
439     n = random.getrandbits(104)
440     for _ in range(20):
441         c = n % 36
442         if c < 10:
443             rid += chr(c+ord('0'))
444         else:
445             rid += chr(c+ord('a')-10)
446         n = n // 36
447     return rid
448
449 def get_config_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
450     """Return an Arvados cluster's configuration, with caching
451
452     This function gets and returns the Arvados configuration from the API
453     server. It caches the result on the client object and reuses it on any
454     future calls.
455
456     Arguments:
457
458     * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
459       object to use to retrieve and cache the Arvados cluster configuration.
460     """
461     if not svc._rootDesc.get('resources').get('configs', False):
462         # Old API server version, no config export endpoint
463         return {}
464     if not hasattr(svc, '_cached_config'):
465         svc._cached_config = svc.configs().get().execute()
466     return svc._cached_config
467
468 def get_vocabulary_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
469     """Return an Arvados cluster's vocabulary, with caching
470
471     This function gets and returns the Arvados vocabulary from the API
472     server. It caches the result on the client object and reuses it on any
473     future calls.
474
475     .. HINT:: Low-level method
476        This is a relatively low-level wrapper around the Arvados API. Most
477        users will prefer to use `arvados.vocabulary.load_vocabulary`.
478
479     Arguments:
480
481     * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
482       object to use to retrieve and cache the Arvados cluster vocabulary.
483     """
484     if not svc._rootDesc.get('resources').get('vocabularies', False):
485         # Old API server version, no vocabulary export endpoint
486         return {}
487     if not hasattr(svc, '_cached_vocabulary'):
488         svc._cached_vocabulary = svc.vocabularies().get().execute()
489     return svc._cached_vocabulary
490
491 def trim_name(collectionname: str) -> str:
492     """Limit the length of a name to fit within Arvados API limits
493
494     This function ensures that a string is short enough to use as an object
495     name in the Arvados API, leaving room for text that may be added by the
496     `ensure_unique_name` argument. If the source name is short enough, it is
497     returned unchanged. Otherwise, this function returns a string with excess
498     characters removed from the middle of the source string and replaced with
499     an ellipsis.
500
501     Arguments:
502
503     * collectionname: str --- The desired source name
504     """
505     max_name_len = 254 - 28
506
507     if len(collectionname) > max_name_len:
508         over = len(collectionname) - max_name_len
509         split = int(max_name_len/2)
510         collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
511
512     return collectionname
513
514 @_deprecated('3.0', 'arvados.util.keyset_list_all')
515 def list_all(fn, num_retries=0, **kwargs):
516     # Default limit to (effectively) api server's MAX_LIMIT
517     kwargs.setdefault('limit', sys.maxsize)
518     items = []
519     offset = 0
520     items_available = sys.maxsize
521     while len(items) < items_available:
522         c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
523         items += c['items']
524         items_available = c['items_available']
525         offset = c['offset'] + len(c['items'])
526     return items
527
528 @_deprecated('3.0')
529 def clear_tmpdir(path=None):
530     """
531     Ensure the given directory (or TASK_TMPDIR if none given)
532     exists and is empty.
533     """
534     from arvados import current_task
535     if path is None:
536         path = current_task().tmpdir
537     if os.path.exists(path):
538         p = subprocess.Popen(['rm', '-rf', path])
539         stdout, stderr = p.communicate(None)
540         if p.returncode != 0:
541             raise Exception('rm -rf %s: %s' % (path, stderr))
542     os.mkdir(path)
543
544 @_deprecated('3.0', 'subprocess.run')
545 def run_command(execargs, **kwargs):
546     kwargs.setdefault('stdin', subprocess.PIPE)
547     kwargs.setdefault('stdout', subprocess.PIPE)
548     kwargs.setdefault('stderr', sys.stderr)
549     kwargs.setdefault('close_fds', True)
550     kwargs.setdefault('shell', False)
551     p = subprocess.Popen(execargs, **kwargs)
552     stdoutdata, stderrdata = p.communicate(None)
553     if p.returncode != 0:
554         raise arvados.errors.CommandFailedError(
555             "run_command %s exit %d:\n%s" %
556             (execargs, p.returncode, stderrdata))
557     return stdoutdata, stderrdata
558
559 @_deprecated('3.0')
560 def git_checkout(url, version, path):
561     from arvados import current_job
562     if not re.search('^/', path):
563         path = os.path.join(current_job().tmpdir, path)
564     if not os.path.exists(path):
565         run_command(["git", "clone", url, path],
566                     cwd=os.path.dirname(path))
567     run_command(["git", "checkout", version],
568                 cwd=path)
569     return path
570
571 @_deprecated('3.0')
572 def tar_extractor(path, decompress_flag):
573     return subprocess.Popen(["tar",
574                              "-C", path,
575                              ("-x%sf" % decompress_flag),
576                              "-"],
577                             stdout=None,
578                             stdin=subprocess.PIPE, stderr=sys.stderr,
579                             shell=False, close_fds=True)
580
581 @_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
582 def tarball_extract(tarball, path):
583     """Retrieve a tarball from Keep and extract it to a local
584     directory.  Return the absolute path where the tarball was
585     extracted. If the top level of the tarball contained just one
586     file or directory, return the absolute path of that single
587     item.
588
589     tarball -- collection locator
590     path -- where to extract the tarball: absolute, or relative to job tmp
591     """
592     from arvados import current_job
593     from arvados.collection import CollectionReader
594     if not re.search('^/', path):
595         path = os.path.join(current_job().tmpdir, path)
596     lockfile = open(path + '.lock', 'w')
597     fcntl.flock(lockfile, fcntl.LOCK_EX)
598     try:
599         os.stat(path)
600     except OSError:
601         os.mkdir(path)
602     already_have_it = False
603     try:
604         if os.readlink(os.path.join(path, '.locator')) == tarball:
605             already_have_it = True
606     except OSError:
607         pass
608     if not already_have_it:
609
610         # emulate "rm -f" (i.e., if the file does not exist, we win)
611         try:
612             os.unlink(os.path.join(path, '.locator'))
613         except OSError:
614             if os.path.exists(os.path.join(path, '.locator')):
615                 os.unlink(os.path.join(path, '.locator'))
616
617         for f in CollectionReader(tarball).all_files():
618             f_name = f.name()
619             if f_name.endswith(('.tbz', '.tar.bz2')):
620                 p = tar_extractor(path, 'j')
621             elif f_name.endswith(('.tgz', '.tar.gz')):
622                 p = tar_extractor(path, 'z')
623             elif f_name.endswith('.tar'):
624                 p = tar_extractor(path, '')
625             else:
626                 raise arvados.errors.AssertionError(
627                     "tarball_extract cannot handle filename %s" % f.name())
628             while True:
629                 buf = f.read(2**20)
630                 if len(buf) == 0:
631                     break
632                 p.stdin.write(buf)
633             p.stdin.close()
634             p.wait()
635             if p.returncode != 0:
636                 lockfile.close()
637                 raise arvados.errors.CommandFailedError(
638                     "tar exited %d" % p.returncode)
639         os.symlink(tarball, os.path.join(path, '.locator'))
640     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
641     lockfile.close()
642     if len(tld_extracts) == 1:
643         return os.path.join(path, tld_extracts[0])
644     return path
645
646 @_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
647 def zipball_extract(zipball, path):
648     """Retrieve a zip archive from Keep and extract it to a local
649     directory.  Return the absolute path where the archive was
650     extracted. If the top level of the archive contained just one
651     file or directory, return the absolute path of that single
652     item.
653
654     zipball -- collection locator
655     path -- where to extract the archive: absolute, or relative to job tmp
656     """
657     from arvados import current_job
658     from arvados.collection import CollectionReader
659     if not re.search('^/', path):
660         path = os.path.join(current_job().tmpdir, path)
661     lockfile = open(path + '.lock', 'w')
662     fcntl.flock(lockfile, fcntl.LOCK_EX)
663     try:
664         os.stat(path)
665     except OSError:
666         os.mkdir(path)
667     already_have_it = False
668     try:
669         if os.readlink(os.path.join(path, '.locator')) == zipball:
670             already_have_it = True
671     except OSError:
672         pass
673     if not already_have_it:
674
675         # emulate "rm -f" (i.e., if the file does not exist, we win)
676         try:
677             os.unlink(os.path.join(path, '.locator'))
678         except OSError:
679             if os.path.exists(os.path.join(path, '.locator')):
680                 os.unlink(os.path.join(path, '.locator'))
681
682         for f in CollectionReader(zipball).all_files():
683             if not f.name().endswith('.zip'):
684                 raise arvados.errors.NotImplementedError(
685                     "zipball_extract cannot handle filename %s" % f.name())
686             zip_filename = os.path.join(path, os.path.basename(f.name()))
687             zip_file = open(zip_filename, 'wb')
688             while True:
689                 buf = f.read(2**20)
690                 if len(buf) == 0:
691                     break
692                 zip_file.write(buf)
693             zip_file.close()
694
695             p = subprocess.Popen(["unzip",
696                                   "-q", "-o",
697                                   "-d", path,
698                                   zip_filename],
699                                  stdout=None,
700                                  stdin=None, stderr=sys.stderr,
701                                  shell=False, close_fds=True)
702             p.wait()
703             if p.returncode != 0:
704                 lockfile.close()
705                 raise arvados.errors.CommandFailedError(
706                     "unzip exited %d" % p.returncode)
707             os.unlink(zip_filename)
708         os.symlink(zipball, os.path.join(path, '.locator'))
709     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
710     lockfile.close()
711     if len(tld_extracts) == 1:
712         return os.path.join(path, tld_extracts[0])
713     return path
714
715 @_deprecated('3.0', 'arvados.collection.Collection')
716 def collection_extract(collection, path, files=[], decompress=True):
717     """Retrieve a collection from Keep and extract it to a local
718     directory.  Return the absolute path where the collection was
719     extracted.
720
721     collection -- collection locator
722     path -- where to extract: absolute, or relative to job tmp
723     """
724     from arvados import current_job
725     from arvados.collection import CollectionReader
726     matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
727     if matches:
728         collection_hash = matches.group(1)
729     else:
730         collection_hash = hashlib.md5(collection).hexdigest()
731     if not re.search('^/', path):
732         path = os.path.join(current_job().tmpdir, path)
733     lockfile = open(path + '.lock', 'w')
734     fcntl.flock(lockfile, fcntl.LOCK_EX)
735     try:
736         os.stat(path)
737     except OSError:
738         os.mkdir(path)
739     already_have_it = False
740     try:
741         if os.readlink(os.path.join(path, '.locator')) == collection_hash:
742             already_have_it = True
743     except OSError:
744         pass
745
746     # emulate "rm -f" (i.e., if the file does not exist, we win)
747     try:
748         os.unlink(os.path.join(path, '.locator'))
749     except OSError:
750         if os.path.exists(os.path.join(path, '.locator')):
751             os.unlink(os.path.join(path, '.locator'))
752
753     files_got = []
754     for s in CollectionReader(collection).all_streams():
755         stream_name = s.name()
756         for f in s.all_files():
757             if (files == [] or
758                 ((f.name() not in files_got) and
759                  (f.name() in files or
760                   (decompress and f.decompressed_name() in files)))):
761                 outname = f.decompressed_name() if decompress else f.name()
762                 files_got += [outname]
763                 if os.path.exists(os.path.join(path, stream_name, outname)):
764                     continue
765                 mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
766                 outfile = open(os.path.join(path, stream_name, outname), 'wb')
767                 for buf in (f.readall_decompressed() if decompress
768                             else f.readall()):
769                     outfile.write(buf)
770                 outfile.close()
771     if len(files_got) < len(files):
772         raise arvados.errors.AssertionError(
773             "Wanted files %s but only got %s from %s" %
774             (files, files_got,
775              [z.name() for z in CollectionReader(collection).all_files()]))
776     os.symlink(collection_hash, os.path.join(path, '.locator'))
777
778     lockfile.close()
779     return path
780
781 @_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
782 def mkdir_dash_p(path):
783     if not os.path.isdir(path):
784         try:
785             os.makedirs(path)
786         except OSError as e:
787             if e.errno == errno.EEXIST and os.path.isdir(path):
788                 # It is not an error if someone else creates the
789                 # directory between our exists() and makedirs() calls.
790                 pass
791             else:
792                 raise
793
794 @_deprecated('3.0', 'arvados.collection.Collection')
795 def stream_extract(stream, path, files=[], decompress=True):
796     """Retrieve a stream from Keep and extract it to a local
797     directory.  Return the absolute path where the stream was
798     extracted.
799
800     stream -- StreamReader object
801     path -- where to extract: absolute, or relative to job tmp
802     """
803     from arvados import current_job
804     if not re.search('^/', path):
805         path = os.path.join(current_job().tmpdir, path)
806     lockfile = open(path + '.lock', 'w')
807     fcntl.flock(lockfile, fcntl.LOCK_EX)
808     try:
809         os.stat(path)
810     except OSError:
811         os.mkdir(path)
812
813     files_got = []
814     for f in stream.all_files():
815         if (files == [] or
816             ((f.name() not in files_got) and
817              (f.name() in files or
818               (decompress and f.decompressed_name() in files)))):
819             outname = f.decompressed_name() if decompress else f.name()
820             files_got += [outname]
821             if os.path.exists(os.path.join(path, outname)):
822                 os.unlink(os.path.join(path, outname))
823             mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
824             outfile = open(os.path.join(path, outname), 'wb')
825             for buf in (f.readall_decompressed() if decompress
826                         else f.readall()):
827                 outfile.write(buf)
828             outfile.close()
829     if len(files_got) < len(files):
830         raise arvados.errors.AssertionError(
831             "Wanted files %s but only got %s from %s" %
832             (files, files_got, [z.name() for z in stream.all_files()]))
833     lockfile.close()
834     return path
835
836 @_deprecated('3.0', 'os.walk')
837 def listdir_recursive(dirname, base=None, max_depth=None):
838     """listdir_recursive(dirname, base, max_depth)
839
840     Return a list of file and directory names found under dirname.
841
842     If base is not None, prepend "{base}/" to each returned name.
843
844     If max_depth is None, descend into directories and return only the
845     names of files found in the directory tree.
846
847     If max_depth is a non-negative integer, stop descending into
848     directories at the given depth, and at that point return directory
849     names instead.
850
851     If max_depth==0 (and base is None) this is equivalent to
852     sorted(os.listdir(dirname)).
853     """
854     allfiles = []
855     for ent in sorted(os.listdir(dirname)):
856         ent_path = os.path.join(dirname, ent)
857         ent_base = os.path.join(base, ent) if base else ent
858         if os.path.isdir(ent_path) and max_depth != 0:
859             allfiles += listdir_recursive(
860                 ent_path, base=ent_base,
861                 max_depth=(max_depth-1 if max_depth else None))
862         else:
863             allfiles += [ent_base]
864     return allfiles