21639: Adjust test mocking
[arvados.git] / sdk / python / arvados / util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """Arvados utilities
5
6 This module provides functions and constants that are useful across a variety
7 of Arvados resource types, or extend the Arvados API client (see `arvados.api`).
8 """
9
10 import errno
11 import fcntl
12 import functools
13 import hashlib
14 import httplib2
15 import os
16 import random
17 import re
18 import subprocess
19 import sys
20 import warnings
21
22 import arvados.errors
23
24 from typing import (
25     Any,
26     Callable,
27     Dict,
28     Iterator,
29     TypeVar,
30     Union,
31 )
32
33 T = TypeVar('T')
34
35 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
36 """Regular expression to match a hexadecimal string (case-insensitive)"""
37 CR_UNCOMMITTED = 'Uncommitted'
38 """Constant `state` value for uncommited container requests"""
39 CR_COMMITTED = 'Committed'
40 """Constant `state` value for committed container requests"""
41 CR_FINAL = 'Final'
42 """Constant `state` value for finalized container requests"""
43
44 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*')
45 """Regular expression to match any Keep block locator"""
46 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*\+A\S+(\+\S+)*')
47 """Regular expression to match any Keep block locator with an access token hint"""
48 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+')
49 """Regular expression to match any collection portable data hash"""
50 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+[0-9]+)(\+\S+)*)+( +[0-9]+:[0-9]+:\S+)+$)+', flags=re.MULTILINE)
51 """Regular expression to match an Arvados collection manifest text"""
52 keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+[0-9]+)/(.*)')
53 """Regular expression to match a file path from a collection identified by portable data hash"""
54 keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+[0-9]+)/(.*)')
55 """Regular expression to match a `keep:` URI with a collection identified by portable data hash"""
56
57 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
58 """Regular expression to match any Arvados object UUID"""
59 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
60 """Regular expression to match any Arvados collection UUID"""
61 container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
62 """Regular expression to match any Arvados container UUID"""
63 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
64 """Regular expression to match any Arvados group UUID"""
65 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
66 """Regular expression to match any Arvados link UUID"""
67 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
68 """Regular expression to match any Arvados user UUID"""
69 job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
70 """Regular expression to match any Arvados job UUID
71
72 .. WARNING:: Deprecated
73    Arvados job resources are deprecated and will be removed in a future
74    release. Prefer the containers API instead.
75 """
76
77 def _deprecated(version=None, preferred=None):
78     """Mark a callable as deprecated in the SDK
79
80     This will wrap the callable to emit as a DeprecationWarning
81     and add a deprecation notice to its docstring.
82
83     If the following arguments are given, they'll be included in the
84     notices:
85
86     * preferred: str | None --- The name of an alternative that users should
87       use instead.
88
89     * version: str | None --- The version of Arvados when the callable is
90       scheduled to be removed.
91     """
92     if version is None:
93         version = ''
94     else:
95         version = f' and scheduled to be removed in Arvados {version}'
96     if preferred is None:
97         preferred = ''
98     else:
99         preferred = f' Prefer {preferred} instead.'
100     def deprecated_decorator(func):
101         fullname = f'{func.__module__}.{func.__qualname__}'
102         parent, _, name = fullname.rpartition('.')
103         if name == '__init__':
104             fullname = parent
105         warning_msg = f'{fullname} is deprecated{version}.{preferred}'
106         @functools.wraps(func)
107         def deprecated_wrapper(*args, **kwargs):
108             warnings.warn(warning_msg, DeprecationWarning, 2)
109             return func(*args, **kwargs)
110         # Get func's docstring without any trailing newline or empty lines.
111         func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
112         match = re.search(r'\n([ \t]+)\S', func_doc)
113         indent = '' if match is None else match.group(1)
114         warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent}   {warning_msg}'
115         # Make the deprecation notice the second "paragraph" of the
116         # docstring if possible. Otherwise append it.
117         docstring, count = re.subn(
118             rf'\n[ \t]*\n{indent}',
119             f'{warning_doc}\n\n{indent}',
120             func_doc,
121             count=1,
122         )
123         if not count:
124             docstring = f'{func_doc.lstrip()}{warning_doc}'
125         deprecated_wrapper.__doc__ = docstring
126         return deprecated_wrapper
127     return deprecated_decorator
128
129 def is_hex(s: str, *length_args: int) -> bool:
130     """Indicate whether a string is a hexadecimal number
131
132     This method returns true if all characters in the string are hexadecimal
133     digits. It is case-insensitive.
134
135     You can also pass optional length arguments to check that the string has
136     the expected number of digits. If you pass one integer, the string must
137     have that length exactly, otherwise the method returns False. If you
138     pass two integers, the string's length must fall within that minimum and
139     maximum (inclusive), otherwise the method returns False.
140
141     Arguments:
142
143     * s: str --- The string to check
144
145     * length_args: int --- Optional length limit(s) for the string to check
146     """
147     num_length_args = len(length_args)
148     if num_length_args > 2:
149         raise arvados.errors.ArgumentError(
150             "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
151     elif num_length_args == 2:
152         good_len = (length_args[0] <= len(s) <= length_args[1])
153     elif num_length_args == 1:
154         good_len = (len(s) == length_args[0])
155     else:
156         good_len = True
157     return bool(good_len and HEX_RE.match(s))
158
159 def keyset_list_all(
160         fn: Callable[..., 'arvados.api_resources.ArvadosAPIRequest'],
161         order_key: str="created_at",
162         num_retries: int=0,
163         ascending: bool=True,
164         **kwargs: Any,
165 ) -> Iterator[Dict[str, Any]]:
166     """Iterate all Arvados resources from an API list call
167
168     This method takes a method that represents an Arvados API list call, and
169     iterates the objects returned by the API server. It can make multiple API
170     calls to retrieve and iterate all objects available from the API server.
171
172     Arguments:
173
174     * fn: Callable[..., arvados.api_resources.ArvadosAPIRequest] --- A
175       function that wraps an Arvados API method that returns a list of
176       objects. If you have an Arvados API client named `arv`, examples
177       include `arv.collections().list` and `arv.groups().contents`. Note
178       that you should pass the function *without* calling it.
179
180     * order_key: str --- The name of the primary object field that objects
181       should be sorted by. This name is used to build an `order` argument
182       for `fn`. Default `'created_at'`.
183
184     * num_retries: int --- This argument is passed through to
185       `arvados.api_resources.ArvadosAPIRequest.execute` for each API call. See
186       that method's docstring for details. Default 0 (meaning API calls will
187       use the `num_retries` value set when the Arvados API client was
188       constructed).
189
190     * ascending: bool --- Used to build an `order` argument for `fn`. If True,
191       all fields will be sorted in `'asc'` (ascending) order. Otherwise, all
192       fields will be sorted in `'desc'` (descending) order.
193
194     Additional keyword arguments will be passed directly to `fn` for each API
195     call. Note that this function sets `count`, `limit`, and `order` as part of
196     its work.
197     """
198     pagesize = 1000
199     kwargs["limit"] = pagesize
200     kwargs["count"] = 'none'
201     asc = "asc" if ascending else "desc"
202     kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
203     other_filters = kwargs.get("filters", [])
204
205     try:
206         select = set(kwargs['select'])
207     except KeyError:
208         pass
209     else:
210         select.add(order_key)
211         select.add('uuid')
212         kwargs['select'] = list(select)
213
214     nextpage = []
215     tot = 0
216     expect_full_page = True
217     seen_prevpage = set()
218     seen_thispage = set()
219     lastitem = None
220     prev_page_all_same_order_key = False
221
222     while True:
223         kwargs["filters"] = nextpage+other_filters
224         items = fn(**kwargs).execute(num_retries=num_retries)
225
226         if len(items["items"]) == 0:
227             if prev_page_all_same_order_key:
228                 nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
229                 prev_page_all_same_order_key = False
230                 continue
231             else:
232                 return
233
234         seen_prevpage = seen_thispage
235         seen_thispage = set()
236
237         for i in items["items"]:
238             # In cases where there's more than one record with the
239             # same order key, the result could include records we
240             # already saw in the last page.  Skip them.
241             if i["uuid"] in seen_prevpage:
242                 continue
243             seen_thispage.add(i["uuid"])
244             yield i
245
246         firstitem = items["items"][0]
247         lastitem = items["items"][-1]
248
249         if firstitem[order_key] == lastitem[order_key]:
250             # Got a page where every item has the same order key.
251             # Switch to using uuid for paging.
252             nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
253             prev_page_all_same_order_key = True
254         else:
255             # Start from the last order key seen, but skip the last
256             # known uuid to avoid retrieving the same row twice.  If
257             # there are multiple rows with the same order key it is
258             # still likely we'll end up retrieving duplicate rows.
259             # That's handled by tracking the "seen" rows for each page
260             # so they can be skipped if they show up on the next page.
261             nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
262             prev_page_all_same_order_key = False
263
264 def ca_certs_path(fallback: T=httplib2.CA_CERTS) -> Union[str, T]:
265     """Return the path of the best available source of CA certificates
266
267     This function checks various known paths that provide trusted CA
268     certificates, and returns the first one that exists. It checks:
269
270     * the path in the `SSL_CERT_FILE` environment variable (used by OpenSSL)
271     * `/etc/arvados/ca-certificates.crt`, respected by all Arvados software
272     * `/etc/ssl/certs/ca-certificates.crt`, the default store on Debian-based
273       distributions
274     * `/etc/pki/tls/certs/ca-bundle.crt`, the default store on Red Hat-based
275       distributions
276
277     If none of these paths exist, this function returns the value of `fallback`.
278
279     Arguments:
280
281     * fallback: T --- The value to return if none of the known paths exist.
282       The default value is the certificate store of Mozilla's trusted CAs
283       included with the Python [certifi][] package.
284
285     [certifi]: https://pypi.org/project/certifi/
286     """
287     for ca_certs_path in [
288         # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
289         # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
290         os.environ.get('SSL_CERT_FILE'),
291         # Arvados specific:
292         '/etc/arvados/ca-certificates.crt',
293         # Debian:
294         '/etc/ssl/certs/ca-certificates.crt',
295         # Red Hat:
296         '/etc/pki/tls/certs/ca-bundle.crt',
297         ]:
298         if ca_certs_path and os.path.exists(ca_certs_path):
299             return ca_certs_path
300     return fallback
301
302 def new_request_id() -> str:
303     """Return a random request ID
304
305     This function generates and returns a random string suitable for use as a
306     `X-Request-Id` header value in the Arvados API.
307     """
308     rid = "req-"
309     # 2**104 > 36**20 > 2**103
310     n = random.getrandbits(104)
311     for _ in range(20):
312         c = n % 36
313         if c < 10:
314             rid += chr(c+ord('0'))
315         else:
316             rid += chr(c+ord('a')-10)
317         n = n // 36
318     return rid
319
320 def get_config_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
321     """Return an Arvados cluster's configuration, with caching
322
323     This function gets and returns the Arvados configuration from the API
324     server. It caches the result on the client object and reuses it on any
325     future calls.
326
327     Arguments:
328
329     * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
330       object to use to retrieve and cache the Arvados cluster configuration.
331     """
332     if not svc._rootDesc.get('resources').get('configs', False):
333         # Old API server version, no config export endpoint
334         return {}
335     if not hasattr(svc, '_cached_config'):
336         svc._cached_config = svc.configs().get().execute()
337     return svc._cached_config
338
339 def get_vocabulary_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
340     """Return an Arvados cluster's vocabulary, with caching
341
342     This function gets and returns the Arvados vocabulary from the API
343     server. It caches the result on the client object and reuses it on any
344     future calls.
345
346     .. HINT:: Low-level method
347        This is a relatively low-level wrapper around the Arvados API. Most
348        users will prefer to use `arvados.vocabulary.load_vocabulary`.
349
350     Arguments:
351
352     * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
353       object to use to retrieve and cache the Arvados cluster vocabulary.
354     """
355     if not svc._rootDesc.get('resources').get('vocabularies', False):
356         # Old API server version, no vocabulary export endpoint
357         return {}
358     if not hasattr(svc, '_cached_vocabulary'):
359         svc._cached_vocabulary = svc.vocabularies().get().execute()
360     return svc._cached_vocabulary
361
362 def trim_name(collectionname: str) -> str:
363     """Limit the length of a name to fit within Arvados API limits
364
365     This function ensures that a string is short enough to use as an object
366     name in the Arvados API, leaving room for text that may be added by the
367     `ensure_unique_name` argument. If the source name is short enough, it is
368     returned unchanged. Otherwise, this function returns a string with excess
369     characters removed from the middle of the source string and replaced with
370     an ellipsis.
371
372     Arguments:
373
374     * collectionname: str --- The desired source name
375     """
376     max_name_len = 254 - 28
377
378     if len(collectionname) > max_name_len:
379         over = len(collectionname) - max_name_len
380         split = int(max_name_len/2)
381         collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
382
383     return collectionname
384
385 @_deprecated('3.0', 'arvados.util.keyset_list_all')
386 def list_all(fn, num_retries=0, **kwargs):
387     # Default limit to (effectively) api server's MAX_LIMIT
388     kwargs.setdefault('limit', sys.maxsize)
389     items = []
390     offset = 0
391     items_available = sys.maxsize
392     while len(items) < items_available:
393         c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
394         items += c['items']
395         items_available = c['items_available']
396         offset = c['offset'] + len(c['items'])
397     return items
398
399 @_deprecated('3.0')
400 def clear_tmpdir(path=None):
401     """
402     Ensure the given directory (or TASK_TMPDIR if none given)
403     exists and is empty.
404     """
405     from arvados import current_task
406     if path is None:
407         path = current_task().tmpdir
408     if os.path.exists(path):
409         p = subprocess.Popen(['rm', '-rf', path])
410         stdout, stderr = p.communicate(None)
411         if p.returncode != 0:
412             raise Exception('rm -rf %s: %s' % (path, stderr))
413     os.mkdir(path)
414
415 @_deprecated('3.0', 'subprocess.run')
416 def run_command(execargs, **kwargs):
417     kwargs.setdefault('stdin', subprocess.PIPE)
418     kwargs.setdefault('stdout', subprocess.PIPE)
419     kwargs.setdefault('stderr', sys.stderr)
420     kwargs.setdefault('close_fds', True)
421     kwargs.setdefault('shell', False)
422     p = subprocess.Popen(execargs, **kwargs)
423     stdoutdata, stderrdata = p.communicate(None)
424     if p.returncode != 0:
425         raise arvados.errors.CommandFailedError(
426             "run_command %s exit %d:\n%s" %
427             (execargs, p.returncode, stderrdata))
428     return stdoutdata, stderrdata
429
430 @_deprecated('3.0')
431 def git_checkout(url, version, path):
432     from arvados import current_job
433     if not re.search('^/', path):
434         path = os.path.join(current_job().tmpdir, path)
435     if not os.path.exists(path):
436         run_command(["git", "clone", url, path],
437                     cwd=os.path.dirname(path))
438     run_command(["git", "checkout", version],
439                 cwd=path)
440     return path
441
442 @_deprecated('3.0')
443 def tar_extractor(path, decompress_flag):
444     return subprocess.Popen(["tar",
445                              "-C", path,
446                              ("-x%sf" % decompress_flag),
447                              "-"],
448                             stdout=None,
449                             stdin=subprocess.PIPE, stderr=sys.stderr,
450                             shell=False, close_fds=True)
451
452 @_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
453 def tarball_extract(tarball, path):
454     """Retrieve a tarball from Keep and extract it to a local
455     directory.  Return the absolute path where the tarball was
456     extracted. If the top level of the tarball contained just one
457     file or directory, return the absolute path of that single
458     item.
459
460     tarball -- collection locator
461     path -- where to extract the tarball: absolute, or relative to job tmp
462     """
463     from arvados import current_job
464     from arvados.collection import CollectionReader
465     if not re.search('^/', path):
466         path = os.path.join(current_job().tmpdir, path)
467     lockfile = open(path + '.lock', 'w')
468     fcntl.flock(lockfile, fcntl.LOCK_EX)
469     try:
470         os.stat(path)
471     except OSError:
472         os.mkdir(path)
473     already_have_it = False
474     try:
475         if os.readlink(os.path.join(path, '.locator')) == tarball:
476             already_have_it = True
477     except OSError:
478         pass
479     if not already_have_it:
480
481         # emulate "rm -f" (i.e., if the file does not exist, we win)
482         try:
483             os.unlink(os.path.join(path, '.locator'))
484         except OSError:
485             if os.path.exists(os.path.join(path, '.locator')):
486                 os.unlink(os.path.join(path, '.locator'))
487
488         for f in CollectionReader(tarball).all_files():
489             f_name = f.name()
490             if f_name.endswith(('.tbz', '.tar.bz2')):
491                 p = tar_extractor(path, 'j')
492             elif f_name.endswith(('.tgz', '.tar.gz')):
493                 p = tar_extractor(path, 'z')
494             elif f_name.endswith('.tar'):
495                 p = tar_extractor(path, '')
496             else:
497                 raise arvados.errors.AssertionError(
498                     "tarball_extract cannot handle filename %s" % f.name())
499             while True:
500                 buf = f.read(2**20)
501                 if len(buf) == 0:
502                     break
503                 p.stdin.write(buf)
504             p.stdin.close()
505             p.wait()
506             if p.returncode != 0:
507                 lockfile.close()
508                 raise arvados.errors.CommandFailedError(
509                     "tar exited %d" % p.returncode)
510         os.symlink(tarball, os.path.join(path, '.locator'))
511     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
512     lockfile.close()
513     if len(tld_extracts) == 1:
514         return os.path.join(path, tld_extracts[0])
515     return path
516
517 @_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
518 def zipball_extract(zipball, path):
519     """Retrieve a zip archive from Keep and extract it to a local
520     directory.  Return the absolute path where the archive was
521     extracted. If the top level of the archive contained just one
522     file or directory, return the absolute path of that single
523     item.
524
525     zipball -- collection locator
526     path -- where to extract the archive: absolute, or relative to job tmp
527     """
528     from arvados import current_job
529     from arvados.collection import CollectionReader
530     if not re.search('^/', path):
531         path = os.path.join(current_job().tmpdir, path)
532     lockfile = open(path + '.lock', 'w')
533     fcntl.flock(lockfile, fcntl.LOCK_EX)
534     try:
535         os.stat(path)
536     except OSError:
537         os.mkdir(path)
538     already_have_it = False
539     try:
540         if os.readlink(os.path.join(path, '.locator')) == zipball:
541             already_have_it = True
542     except OSError:
543         pass
544     if not already_have_it:
545
546         # emulate "rm -f" (i.e., if the file does not exist, we win)
547         try:
548             os.unlink(os.path.join(path, '.locator'))
549         except OSError:
550             if os.path.exists(os.path.join(path, '.locator')):
551                 os.unlink(os.path.join(path, '.locator'))
552
553         for f in CollectionReader(zipball).all_files():
554             if not f.name().endswith('.zip'):
555                 raise arvados.errors.NotImplementedError(
556                     "zipball_extract cannot handle filename %s" % f.name())
557             zip_filename = os.path.join(path, os.path.basename(f.name()))
558             zip_file = open(zip_filename, 'wb')
559             while True:
560                 buf = f.read(2**20)
561                 if len(buf) == 0:
562                     break
563                 zip_file.write(buf)
564             zip_file.close()
565
566             p = subprocess.Popen(["unzip",
567                                   "-q", "-o",
568                                   "-d", path,
569                                   zip_filename],
570                                  stdout=None,
571                                  stdin=None, stderr=sys.stderr,
572                                  shell=False, close_fds=True)
573             p.wait()
574             if p.returncode != 0:
575                 lockfile.close()
576                 raise arvados.errors.CommandFailedError(
577                     "unzip exited %d" % p.returncode)
578             os.unlink(zip_filename)
579         os.symlink(zipball, os.path.join(path, '.locator'))
580     tld_extracts = [f for f in os.listdir(path) if f != '.locator']
581     lockfile.close()
582     if len(tld_extracts) == 1:
583         return os.path.join(path, tld_extracts[0])
584     return path
585
586 @_deprecated('3.0', 'arvados.collection.Collection')
587 def collection_extract(collection, path, files=[], decompress=True):
588     """Retrieve a collection from Keep and extract it to a local
589     directory.  Return the absolute path where the collection was
590     extracted.
591
592     collection -- collection locator
593     path -- where to extract: absolute, or relative to job tmp
594     """
595     from arvados import current_job
596     from arvados.collection import CollectionReader
597     matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
598     if matches:
599         collection_hash = matches.group(1)
600     else:
601         collection_hash = hashlib.md5(collection).hexdigest()
602     if not re.search('^/', path):
603         path = os.path.join(current_job().tmpdir, path)
604     lockfile = open(path + '.lock', 'w')
605     fcntl.flock(lockfile, fcntl.LOCK_EX)
606     try:
607         os.stat(path)
608     except OSError:
609         os.mkdir(path)
610     already_have_it = False
611     try:
612         if os.readlink(os.path.join(path, '.locator')) == collection_hash:
613             already_have_it = True
614     except OSError:
615         pass
616
617     # emulate "rm -f" (i.e., if the file does not exist, we win)
618     try:
619         os.unlink(os.path.join(path, '.locator'))
620     except OSError:
621         if os.path.exists(os.path.join(path, '.locator')):
622             os.unlink(os.path.join(path, '.locator'))
623
624     files_got = []
625     for s in CollectionReader(collection).all_streams():
626         stream_name = s.name()
627         for f in s.all_files():
628             if (files == [] or
629                 ((f.name() not in files_got) and
630                  (f.name() in files or
631                   (decompress and f.decompressed_name() in files)))):
632                 outname = f.decompressed_name() if decompress else f.name()
633                 files_got += [outname]
634                 if os.path.exists(os.path.join(path, stream_name, outname)):
635                     continue
636                 mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
637                 outfile = open(os.path.join(path, stream_name, outname), 'wb')
638                 for buf in (f.readall_decompressed() if decompress
639                             else f.readall()):
640                     outfile.write(buf)
641                 outfile.close()
642     if len(files_got) < len(files):
643         raise arvados.errors.AssertionError(
644             "Wanted files %s but only got %s from %s" %
645             (files, files_got,
646              [z.name() for z in CollectionReader(collection).all_files()]))
647     os.symlink(collection_hash, os.path.join(path, '.locator'))
648
649     lockfile.close()
650     return path
651
652 @_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
653 def mkdir_dash_p(path):
654     if not os.path.isdir(path):
655         try:
656             os.makedirs(path)
657         except OSError as e:
658             if e.errno == errno.EEXIST and os.path.isdir(path):
659                 # It is not an error if someone else creates the
660                 # directory between our exists() and makedirs() calls.
661                 pass
662             else:
663                 raise
664
665 @_deprecated('3.0', 'arvados.collection.Collection')
666 def stream_extract(stream, path, files=[], decompress=True):
667     """Retrieve a stream from Keep and extract it to a local
668     directory.  Return the absolute path where the stream was
669     extracted.
670
671     stream -- StreamReader object
672     path -- where to extract: absolute, or relative to job tmp
673     """
674     from arvados import current_job
675     if not re.search('^/', path):
676         path = os.path.join(current_job().tmpdir, path)
677     lockfile = open(path + '.lock', 'w')
678     fcntl.flock(lockfile, fcntl.LOCK_EX)
679     try:
680         os.stat(path)
681     except OSError:
682         os.mkdir(path)
683
684     files_got = []
685     for f in stream.all_files():
686         if (files == [] or
687             ((f.name() not in files_got) and
688              (f.name() in files or
689               (decompress and f.decompressed_name() in files)))):
690             outname = f.decompressed_name() if decompress else f.name()
691             files_got += [outname]
692             if os.path.exists(os.path.join(path, outname)):
693                 os.unlink(os.path.join(path, outname))
694             mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
695             outfile = open(os.path.join(path, outname), 'wb')
696             for buf in (f.readall_decompressed() if decompress
697                         else f.readall()):
698                 outfile.write(buf)
699             outfile.close()
700     if len(files_got) < len(files):
701         raise arvados.errors.AssertionError(
702             "Wanted files %s but only got %s from %s" %
703             (files, files_got, [z.name() for z in stream.all_files()]))
704     lockfile.close()
705     return path
706
707 @_deprecated('3.0', 'os.walk')
708 def listdir_recursive(dirname, base=None, max_depth=None):
709     """listdir_recursive(dirname, base, max_depth)
710
711     Return a list of file and directory names found under dirname.
712
713     If base is not None, prepend "{base}/" to each returned name.
714
715     If max_depth is None, descend into directories and return only the
716     names of files found in the directory tree.
717
718     If max_depth is a non-negative integer, stop descending into
719     directories at the given depth, and at that point return directory
720     names instead.
721
722     If max_depth==0 (and base is None) this is equivalent to
723     sorted(os.listdir(dirname)).
724     """
725     allfiles = []
726     for ent in sorted(os.listdir(dirname)):
727         ent_path = os.path.join(dirname, ent)
728         ent_base = os.path.join(base, ent) if base else ent
729         if os.path.isdir(ent_path) and max_depth != 0:
730             allfiles += listdir_recursive(
731                 ent_path, base=ent_base,
732                 max_depth=(max_depth-1 if max_depth else None))
733         else:
734             allfiles += [ent_base]
735     return allfiles