# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from builtins import range

import errno
import fcntl
import hashlib
import httplib2
import os
import random
import re
import subprocess
import sys

import arvados
from arvados.collection import CollectionReader

HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
CR_UNCOMMITTED = 'Uncommitted'
CR_COMMITTED = 'Committed'
CR_FINAL = 'Final'

keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)

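# Illustrative sketch (not part of the original module): the compiled
# patterns above can be used with .match() to classify identifiers.  The
# values below are made-up examples in the expected formats.
#
#     >>> bool(collection_uuid_pattern.match('zzzzz-4zz18-abcdefghijklmno'))
#     True
#     >>> bool(portable_data_hash_pattern.match('d41d8cd98f00b204e9800998ecf8427e+0'))
#     True
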
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        p = subprocess.Popen(['rm', '-rf', path])
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)

def run_command(execargs, **kwargs):
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise arvados.errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata

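# Hedged usage sketch (assumes a POSIX 'echo' on PATH; not part of the
# original source): stdout is captured as bytes, while stderr passes through
# to sys.stderr, so the second return value is None.  A nonzero exit status
# raises arvados.errors.CommandFailedError.
#
#     out, _ = run_command(['echo', 'hello'])
#     # out == b'hello\n'
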
def git_checkout(url, version, path):
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path

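# Hedged usage sketch (the repository URL and branch are made-up examples):
#
#     src = git_checkout('https://example.com/repo.git', 'main', 'checkout')
#     # 'checkout' is relative, so it lands under arvados.current_job().tmpdir
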
def tar_extractor(path, decompress_flag):
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)

def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory. Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            if re.search(r'\.(tbz|tar.bz2)$', f.name()):
                p = tar_extractor(path, 'j')
            elif re.search(r'\.(tgz|tar.gz)$', f.name()):
                p = tar_extractor(path, 'z')
            elif re.search(r'\.tar$', f.name()):
                p = tar_extractor(path, '')
            else:
                raise arvados.errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path

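# Hedged usage sketch (not from the original source): extract a tarball
# stored in Keep into the job's scratch space.  '<collection locator>' is a
# placeholder for a real Keep locator; zipball_extract behaves analogously.
#
#     src_dir = tarball_extract('<collection locator>', 'src')
#     # If the archive had a single top-level entry, src_dir is that entry's
#     # absolute path; otherwise it is the extraction directory itself.
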
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory. Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not re.search(r'\.zip$', f.name()):
                raise arvados.errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path

def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory. Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    """
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        # Not a locator: hash the name itself (encode for Python 3).
        collection_hash = hashlib.md5(collection.encode()).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
    lockfile.close()
    return path

def mkdir_dash_p(path):
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
                pass
            else:
                raise

def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory. Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path

def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    Return a list of file and directory names found under dirname.

    If base is not None, prepend "{base}/" to each returned name.

    If max_depth is None, descend into directories and return only the
    names of files found in the directory tree.

    If max_depth is a non-negative integer, stop descending into
    directories at the given depth, and at that point return directory
    names instead.

    If max_depth==0 (and base is None) this is equivalent to
    sorted(os.listdir(dirname)).
    """
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path) and max_depth != 0:
            allfiles += listdir_recursive(
                ent_path, base=ent_base,
                max_depth=(max_depth-1 if max_depth else None))
        else:
            allfiles += [ent_base]
    return allfiles

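# Illustrative sketch (hypothetical directory layout): given a directory 'd'
# containing a file 'a' and a subdirectory 'b' with a single file 'c', the
# expected results would be:
#
#     listdir_recursive('d')               # ['a', 'b/c']
#     listdir_recursive('d', max_depth=0)  # ['a', 'b']
#     listdir_recursive('d', base='x')     # ['x/a', 'x/b/c']
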
def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        good_len = True
    return bool(good_len and HEX_RE.match(s))

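# Illustrative examples (not from the original source):
#
#     is_hex('89e0')          # True
#     is_hex('89e0', 4)       # True  (exactly 4 digits)
#     is_hex('89e0', 5, 8)    # False (length outside 5..8)
#     is_hex('4zz18')         # False (not hexadecimal)
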
def list_all(fn, num_retries=0, **kwargs):
    # Default limit to (effectively) api server's MAX_LIMIT
    kwargs.setdefault('limit', sys.maxsize)
    items = []
    offset = 0
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items += c['items']
        items_available = c['items_available']
        offset = c['offset'] + len(c['items'])
    return items

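# Hedged usage sketch, assuming an API client created with arvados.api('v1')
# (not part of this module):
#
#     api = arvados.api('v1')
#     tags = list_all(api.links().list, filters=[['link_class', '=', 'tag']])
#
# Each page is requested with the same keyword arguments plus an increasing
# offset, and the concatenated 'items' lists are returned as one list.
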
def keyset_list_all(fn, order_key="created_at", num_retries=0, ascending=True, **kwargs):
    pagesize = 1000
    kwargs["limit"] = pagesize
    kwargs["count"] = 'none'
    asc = "asc" if ascending else "desc"
    kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
    other_filters = kwargs.get("filters", [])

    if "select" in kwargs and "uuid" not in kwargs["select"]:
        kwargs["select"].append("uuid")

    nextpage = []
    expect_full_page = True
    seen_prevpage = set()
    seen_thispage = set()
    lastitem = None
    prev_page_all_same_order_key = False

    while True:
        kwargs["filters"] = nextpage+other_filters
        items = fn(**kwargs).execute(num_retries=num_retries)

        if len(items["items"]) == 0:
            if prev_page_all_same_order_key:
                nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
                prev_page_all_same_order_key = False
                continue
            else:
                return

        seen_prevpage = seen_thispage
        seen_thispage = set()

        for i in items["items"]:
            # In cases where there's more than one record with the
            # same order key, the result could include records we
            # already saw in the last page. Skip them.
            if i["uuid"] in seen_prevpage:
                continue
            seen_thispage.add(i["uuid"])
            yield i

        firstitem = items["items"][0]
        lastitem = items["items"][-1]

        if firstitem[order_key] == lastitem[order_key]:
            # Got a page where every item has the same order key.
            # Switch to using uuid for paging.
            nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
            prev_page_all_same_order_key = True
        else:
            # Start from the last order key seen, but skip the last
            # known uuid to avoid retrieving the same row twice. If
            # there are multiple rows with the same order key it is
            # still likely we'll end up retrieving duplicate rows.
            # That's handled by tracking the "seen" rows for each page
            # so they can be skipped if they show up on the next page.
            nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
            prev_page_all_same_order_key = False

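# Hedged usage sketch: unlike list_all, keyset_list_all is a generator and
# pages by (order_key, uuid) instead of offset, so results stream in without
# accumulating in memory.  Assumes an API client from arvados.api('v1'):
#
#     for coll in keyset_list_all(api.collections().list):
#         print(coll['uuid'])
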
def ca_certs_path(fallback=httplib2.CA_CERTS):
    """Return the path of the best available CA certs source.

    This function searches for various distribution sources of CA
    certificates, and returns the first it finds. If it doesn't find any,
    it returns the value of `fallback` (httplib2's CA certs by default).
    """
    for ca_certs_path in [
        # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
        # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
        os.environ.get('SSL_CERT_FILE'),
        # Arvados specific:
        '/etc/arvados/ca-certificates.crt',
        # Debian:
        '/etc/ssl/certs/ca-certificates.crt',
        # Red Hat:
        '/etc/pki/tls/certs/ca-bundle.crt',
        ]:
        if ca_certs_path and os.path.exists(ca_certs_path):
            return ca_certs_path
    return fallback

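# Hedged usage sketch: hand the discovered bundle to an httplib2 client.
#
#     h = httplib2.Http(ca_certs=ca_certs_path())
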
def new_request_id():
    rid = "req-"
    # 2**104 > 36**20 > 2**103
    n = random.getrandbits(104)
    for _ in range(20):
        c = n % 36
        if c < 10:
            rid += chr(c+ord('0'))
        else:
            rid += chr(c+ord('a')-10)
        n = n // 36
    return rid

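# Illustrative output (the 20-character base-36 suffix is random; the value
# shown here is made up):
#
#     new_request_id()   # e.g. 'req-3kg8mf27b9qw0zxelq4p'
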
def get_config_once(svc):
    if not svc._rootDesc.get('resources').get('configs', False):
        # Old API server version, no config export endpoint
        return {}
    if not hasattr(svc, '_cached_config'):
        svc._cached_config = svc.configs().get().execute()
    return svc._cached_config

def get_vocabulary_once(svc):
    if not svc._rootDesc.get('resources').get('vocabularies', False):
        # Old API server version, no vocabulary export endpoint
        return {}
    if not hasattr(svc, '_cached_vocabulary'):
        svc._cached_vocabulary = svc.vocabularies().get().execute()
    return svc._cached_vocabulary

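# Hedged usage sketch: both helpers memoize their result on the client
# object, so repeated calls do not re-query the API server.  The 'ClusterID'
# key below is assumed to be present in the exported config.
#
#     api = arvados.api('v1')
#     cluster_id = get_config_once(api).get('ClusterID')
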
def trim_name(collectionname):
    """
    trim_name takes a record name (collection name, project name, etc)
    and trims it to fit the 255 character name limit, with additional
    space for the timestamp added by ensure_unique_name, by removing
    excess characters from the middle and inserting an ellipsis.
    """
    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        over = len(collectionname) - max_name_len
        split = int(max_name_len/2)
        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]

    return collectionname

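# Illustrative example: a 300-character name comes back at the limit
# (max_name_len characters of original text plus the inserted ellipsis).
#
#     trimmed = trim_name('x' * 300)
#     # len(trimmed) == (254 - 28) + 1 == 227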