1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import division
6 from builtins import range
19 from arvados.collection import CollectionReader
# Matches a non-empty string made up solely of hexadecimal digits
# (either case).  Used by is_hex().
HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
# State-name string constants (presumably container request states --
# TODO confirm against the API documentation).
CR_UNCOMMITTED = 'Uncommitted'
CR_COMMITTED = 'Committed'
# NOTE: apart from the trailing "$" in manifest_pattern, none of the
# patterns below are anchored, so whether they test a prefix, a
# substring or the whole string depends on how the caller applies them
# (match / search / fullmatch).
#
# 32 lowercase hex digits, "+", a decimal number, then any number of
# "+hint" suffixes.
keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
# Same shape as keep_locator_pattern, but at least one of the "+" suffixes
# must start with "A" (presumably the permission signature hint).
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
# 32 lowercase hex digits, "+", a decimal number -- no further hints.
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
# Generic UUID shape: 5, 5 and 15 lowercase alphanumerics joined by "-".
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
# Type-specific UUID shapes: the middle field is a fixed infix per type.
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
# One or more lines (MULTILINE), each consisting of a name token, one or
# more space-separated block locators, and one or more
# "number:number:name" tokens.
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.

    path -- directory to reset; when None, the tmpdir of
            arvados.current_task() is used

    Anything already at `path` is removed with "rm -rf" and the
    directory is then created afresh.  Raises Exception if "rm" exits
    with a non-zero status.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        # Pipe stderr so that a failure report carries rm's own
        # diagnostics; without a pipe communicate() returns None for it.
        p = subprocess.Popen(['rm', '-rf', path], stderr=subprocess.PIPE)
        _, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (
                path, stderr.decode('utf-8', 'replace')))
    os.mkdir(path)
def run_command(execargs, **kwargs):
    """Run a command to completion and return (stdoutdata, stderrdata).

    execargs -- argument list (or string) handed to subprocess.Popen
    kwargs -- extra Popen options; unless overridden, stdin and stdout
              are pipes, stderr is this process's sys.stderr,
              close_fds is True and shell is False

    Raises arvados.errors.CommandFailedError if the command exits with
    a non-zero status.
    """
    popen_defaults = (
        ('stdin', subprocess.PIPE),
        ('stdout', subprocess.PIPE),
        ('stderr', sys.stderr),
        ('close_fds', True),
        ('shell', False),
    )
    for option, value in popen_defaults:
        kwargs.setdefault(option, value)
    proc = subprocess.Popen(execargs, **kwargs)
    out, err = proc.communicate(None)
    if proc.returncode == 0:
        return out, err
    raise arvados.errors.CommandFailedError(
        "run_command %s exit %d:\n%s" %
        (execargs, proc.returncode, err))
def git_checkout(url, version, path):
    """Clone a git repository (if not already present) and check out a version.

    url -- repository to clone
    version -- commit, tag or branch passed to "git checkout"
    path -- working tree location: absolute, or relative to job tmp

    Returns the absolute path of the working tree.
    """
    if not path.startswith('/'):
        path = os.path.join(arvados.current_job().tmpdir, path)
    if not os.path.exists(path):
        clone_cmd = ["git", "clone", url, path]
        run_command(clone_cmd, cwd=os.path.dirname(path))
    checkout_cmd = ["git", "checkout", version]
    run_command(checkout_cmd, cwd=path)
    return path
def tar_extractor(path, decompress_flag):
    """Start a "tar" process that extracts an archive fed to its stdin.

    path -- directory to extract into (passed to tar as "-C path")
    decompress_flag -- letter spliced into the "-x?f" option, e.g. 'j',
                       'z', or '' for an uncompressed archive

    Returns the subprocess.Popen object; the caller writes the archive
    to its stdin, closes it, and waits for the process.
    """
    tar_cmd = ["tar", "-C", path, "-x%sf" % decompress_flag, "-"]
    return subprocess.Popen(
        tar_cmd,
        stdin=subprocess.PIPE,
        stdout=None,
        stderr=sys.stderr,
        shell=False,
        close_fds=True)
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp

    A "<path>.lock" file is flock()ed for the duration of the call, and
    a ".locator" symlink inside `path` records which tarball was
    extracted so that a repeated call with the same locator is a no-op.
    Raises arvados.errors.AssertionError for a file whose name does not
    look like a tarball, and arvados.errors.CommandFailedError if tar
    exits non-zero.
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    locator_link = os.path.join(path, '.locator')
    lockfile = open(path + '.lock', 'w')
    # try/finally guarantees the lock is released on every exit path,
    # including unsupported file names and Keep read errors.
    try:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        if not os.path.exists(path):
            os.mkdir(path)
        try:
            already_have_it = (os.readlink(locator_link) == tarball)
        except OSError:
            already_have_it = False
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(locator_link)
            except OSError:
                if os.path.exists(locator_link):
                    os.unlink(locator_link)

            for f in CollectionReader(tarball).all_files():
                if re.search(r'\.(tbz|tar\.bz2)$', f.name()):
                    decompress_flag = 'j'
                elif re.search(r'\.(tgz|tar\.gz)$', f.name()):
                    decompress_flag = 'z'
                elif re.search(r'\.tar$', f.name()):
                    decompress_flag = ''
                else:
                    raise arvados.errors.AssertionError(
                        "tarball_extract cannot handle filename %s" % f.name())
                p = tar_extractor(path, decompress_flag)
                # Stream the archive into tar in 1 MiB pieces.
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    p.stdin.write(buf)
                p.stdin.close()
                p.wait()
                if p.returncode != 0:
                    raise arvados.errors.CommandFailedError(
                        "tar exited %d" % p.returncode)
            os.symlink(tarball, locator_link)
        tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    finally:
        lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp

    A "<path>.lock" file is flock()ed for the duration of the call, and
    a ".locator" symlink inside `path` records which archive was
    extracted so that a repeated call with the same locator is a no-op.
    Raises arvados.errors.NotImplementedError for a file whose name
    does not end in ".zip", and arvados.errors.CommandFailedError if
    unzip exits non-zero.
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    locator_link = os.path.join(path, '.locator')
    lockfile = open(path + '.lock', 'w')
    # try/finally guarantees the lock is released on every exit path.
    try:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        if not os.path.exists(path):
            os.mkdir(path)
        try:
            already_have_it = (os.readlink(locator_link) == zipball)
        except OSError:
            already_have_it = False
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(locator_link)
            except OSError:
                if os.path.exists(locator_link):
                    os.unlink(locator_link)

            for f in CollectionReader(zipball).all_files():
                if not re.search(r'\.zip$', f.name()):
                    raise arvados.errors.NotImplementedError(
                        "zipball_extract cannot handle filename %s" % f.name())
                # unzip cannot read from a pipe, so spool the archive
                # to a local file first (1 MiB at a time).
                zip_filename = os.path.join(path, os.path.basename(f.name()))
                with open(zip_filename, 'wb') as zip_file:
                    while True:
                        buf = f.read(2**20)
                        if len(buf) == 0:
                            break
                        zip_file.write(buf)

                p = subprocess.Popen(["unzip",
                                      "-q", "-o",
                                      "-d", path,
                                      zip_filename],
                                     stdout=None,
                                     stdin=None, stderr=sys.stderr,
                                     shell=False, close_fds=True)
                p.wait()
                if p.returncode != 0:
                    raise arvados.errors.CommandFailedError(
                        "unzip exited %d" % p.returncode)
                os.unlink(zip_filename)
            os.symlink(zipball, locator_link)
        tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    finally:
        lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    files -- names of the files to extract; empty means every file
    decompress -- when true, write each file's decompressed content
                  under its decompressed name

    Files that already exist under `path` are left untouched.  A
    "<path>.lock" file is flock()ed for the duration of the call and a
    ".locator" symlink is (re)created once extraction succeeds.  Raises
    arvados.errors.AssertionError if fewer files were found than were
    asked for.
    """
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        # hashlib only accepts bytes; a text locator must be encoded first.
        raw = collection if isinstance(collection, bytes) else collection.encode('utf-8')
        collection_hash = hashlib.md5(raw).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    locator_link = os.path.join(path, '.locator')
    lockfile = open(path + '.lock', 'w')
    # try/finally guarantees the lock is released on every exit path.
    try:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        if not os.path.exists(path):
            os.mkdir(path)

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(locator_link)
        except OSError:
            if os.path.exists(locator_link):
                os.unlink(locator_link)

        files_got = []
        for s in CollectionReader(collection).all_streams():
            stream_name = s.name()
            for f in s.all_files():
                wanted = (not files or
                          ((f.name() not in files_got) and
                           (f.name() in files or
                            (decompress and f.decompressed_name() in files))))
                if not wanted:
                    continue
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                outpath = os.path.join(path, stream_name, outname)
                if os.path.exists(outpath):
                    continue
                mkdir_dash_p(os.path.dirname(outpath))
                with open(outpath, 'wb') as outfile:
                    for buf in (f.readall_decompressed() if decompress
                                else f.readall()):
                        outfile.write(buf)
        if len(files_got) < len(files):
            raise arvados.errors.AssertionError(
                "Wanted files %s but only got %s from %s" %
                (files, files_got,
                 [z.name() for z in CollectionReader(collection).all_files()]))
        os.symlink(collection_hash, locator_link)
    finally:
        lockfile.close()
    return path
def mkdir_dash_p(path):
    """Create `path` and any missing parent directories, like "mkdir -p".

    Does nothing if `path` is already a directory.  Any OSError other
    than "already exists as a directory" is propagated.
    """
    if os.path.isdir(path):
        return
    try:
        os.makedirs(path)
    except OSError as err:
        # It is not an error if someone else creates the
        # directory between our isdir() and makedirs() calls.
        lost_race = (err.errno == errno.EEXIST and os.path.isdir(path))
        if not lost_race:
            raise
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    files -- names of the files to extract; empty means every file
    decompress -- when true, write each file's decompressed content
                  under its decompressed name

    Existing output files are replaced.  A "<path>.lock" file is
    flock()ed for the duration of the call.  Raises
    arvados.errors.AssertionError if fewer files were found than were
    asked for.
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    # try/finally guarantees the lock is released on every exit path.
    try:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        if not os.path.exists(path):
            os.mkdir(path)

        files_got = []
        for f in stream.all_files():
            wanted = (not files or
                      ((f.name() not in files_got) and
                       (f.name() in files or
                        (decompress and f.decompressed_name() in files))))
            if not wanted:
                continue
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            outpath = os.path.join(path, outname)
            if os.path.exists(outpath):
                os.unlink(outpath)
            mkdir_dash_p(os.path.dirname(outpath))
            with open(outpath, 'wb') as outfile:
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
        if len(files_got) < len(files):
            raise arvados.errors.AssertionError(
                "Wanted files %s but only got %s from %s" %
                (files, files_got, [z.name() for z in stream.all_files()]))
    finally:
        lockfile.close()
    return path
def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    Return a list of file and directory names found under dirname.

    If base is not None, prepend "{base}/" to each returned name.

    If max_depth is None, descend into directories and return only the
    names of files found in the directory tree.

    If max_depth is a non-negative integer, stop descending into
    directories at the given depth, and at that point return directory
    names instead.

    If max_depth==0 (and base is None) this is equivalent to
    sorted(os.listdir(dirname)).
    """
    # Depth budget handed to sub-directories; it is only consulted when
    # max_depth is non-zero, i.e. None (unlimited) or a positive count.
    child_depth = max_depth - 1 if max_depth else None
    found = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if max_depth == 0 or not os.path.isdir(ent_path):
            found.append(ent_base)
        else:
            found.extend(listdir_recursive(
                ent_path, base=ent_base, max_depth=child_depth))
    return found
def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.

    Raises arvados.errors.ArgumentError if more than two length
    arguments are given.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        good_len = True
    # HEX_RE ends in "$", which also matches just before a trailing
    # newline; reject that case so "abc\n" is not reported as hex.
    return bool(good_len and HEX_RE.match(s) and not s.endswith('\n'))
def list_all(fn, num_retries=0, **kwargs):
    """Collect every item of an offset-paged list call into one list.

    fn -- callable taking offset= plus the given kwargs and returning an
          object whose execute(num_retries=...) yields a dict with
          'items', 'items_available' and 'offset' keys
    num_retries -- passed through to execute()
    kwargs -- extra arguments for fn; 'limit' defaults to sys.maxsize

    Returns the concatenated 'items' of all pages.
    """
    # Default limit to (effectively) api server's MAX_LIMIT
    kwargs.setdefault('limit', sys.maxsize)
    items = []
    offset = 0
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        page = c['items']
        if not page:
            # An empty page means there is nothing left to fetch even if
            # items_available is larger than what was collected (e.g. the
            # count is stale); stop instead of requesting it forever.
            break
        items += page
        items_available = c['items_available']
        offset = c['offset'] + len(page)
    return items
391 def keyset_list_all(fn, order_key="created_at", num_retries=0, ascending=True, **kwargs):
393 kwargs["limit"] = pagesize
394 kwargs["count"] = 'none'
395 kwargs["order"] = ["%s %s" % (order_key, "asc" if ascending else "desc"), "uuid asc"]
396 other_filters = kwargs.get("filters", [])
398 if "select" in kwargs and "uuid" not in kwargs["select"]:
399 kwargs["select"].append("uuid")
403 expect_full_page = True
404 seen_prevpage = set()
405 seen_thispage = set()
407 prev_page_all_same_order_key = False
410 kwargs["filters"] = nextpage+other_filters
411 items = fn(**kwargs).execute(num_retries=num_retries)
413 if len(items["items"]) == 0:
414 if prev_page_all_same_order_key:
415 nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
416 prev_page_all_same_order_key = False
421 seen_prevpage = seen_thispage
422 seen_thispage = set()
424 for i in items["items"]:
425 # In cases where there's more than one record with the
426 # same order key, the result could include records we
427 # already saw in the last page. Skip them.
428 if i["uuid"] in seen_prevpage:
430 seen_thispage.add(i["uuid"])
433 firstitem = items["items"][0]
434 lastitem = items["items"][-1]
436 if firstitem[order_key] == lastitem[order_key]:
437 # Got a page where every item has the same order key.
438 # Switch to using uuid for paging.
439 nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">", lastitem["uuid"]]]
440 prev_page_all_same_order_key = True
442 # Start from the last order key seen, but skip the last
443 # known uuid to avoid retrieving the same row twice. If
444 # there are multiple rows with the same order key it is
445 # still likely we'll end up retrieving duplicate rows.
446 # That's handled by tracking the "seen" rows for each page
447 # so they can be skipped if they show up on the next page.
448 nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
449 prev_page_all_same_order_key = False
def ca_certs_path(fallback=httplib2.CA_CERTS):
    """Return the path of the best available CA certs source.

    This function searches for various distribution sources of CA
    certificates, and returns the first it finds.  If it doesn't find any,
    it returns the value of `fallback` (httplib2's CA certs by default).
    """
    candidates = (
        # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
        # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
        os.environ.get('SSL_CERT_FILE'),
        # Arvados-specific location.
        '/etc/arvados/ca-certificates.crt',
        # System-wide bundles at two common distribution locations.
        '/etc/ssl/certs/ca-certificates.crt',
        '/etc/pki/tls/certs/ca-bundle.crt',
    )
    for candidate in candidates:
        # Unset environment variables show up as None and are skipped.
        if candidate and os.path.exists(candidate):
            return candidate
    return fallback
def new_request_id():
    """Return a new random request ID: "req-" plus 20 base-36 digits.

    The digits (0-9, a-z) are taken least-significant first from a
    104-bit random number.
    """
    base36 = "0123456789abcdefghijklmnopqrstuvwxyz"
    # 2**104 > 36**20 > 2**103
    remaining = random.getrandbits(104)
    digits = []
    for _ in range(20):
        remaining, digit = divmod(remaining, 36)
        digits.append(base36[digit])
    return "req-" + "".join(digits)
def get_config_once(svc):
    """Return the exported config for `svc`, fetching it at most once.

    svc -- API client object; its discovery document (svc._rootDesc)
           is inspected and the result is cached on the object as
           svc._cached_config

    Returns {} when the discovery document lists no 'configs' resource.
    """
    resources = svc._rootDesc.get('resources')
    if not resources.get('configs', False):
        # Old API server version, no config export endpoint
        return {}
    if hasattr(svc, '_cached_config'):
        return svc._cached_config
    svc._cached_config = svc.configs().get().execute()
    return svc._cached_config
def get_vocabulary_once(svc):
    """Return the exported vocabulary for `svc`, fetching it at most once.

    svc -- API client object; its discovery document (svc._rootDesc)
           is inspected and the result is cached on the object as
           svc._cached_vocabulary

    Returns {} when the discovery document lists no 'vocabularies'
    resource.
    """
    resources = svc._rootDesc.get('resources')
    if not resources.get('vocabularies', False):
        # Old API server version, no vocabulary export endpoint
        return {}
    if hasattr(svc, '_cached_vocabulary'):
        return svc._cached_vocabulary
    svc._cached_vocabulary = svc.vocabularies().get().execute()
    return svc._cached_vocabulary