# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from builtins import range

import errno
import fcntl
import hashlib
import os
import random
import re
import subprocess
import sys

import httplib2

import arvados
from arvados.collection import CollectionReader

HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
CR_UNCOMMITTED = 'Uncommitted'
CR_COMMITTED = 'Committed'
CR_FINAL = 'Final'

keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
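
# A quick sketch of how these locator/UUID patterns are used (the UUID
# below is a made-up example; the middle five characters of an Arvados
# UUID encode the object type, e.g. 4zz18 for collections):
#
#     assert collection_uuid_pattern.match('zzzzz-4zz18-abcdefghijklmno')
#     assert portable_data_hash_pattern.match(
#         'd41d8cd98f00b204e9800998ecf8427e+0')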

def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        p = subprocess.Popen(['rm', '-rf', path])
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)

def run_command(execargs, **kwargs):
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise arvados.errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata
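
# Usage sketch for run_command (on Python 3 the captured output is bytes;
# stderr goes to the parent's stderr unless overridden, so stderrdata is
# None by default):
#
#     stdout, stderr = run_command(['echo', 'hello'])
#     assert stdout == b'hello\n'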

def git_checkout(url, version, path):
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path

def tar_extractor(path, decompress_flag):
    # Extract into path, reading the archive from stdin ("-").
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)

def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory. Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            f_name = f.name()
            if f_name.endswith(('.tbz', '.tar.bz2')):
                p = tar_extractor(path, 'j')
            elif f_name.endswith(('.tgz', '.tar.gz')):
                p = tar_extractor(path, 'z')
            elif f_name.endswith('.tar'):
                p = tar_extractor(path, '')
            else:
                raise arvados.errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            # Stream the archive from Keep into tar's stdin in 1 MiB chunks.
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
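
# Usage sketch (placeholder locator; these tarball/zipball/collection
# helpers date from the legacy crunch-v1 job API, so a relative path
# assumes arvados.current_job() is available):
#
#     src_dir = tarball_extract('d41d8cd98f00b204e9800998ecf8427e+0', 'src')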

def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory. Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not f.name().endswith('.zip'):
                raise arvados.errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            # unzip cannot read from a pipe, so stage the archive on disk.
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path

def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory. Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    """
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        # Not a locator: hash the text itself (md5 needs bytes on Python 3).
        collection_hash = hashlib.md5(collection.encode()).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
    lockfile.close()
    return path
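
# Usage sketch (placeholder locator; files=[] means "extract every file"):
#
#     out_dir = collection_extract('d41d8cd98f00b204e9800998ecf8427e+0',
#                                  'input', decompress=False)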

def mkdir_dash_p(path):
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
                pass
            else:
                raise

def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory. Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path

def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    Return a list of file and directory names found under dirname.

    If base is not None, prepend "{base}/" to each returned name.

    If max_depth is None, descend into directories and return only the
    names of files found in the directory tree.

    If max_depth is a non-negative integer, stop descending into
    directories at the given depth, and at that point return directory
    names instead.

    If max_depth==0 (and base is None) this is equivalent to
    sorted(os.listdir(dirname)).
    """
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path) and max_depth != 0:
            allfiles += listdir_recursive(
                ent_path, base=ent_base,
                max_depth=(max_depth-1 if max_depth else None))
        else:
            allfiles += [ent_base]
    return allfiles
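
# Behavior sketch for a hypothetical tree a/ containing a/b/c.txt and
# a/d.txt:
#
#     listdir_recursive('a')               # ['b/c.txt', 'd.txt']
#     listdir_recursive('a', max_depth=0)  # ['b', 'd.txt']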

def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        good_len = True
    return bool(good_len and HEX_RE.match(s))
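
# Behavior sketch:
#
#     is_hex('a9f3')        # True
#     is_hex('a9f3', 4)     # True: exactly 4 digits
#     is_hex('a9f3', 5, 8)  # False: length outside [5, 8]
#     is_hex('qux')         # False: not hexadecimal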

def list_all(fn, num_retries=0, **kwargs):
    # Default limit to (effectively) api server's MAX_LIMIT
    kwargs.setdefault('limit', sys.maxsize)
    items = []
    offset = 0
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items += c['items']
        items_available = c['items_available']
        offset = c['offset'] + len(c['items'])
    return items
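
# Usage sketch (assumes a configured API client; any list endpoint works):
#
#     api = arvados.api('v1')
#     matches = list_all(api.collections().list,
#                        filters=[['name', 'like', 'demo%']])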

def keyset_list_all(fn, order_key="created_at", num_retries=0, ascending=True, **kwargs):
    pagesize = 1000
    kwargs["limit"] = pagesize
    kwargs["count"] = 'none'
    asc = "asc" if ascending else "desc"
    kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
    other_filters = kwargs.get("filters", [])

    if "select" in kwargs and "uuid" not in kwargs["select"]:
        kwargs["select"].append("uuid")

    nextpage = []
    lastitem = None
    expect_full_page = True
    seen_prevpage = set()
    seen_thispage = set()
    prev_page_all_same_order_key = False

    while True:
        kwargs["filters"] = nextpage+other_filters
        items = fn(**kwargs).execute(num_retries=num_retries)

        if len(items["items"]) == 0:
            if prev_page_all_same_order_key:
                nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
                prev_page_all_same_order_key = False
                continue
            else:
                return

        seen_prevpage = seen_thispage
        seen_thispage = set()

        for i in items["items"]:
            # In cases where there's more than one record with the
            # same order key, the result could include records we
            # already saw in the last page. Skip them.
            if i["uuid"] in seen_prevpage:
                continue
            seen_thispage.add(i["uuid"])
            yield i

        firstitem = items["items"][0]
        lastitem = items["items"][-1]

        if firstitem[order_key] == lastitem[order_key]:
            # Got a page where every item has the same order key.
            # Switch to using uuid for paging.
            nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
            prev_page_all_same_order_key = True
        else:
            # Start from the last order key seen, but skip the last
            # known uuid to avoid retrieving the same row twice. If
            # there are multiple rows with the same order key it is
            # still likely we'll end up retrieving duplicate rows.
            # That's handled by tracking the "seen" rows for each page
            # so they can be skipped if they show up on the next page.
            nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
            prev_page_all_same_order_key = False
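
# Usage sketch: keyset_list_all is a generator, so items stream in without
# the offset-based scan that list_all performs (assumes a configured API
# client):
#
#     api = arvados.api('v1')
#     for coll in keyset_list_all(api.collections().list):
#         print(coll['uuid'])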

def ca_certs_path(fallback=httplib2.CA_CERTS):
    """Return the path of the best available CA certs source.

    This function searches for various distribution sources of CA
    certificates, and returns the first it finds. If it doesn't find any,
    it returns the value of `fallback` (httplib2's CA certs by default).
    """
    for ca_certs_path in [
        # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
        # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
        os.environ.get('SSL_CERT_FILE'),
        # Arvados:
        '/etc/arvados/ca-certificates.crt',
        # Debian:
        '/etc/ssl/certs/ca-certificates.crt',
        # Red Hat:
        '/etc/pki/tls/certs/ca-bundle.crt',
        ]:
        if ca_certs_path and os.path.exists(ca_certs_path):
            return ca_certs_path
    return fallback
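
# Usage sketch: pass the result straight to httplib2 (ca_certs is a real
# httplib2.Http keyword argument):
#
#     h = httplib2.Http(ca_certs=ca_certs_path())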

def new_request_id():
    rid = "req-"
    # 2**104 > 36**20 > 2**103
    n = random.getrandbits(104)
    for _ in range(20):
        c = n % 36
        if c < 10:
            rid += chr(c+ord('0'))
        else:
            rid += chr(c+ord('a')-10)
        n = n // 36
    return rid
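
# Behavior sketch: request IDs are "req-" followed by 20 random base-36
# characters, so they always have length 24:
#
#     rid = new_request_id()
#     assert rid.startswith('req-') and len(rid) == 24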

def get_config_once(svc):
    if not svc._rootDesc.get('resources').get('configs', False):
        # Old API server version, no config export endpoint
        return {}
    if not hasattr(svc, '_cached_config'):
        svc._cached_config = svc.configs().get().execute()
    return svc._cached_config
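
# Usage sketch (assumes an API client from arvados.api(); repeat calls
# return the cached copy instead of re-fetching the exported config):
#
#     api = arvados.api('v1')
#     cluster_config = get_config_once(api)
#     print(sorted(cluster_config.keys()))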

def get_vocabulary_once(svc):
    if not svc._rootDesc.get('resources').get('vocabularies', False):
        # Old API server version, no vocabulary export endpoint
        return {}
    if not hasattr(svc, '_cached_vocabulary'):
        svc._cached_vocabulary = svc.vocabularies().get().execute()
    return svc._cached_vocabulary

def trim_name(collectionname):
    """
    trim_name takes a record name (collection name, project name, etc)
    and trims it to fit the 255 character name limit, with additional
    space for the timestamp added by ensure_unique_name, by removing
    excess characters from the middle and inserting an ellipsis.
    """

    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        over = len(collectionname) - max_name_len
        split = int(max_name_len/2)
        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]

    return collectionname
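
# Behavior sketch (the arithmetic above keeps the first 113 characters,
# inserts "…", and appends the tail of the name):
#
#     assert len(trim_name('x' * 300)) == 227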