21020: Introduce BaseDirectory classes to arvados.util
[arvados.git] / sdk / python / arvados / util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """Arvados utilities
5
6 This module provides functions and constants that are useful across a variety
7 of Arvados resource types, or extend the Arvados API client (see `arvados.api`).
8 """
9
10 import dataclasses
11 import enum
12 import errno
13 import fcntl
14 import functools
15 import hashlib
16 import httplib2
17 import itertools
18 import os
19 import random
20 import re
21 import stat
22 import subprocess
23 import sys
24 import warnings
25
26 import arvados.errors
27
28 from pathlib import Path, PurePath
29 from typing import (
30     Any,
31     Callable,
32     Dict,
33     Iterator,
34     Mapping,
35     Optional,
36     TypeVar,
37     Union,
38 )
39
40 T = TypeVar('T')
41
42 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
43 """Regular expression to match a hexadecimal string (case-insensitive)"""
44 CR_UNCOMMITTED = 'Uncommitted'
45 """Constant `state` value for uncommited container requests"""
46 CR_COMMITTED = 'Committed'
47 """Constant `state` value for committed container requests"""
48 CR_FINAL = 'Final'
49 """Constant `state` value for finalized container requests"""
50
51 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*')
52 """Regular expression to match any Keep block locator"""
53 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*\+A\S+(\+\S+)*')
54 """Regular expression to match any Keep block locator with an access token hint"""
55 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+')
56 """Regular expression to match any collection portable data hash"""
57 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+[0-9]+)(\+\S+)*)+( +[0-9]+:[0-9]+:\S+)+$)+', flags=re.MULTILINE)
58 """Regular expression to match an Arvados collection manifest text"""
59 keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+[0-9]+)/(.*)')
60 """Regular expression to match a file path from a collection identified by portable data hash"""
61 keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+[0-9]+)/(.*)')
62 """Regular expression to match a `keep:` URI with a collection identified by portable data hash"""
63
64 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
65 """Regular expression to match any Arvados object UUID"""
66 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
67 """Regular expression to match any Arvados collection UUID"""
68 container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
69 """Regular expression to match any Arvados container UUID"""
70 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
71 """Regular expression to match any Arvados group UUID"""
72 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
73 """Regular expression to match any Arvados link UUID"""
74 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
75 """Regular expression to match any Arvados user UUID"""
76 job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
77 """Regular expression to match any Arvados job UUID
78
79 .. WARNING:: Deprecated
80    Arvados job resources are deprecated and will be removed in a future
81    release. Prefer the containers API instead.
82 """
83
84 def _deprecated(version=None, preferred=None):
85     """Mark a callable as deprecated in the SDK
86
87     This will wrap the callable to emit as a DeprecationWarning
88     and add a deprecation notice to its docstring.
89
90     If the following arguments are given, they'll be included in the
91     notices:
92
93     * preferred: str | None --- The name of an alternative that users should
94       use instead.
95
96     * version: str | None --- The version of Arvados when the callable is
97       scheduled to be removed.
98     """
99     if version is None:
100         version = ''
101     else:
102         version = f' and scheduled to be removed in Arvados {version}'
103     if preferred is None:
104         preferred = ''
105     else:
106         preferred = f' Prefer {preferred} instead.'
107     def deprecated_decorator(func):
108         fullname = f'{func.__module__}.{func.__qualname__}'
109         parent, _, name = fullname.rpartition('.')
110         if name == '__init__':
111             fullname = parent
112         warning_msg = f'{fullname} is deprecated{version}.{preferred}'
113         @functools.wraps(func)
114         def deprecated_wrapper(*args, **kwargs):
115             warnings.warn(warning_msg, DeprecationWarning, 2)
116             return func(*args, **kwargs)
117         # Get func's docstring without any trailing newline or empty lines.
118         func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
119         match = re.search(r'\n([ \t]+)\S', func_doc)
120         indent = '' if match is None else match.group(1)
121         warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent}   {warning_msg}'
122         # Make the deprecation notice the second "paragraph" of the
123         # docstring if possible. Otherwise append it.
124         docstring, count = re.subn(
125             rf'\n[ \t]*\n{indent}',
126             f'{warning_doc}\n\n{indent}',
127             func_doc,
128             count=1,
129         )
130         if not count:
131             docstring = f'{func_doc.lstrip()}{warning_doc}'
132         deprecated_wrapper.__doc__ = docstring
133         return deprecated_wrapper
134     return deprecated_decorator
135
136 @dataclasses.dataclass
137 class _BaseDirectorySpec:
138     """Parse base directories
139
140     A _BaseDirectorySpec defines all the environment variable keys and defaults
141     related to a set of base directories (cache, config, state, etc.). It
142     provides pure methods to parse environment settings into valid paths.
143     """
144     systemd_key: str
145     xdg_home_key: str
146     xdg_home_default: PurePath
147     xdg_dirs_key: Optional[str] = None
148     xdg_dirs_default: str = ''
149
150     @staticmethod
151     def _abspath_from_env(env: Mapping[str, str], key: str) -> Optional[Path]:
152         try:
153             path = Path(env[key])
154         except (KeyError, ValueError):
155             ok = False
156         else:
157             ok = path.is_absolute()
158         return path if ok else None
159
160     @staticmethod
161     def _iter_abspaths(value: str) -> Iterator[Path]:
162         for path_s in value.split(':'):
163             path = Path(path_s)
164             if path.is_absolute():
165                 yield path
166
167     def iter_systemd(self, env: Mapping[str, str]) -> Iterator[Path]:
168         return self._iter_abspaths(env.get(self.systemd_key, ''))
169
170     def iter_xdg(self, env: Mapping[str, str], subdir: PurePath) -> Iterator[Path]:
171         yield self.xdg_home(env, subdir)
172         if self.xdg_dirs_key is not None:
173             for path in self._iter_abspaths(env.get(self.xdg_dirs_key) or self.xdg_dirs_default):
174                 yield path / subdir
175
176     def xdg_home(self, env: Mapping[str, str], subdir: PurePath) -> Path:
177         home_path = self._abspath_from_env(env, self.xdg_home_key)
178         if home_path is None:
179             home_path = self._abspath_from_env(env, 'HOME') or Path.home()
180             home_path /= self.xdg_home_default
181         return home_path / subdir
182
183
184 class _BaseDirectorySpecs(enum.Enum):
185     """Base directory specifications
186
187     This enum provides easy access to the standard base directory settings.
188     """
189     CACHE = _BaseDirectorySpec(
190         'CACHE_DIRECTORY',
191         'XDG_CACHE_HOME',
192         PurePath('.cache'),
193     )
194     CONFIG = _BaseDirectorySpec(
195         'CONFIGURATION_DIRECTORY',
196         'XDG_CONFIG_HOME',
197         PurePath('.config'),
198         'XDG_CONFIG_DIRS',
199         '/etc/xdg',
200     )
201     STATE = _BaseDirectorySpec(
202         'STATE_DIRECTORY',
203         'XDG_STATE_HOME',
204         PurePath('.local', 'state'),
205     )
206
207
208 class _BaseDirectories:
209     """Resolve paths from a base directory spec
210
211     Given a _BaseDirectorySpec, this class provides stateful methods to find
212     existing files and return the most-preferred directory for writing.
213     """
214     _STORE_MODE = stat.S_IFDIR | stat.S_IWUSR
215
216     def __init__(
217             self,
218             spec: Union[_BaseDirectorySpec, _BaseDirectorySpecs, str],
219             env: Mapping[str, str]=os.environ,
220             xdg_subdir: Union[os.PathLike, str]='arvados',
221     ) -> None:
222         if isinstance(spec, str):
223             spec = _BaseDirectorySpecs[spec].value
224         elif isinstance(spec, _BaseDirectorySpecs):
225             spec = spec.value
226         self._spec = spec
227         self._env = env
228         self._xdg_subdir = PurePath(xdg_subdir)
229
230     def search(self, name: str) -> Iterator[Path]:
231         for search_path in itertools.chain(
232                 self._spec.iter_systemd(self._env),
233                 self._spec.iter_xdg(self._env, self._xdg_subdir),
234         ):
235             path = search_path / name
236             if path.exists():
237                 yield path
238
239     def storage_path(self) -> Path:
240         for path in self._spec.iter_systemd(self._env):
241             try:
242                 mode = path.stat().st_mode
243             except OSError:
244                 continue
245             if (mode & self._STORE_MODE) == self._STORE_MODE:
246                 return path
247         path = self._spec.xdg_home(self._env, self._xdg_subdir)
248         path.mkdir(parents=True, exist_ok=True)
249         return path
250
251
252 def is_hex(s: str, *length_args: int) -> bool:
253     """Indicate whether a string is a hexadecimal number
254
255     This method returns true if all characters in the string are hexadecimal
256     digits. It is case-insensitive.
257
258     You can also pass optional length arguments to check that the string has
259     the expected number of digits. If you pass one integer, the string must
260     have that length exactly, otherwise the method returns False. If you
261     pass two integers, the string's length must fall within that minimum and
262     maximum (inclusive), otherwise the method returns False.
263
264     Arguments:
265
266     * s: str --- The string to check
267
268     * length_args: int --- Optional length limit(s) for the string to check
269     """
270     num_length_args = len(length_args)
271     if num_length_args > 2:
272         raise arvados.errors.ArgumentError(
273             "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
274     elif num_length_args == 2:
275         good_len = (length_args[0] <= len(s) <= length_args[1])
276     elif num_length_args == 1:
277         good_len = (len(s) == length_args[0])
278     else:
279         good_len = True
280     return bool(good_len and HEX_RE.match(s))
281
282 def keyset_list_all(
283         fn: Callable[..., 'arvados.api_resources.ArvadosAPIRequest'],
284         order_key: str="created_at",
285         num_retries: int=0,
286         ascending: bool=True,
287         **kwargs: Any,
288 ) -> Iterator[Dict[str, Any]]:
289     """Iterate all Arvados resources from an API list call
290
291     This method takes a method that represents an Arvados API list call, and
292     iterates the objects returned by the API server. It can make multiple API
293     calls to retrieve and iterate all objects available from the API server.
294
295     Arguments:
296
297     * fn: Callable[..., arvados.api_resources.ArvadosAPIRequest] --- A
298       function that wraps an Arvados API method that returns a list of
299       objects. If you have an Arvados API client named `arv`, examples
300       include `arv.collections().list` and `arv.groups().contents`. Note
301       that you should pass the function *without* calling it.
302
303     * order_key: str --- The name of the primary object field that objects
304       should be sorted by. This name is used to build an `order` argument
305       for `fn`. Default `'created_at'`.
306
307     * num_retries: int --- This argument is passed through to
308       `arvados.api_resources.ArvadosAPIRequest.execute` for each API call. See
309       that method's docstring for details. Default 0 (meaning API calls will
310       use the `num_retries` value set when the Arvados API client was
311       constructed).
312
313     * ascending: bool --- Used to build an `order` argument for `fn`. If True,
314       all fields will be sorted in `'asc'` (ascending) order. Otherwise, all
315       fields will be sorted in `'desc'` (descending) order.
316
317     Additional keyword arguments will be passed directly to `fn` for each API
318     call. Note that this function sets `count`, `limit`, and `order` as part of
319     its work.
320     """
321     pagesize = 1000
322     kwargs["limit"] = pagesize
323     kwargs["count"] = 'none'
324     asc = "asc" if ascending else "desc"
325     kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
326     other_filters = kwargs.get("filters", [])
327
328     try:
329         select = set(kwargs['select'])
330     except KeyError:
331         pass
332     else:
333         select.add(order_key)
334         select.add('uuid')
335         kwargs['select'] = list(select)
336
337     nextpage = []
338     tot = 0
339     expect_full_page = True
340     seen_prevpage = set()
341     seen_thispage = set()
342     lastitem = None
343     prev_page_all_same_order_key = False
344
345     while True:
346         kwargs["filters"] = nextpage+other_filters
347         items = fn(**kwargs).execute(num_retries=num_retries)
348
349         if len(items["items"]) == 0:
350             if prev_page_all_same_order_key:
351                 nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
352                 prev_page_all_same_order_key = False
353                 continue
354             else:
355                 return
356
357         seen_prevpage = seen_thispage
358         seen_thispage = set()
359
360         for i in items["items"]:
361             # In cases where there's more than one record with the
362             # same order key, the result could include records we
363             # already saw in the last page.  Skip them.
364             if i["uuid"] in seen_prevpage:
365                 continue
366             seen_thispage.add(i["uuid"])
367             yield i
368
369         firstitem = items["items"][0]
370         lastitem = items["items"][-1]
371
372         if firstitem[order_key] == lastitem[order_key]:
373             # Got a page where every item has the same order key.
374             # Switch to using uuid for paging.
375             nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
376             prev_page_all_same_order_key = True
377         else:
378             # Start from the last order key seen, but skip the last
379             # known uuid to avoid retrieving the same row twice.  If
380             # there are multiple rows with the same order key it is
381             # still likely we'll end up retrieving duplicate rows.
382             # That's handled by tracking the "seen" rows for each page
383             # so they can be skipped if they show up on the next page.
384             nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
385             prev_page_all_same_order_key = False
386
387 def ca_certs_path(fallback: T=httplib2.CA_CERTS) -> Union[str, T]:
388     """Return the path of the best available source of CA certificates
389
390     This function checks various known paths that provide trusted CA
391     certificates, and returns the first one that exists. It checks:
392
393     * the path in the `SSL_CERT_FILE` environment variable (used by OpenSSL)
394     * `/etc/arvados/ca-certificates.crt`, respected by all Arvados software
395     * `/etc/ssl/certs/ca-certificates.crt`, the default store on Debian-based
396       distributions
397     * `/etc/pki/tls/certs/ca-bundle.crt`, the default store on Red Hat-based
398       distributions
399
400     If none of these paths exist, this function returns the value of `fallback`.
401
402     Arguments:
403
404     * fallback: T --- The value to return if none of the known paths exist.
405       The default value is the certificate store of Mozilla's trusted CAs
406       included with the Python [certifi][] package.
407
408     [certifi]: https://pypi.org/project/certifi/
409     """
410     for ca_certs_path in [
411         # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
412         # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
413         os.environ.get('SSL_CERT_FILE'),
414         # Arvados specific:
415         '/etc/arvados/ca-certificates.crt',
416         # Debian:
417         '/etc/ssl/certs/ca-certificates.crt',
418         # Red Hat:
419         '/etc/pki/tls/certs/ca-bundle.crt',
420         ]:
421         if ca_certs_path and os.path.exists(ca_certs_path):
422             return ca_certs_path
423     return fallback
424
425 def new_request_id() -> str:
426     """Return a random request ID
427
428     This function generates and returns a random string suitable for use as a
429     `X-Request-Id` header value in the Arvados API.
430     """
431     rid = "req-"
432     # 2**104 > 36**20 > 2**103
433     n = random.getrandbits(104)
434     for _ in range(20):
435         c = n % 36
436         if c < 10:
437             rid += chr(c+ord('0'))
438         else:
439             rid += chr(c+ord('a')-10)
440         n = n // 36
441     return rid
442
443 def get_config_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
444     """Return an Arvados cluster's configuration, with caching
445
446     This function gets and returns the Arvados configuration from the API
447     server. It caches the result on the client object and reuses it on any
448     future calls.
449
450     Arguments:
451
452     * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
453       object to use to retrieve and cache the Arvados cluster configuration.
454     """
455     if not svc._rootDesc.get('resources').get('configs', False):
456         # Old API server version, no config export endpoint
457         return {}
458     if not hasattr(svc, '_cached_config'):
459         svc._cached_config = svc.configs().get().execute()
460     return svc._cached_config
461
462 def get_vocabulary_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
463     """Return an Arvados cluster's vocabulary, with caching
464
465     This function gets and returns the Arvados vocabulary from the API
466     server. It caches the result on the client object and reuses it on any
467     future calls.
468
469     .. HINT:: Low-level method
470        This is a relatively low-level wrapper around the Arvados API. Most
471        users will prefer to use `arvados.vocabulary.load_vocabulary`.
472
473     Arguments:
474
475     * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
476       object to use to retrieve and cache the Arvados cluster vocabulary.
477     """
478     if not svc._rootDesc.get('resources').get('vocabularies', False):
479         # Old API server version, no vocabulary export endpoint
480         return {}
481     if not hasattr(svc, '_cached_vocabulary'):
482         svc._cached_vocabulary = svc.vocabularies().get().execute()
483     return svc._cached_vocabulary
484
485 def trim_name(collectionname: str) -> str:
486     """Limit the length of a name to fit within Arvados API limits
487
488     This function ensures that a string is short enough to use as an object
489     name in the Arvados API, leaving room for text that may be added by the
490     `ensure_unique_name` argument. If the source name is short enough, it is
491     returned unchanged. Otherwise, this function returns a string with excess
492     characters removed from the middle of the source string and replaced with
493     an ellipsis.
494
495     Arguments:
496
497     * collectionname: str --- The desired source name
498     """
499     max_name_len = 254 - 28
500
501     if len(collectionname) > max_name_len:
502         over = len(collectionname) - max_name_len
503         split = int(max_name_len/2)
504         collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
505
506     return collectionname
507
508 @_deprecated('3.0', 'arvados.util.keyset_list_all')
509 def list_all(fn, num_retries=0, **kwargs):
510     # Default limit to (effectively) api server's MAX_LIMIT
511     kwargs.setdefault('limit', sys.maxsize)
512     items = []
513     offset = 0
514     items_available = sys.maxsize
515     while len(items) < items_available:
516         c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
517         items += c['items']
518         items_available = c['items_available']
519         offset = c['offset'] + len(c['items'])
520     return items
521
522 @_deprecated('3.0')
523 def clear_tmpdir(path=None):
524     """
525     Ensure the given directory (or TASK_TMPDIR if none given)
526     exists and is empty.
527     """
528     from arvados import current_task
529     if path is None:
530         path = current_task().tmpdir
531     if os.path.exists(path):
532         p = subprocess.Popen(['rm', '-rf', path])
533         stdout, stderr = p.communicate(None)
534         if p.returncode != 0:
535             raise Exception('rm -rf %s: %s' % (path, stderr))
536     os.mkdir(path)
537
538 @_deprecated('3.0', 'subprocess.run')
539 def run_command(execargs, **kwargs):
540     kwargs.setdefault('stdin', subprocess.PIPE)
541     kwargs.setdefault('stdout', subprocess.PIPE)
542     kwargs.setdefault('stderr', sys.stderr)
543     kwargs.setdefault('close_fds', True)
544     kwargs.setdefault('shell', False)
545     p = subprocess.Popen(execargs, **kwargs)
546     stdoutdata, stderrdata = p.communicate(None)
547     if p.returncode != 0:
548         raise arvados.errors.CommandFailedError(
549             "run_command %s exit %d:\n%s" %
550             (execargs, p.returncode, stderrdata))
551     return stdoutdata, stderrdata
552
553 @_deprecated('3.0')
554 def git_checkout(url, version, path):
555     from arvados import current_job
556     if not re.search('^/', path):
557         path = os.path.join(current_job().tmpdir, path)
558     if not os.path.exists(path):
559         run_command(["git", "clone", url, path],
560                     cwd=os.path.dirname(path))
561     run_command(["git", "checkout", version],
562                 cwd=path)
563     return path
564
565 @_deprecated('3.0')
566 def tar_extractor(path, decompress_flag):
567     return subprocess.Popen(["tar",
568                              "-C", path,
569                              ("-x%sf" % decompress_flag),
570                              "-"],
571                             stdout=None,
572                             stdin=subprocess.PIPE, stderr=sys.stderr,
573                             shell=False, close_fds=True)
574
575 @_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
576 def tarball_extract(tarball, path):
577     """Retrieve a tarball from Keep and extract it to a local
578     directory.  Return the absolute path where the tarball was
579     extracted. If the top level of the tarball contained just one
580     file or directory, return the absolute path of that single
581     item.
582
583     tarball -- collection locator
584     path -- where to extract the tarball: absolute, or relative to job tmp
585     """
586     from arvados import current_job
587     from arvados.collection import CollectionReader
588     if not re.search('^/', path):
589         path = os.path.join(current_job().tmpdir, path)
590     lockfile = open(path + '.lock', 'w')
591     fcntl.flock(lockfile, fcntl.LOCK_EX)
592     try:
593         os.stat(path)
594     except OSError:
595         os.mkdir(path)
596     already_have_it = False
597     try:
598         if os.readlink(os.path.join(path, '.locator')) == tarball:
599             already_have_it = True
600     except OSError:
601         pass
602     if not already_have_it:
603
604         # emulate "rm -f" (i.e., if the file does not exist, we win)
605         try:
606             os.unlink(os.path.join(path, '.locator'))
607         except OSError:
608             if os.path.exists(os.path.join(path, '.locator')):
609                 os.unlink(os.path.join(path, '.locator'))
610
611         for f in CollectionReader(tarball).all_files():
612             f_name = f.name()
613             if f_name.endswith(('.tbz', '.tar.bz2')):
614                 p = tar_extractor(path, 'j')
615             elif f_name.endswith(('.tgz', '.tar.gz')):
616                 p = tar_extractor(path, 'z')
617             elif f_name.endswith('.tar'):
618                 p = tar_extractor(path, '')
619             else:
620                 raise arvados.errors.AssertionError(
621                     "tarball_extract cannot handle filename %s" % f.name())
622             while True:
623                 buf = f.read(2**20)
624                 if len(buf) == 0:
625                     break
626                 p.stdin.write(buf)
627             p.stdin.close()
628             p.wait()
629             if p.returncode != 0:
630                 lockfile.close()
631                 raise arvados.errors.CommandFailedError(
632                     "tar exited %d" % p.returncode)
633         os.symlink(tarball, os.path.join(path, '.locator'))
634     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
635     lockfile.close()
636     if len(tld_extracts) == 1:
637         return os.path.join(path, tld_extracts[0])
638     return path
639
640 @_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
641 def zipball_extract(zipball, path):
642     """Retrieve a zip archive from Keep and extract it to a local
643     directory.  Return the absolute path where the archive was
644     extracted. If the top level of the archive contained just one
645     file or directory, return the absolute path of that single
646     item.
647
648     zipball -- collection locator
649     path -- where to extract the archive: absolute, or relative to job tmp
650     """
651     from arvados import current_job
652     from arvados.collection import CollectionReader
653     if not re.search('^/', path):
654         path = os.path.join(current_job().tmpdir, path)
655     lockfile = open(path + '.lock', 'w')
656     fcntl.flock(lockfile, fcntl.LOCK_EX)
657     try:
658         os.stat(path)
659     except OSError:
660         os.mkdir(path)
661     already_have_it = False
662     try:
663         if os.readlink(os.path.join(path, '.locator')) == zipball:
664             already_have_it = True
665     except OSError:
666         pass
667     if not already_have_it:
668
669         # emulate "rm -f" (i.e., if the file does not exist, we win)
670         try:
671             os.unlink(os.path.join(path, '.locator'))
672         except OSError:
673             if os.path.exists(os.path.join(path, '.locator')):
674                 os.unlink(os.path.join(path, '.locator'))
675
676         for f in CollectionReader(zipball).all_files():
677             if not f.name().endswith('.zip'):
678                 raise arvados.errors.NotImplementedError(
679                     "zipball_extract cannot handle filename %s" % f.name())
680             zip_filename = os.path.join(path, os.path.basename(f.name()))
681             zip_file = open(zip_filename, 'wb')
682             while True:
683                 buf = f.read(2**20)
684                 if len(buf) == 0:
685                     break
686                 zip_file.write(buf)
687             zip_file.close()
688
689             p = subprocess.Popen(["unzip",
690                                   "-q", "-o",
691                                   "-d", path,
692                                   zip_filename],
693                                  stdout=None,
694                                  stdin=None, stderr=sys.stderr,
695                                  shell=False, close_fds=True)
696             p.wait()
697             if p.returncode != 0:
698                 lockfile.close()
699                 raise arvados.errors.CommandFailedError(
700                     "unzip exited %d" % p.returncode)
701             os.unlink(zip_filename)
702         os.symlink(zipball, os.path.join(path, '.locator'))
703     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
704     lockfile.close()
705     if len(tld_extracts) == 1:
706         return os.path.join(path, tld_extracts[0])
707     return path
708
709 @_deprecated('3.0', 'arvados.collection.Collection')
710 def collection_extract(collection, path, files=[], decompress=True):
711     """Retrieve a collection from Keep and extract it to a local
712     directory.  Return the absolute path where the collection was
713     extracted.
714
715     collection -- collection locator
716     path -- where to extract: absolute, or relative to job tmp
717     """
718     from arvados import current_job
719     from arvados.collection import CollectionReader
720     matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
721     if matches:
722         collection_hash = matches.group(1)
723     else:
724         collection_hash = hashlib.md5(collection).hexdigest()
725     if not re.search('^/', path):
726         path = os.path.join(current_job().tmpdir, path)
727     lockfile = open(path + '.lock', 'w')
728     fcntl.flock(lockfile, fcntl.LOCK_EX)
729     try:
730         os.stat(path)
731     except OSError:
732         os.mkdir(path)
733     already_have_it = False
734     try:
735         if os.readlink(os.path.join(path, '.locator')) == collection_hash:
736             already_have_it = True
737     except OSError:
738         pass
739
740     # emulate "rm -f" (i.e., if the file does not exist, we win)
741     try:
742         os.unlink(os.path.join(path, '.locator'))
743     except OSError:
744         if os.path.exists(os.path.join(path, '.locator')):
745             os.unlink(os.path.join(path, '.locator'))
746
747     files_got = []
748     for s in CollectionReader(collection).all_streams():
749         stream_name = s.name()
750         for f in s.all_files():
751             if (files == [] or
752                 ((f.name() not in files_got) and
753                  (f.name() in files or
754                   (decompress and f.decompressed_name() in files)))):
755                 outname = f.decompressed_name() if decompress else f.name()
756                 files_got += [outname]
757                 if os.path.exists(os.path.join(path, stream_name, outname)):
758                     continue
759                 mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
760                 outfile = open(os.path.join(path, stream_name, outname), 'wb')
761                 for buf in (f.readall_decompressed() if decompress
762                             else f.readall()):
763                     outfile.write(buf)
764                 outfile.close()
765     if len(files_got) < len(files):
766         raise arvados.errors.AssertionError(
767             "Wanted files %s but only got %s from %s" %
768             (files, files_got,
769              [z.name() for z in CollectionReader(collection).all_files()]))
770     os.symlink(collection_hash, os.path.join(path, '.locator'))
771
772     lockfile.close()
773     return path
774
775 @_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
776 def mkdir_dash_p(path):
777     if not os.path.isdir(path):
778         try:
779             os.makedirs(path)
780         except OSError as e:
781             if e.errno == errno.EEXIST and os.path.isdir(path):
782                 # It is not an error if someone else creates the
783                 # directory between our exists() and makedirs() calls.
784                 pass
785             else:
786                 raise
787
788 @_deprecated('3.0', 'arvados.collection.Collection')
789 def stream_extract(stream, path, files=[], decompress=True):
790     """Retrieve a stream from Keep and extract it to a local
791     directory.  Return the absolute path where the stream was
792     extracted.
793
794     stream -- StreamReader object
795     path -- where to extract: absolute, or relative to job tmp
796     """
797     from arvados import current_job
798     if not re.search('^/', path):
799         path = os.path.join(current_job().tmpdir, path)
800     lockfile = open(path + '.lock', 'w')
801     fcntl.flock(lockfile, fcntl.LOCK_EX)
802     try:
803         os.stat(path)
804     except OSError:
805         os.mkdir(path)
806
807     files_got = []
808     for f in stream.all_files():
809         if (files == [] or
810             ((f.name() not in files_got) and
811              (f.name() in files or
812               (decompress and f.decompressed_name() in files)))):
813             outname = f.decompressed_name() if decompress else f.name()
814             files_got += [outname]
815             if os.path.exists(os.path.join(path, outname)):
816                 os.unlink(os.path.join(path, outname))
817             mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
818             outfile = open(os.path.join(path, outname), 'wb')
819             for buf in (f.readall_decompressed() if decompress
820                         else f.readall()):
821                 outfile.write(buf)
822             outfile.close()
823     if len(files_got) < len(files):
824         raise arvados.errors.AssertionError(
825             "Wanted files %s but only got %s from %s" %
826             (files, files_got, [z.name() for z in stream.all_files()]))
827     lockfile.close()
828     return path
829
830 @_deprecated('3.0', 'os.walk')
831 def listdir_recursive(dirname, base=None, max_depth=None):
832     """listdir_recursive(dirname, base, max_depth)
833
834     Return a list of file and directory names found under dirname.
835
836     If base is not None, prepend "{base}/" to each returned name.
837
838     If max_depth is None, descend into directories and return only the
839     names of files found in the directory tree.
840
841     If max_depth is a non-negative integer, stop descending into
842     directories at the given depth, and at that point return directory
843     names instead.
844
845     If max_depth==0 (and base is None) this is equivalent to
846     sorted(os.listdir(dirname)).
847     """
848     allfiles = []
849     for ent in sorted(os.listdir(dirname)):
850         ent_path = os.path.join(dirname, ent)
851         ent_base = os.path.join(base, ent) if base else ent
852         if os.path.isdir(ent_path) and max_depth != 0:
853             allfiles += listdir_recursive(
854                 ent_path, base=ent_base,
855                 max_depth=(max_depth-1 if max_depth else None))
856         else:
857             allfiles += [ent_base]
858     return allfiles