1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import division
6 from builtins import range
19 from arvados.collection import CollectionReader
# Validates a string made entirely of hex digits (used by is_hex() below).
HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
# NOTE(review): the locator/UUID patterns below are unanchored; callers
# presumably use re.match()/fullmatch() or embed them — verify at call sites.
# Keep block locator: md5 hash, "+" size, then any number of "+hint" fields.
keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
# Like keep_locator_pattern, but requiring a "+A..." (signature) field.
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
# md5 hash plus size only, with no extra hint fields.
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
# Arvados UUID: 5-char cluster prefix, 5-char type code, 15-char identifier.
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
# Type-specific UUID patterns, distinguished by the middle type code.
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
# Manifest text: one or more lines of stream name, block locators, then
# "pos:size:filename" file tokens (MULTILINE so $ matches each line end).
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
# Delete the contents of a directory via an external `rm -rf` subprocess.
# NOTE(review): several source lines (docstring quotes, `if` guards) appear
# to be elided in this copy of the file; comments only, code left untouched.
def clear_tmpdir(path=None):
    Ensure the given directory (or TASK_TMPDIR if none given)
        # Fall back to the current task's temporary directory.
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        # Delegate recursive deletion to rm -rf rather than shutil.rmtree.
        p = subprocess.Popen(['rm', '-rf', path])
        stdout, stderr = p.communicate(None)
            # stderr here is None unless Popen was created with
            # stderr=subprocess.PIPE — TODO confirm the intended message.
            raise Exception('rm -rf %s: %s' % (path, stderr))
# Run an external command and return its (stdout, stderr) output.
# Raises arvados.errors.CommandFailedError on nonzero exit status.
# execargs -- argv list passed to subprocess.Popen (shell=False by default)
# kwargs  -- forwarded to Popen; sensible defaults are filled in below
def run_command(execargs, **kwargs):
    # Defaults may be overridden by the caller through **kwargs.
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    # communicate(None) closes stdin and waits for the process to finish.
    stdoutdata, stderrdata = p.communicate(None)
        raise arvados.errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata
# Clone a git repository (if not already present at `path`) and check out
# the requested version.
# url     -- repository URL for `git clone`
# version -- ref passed to `git checkout`
# path    -- clone destination: absolute, or relative to the job's tmpdir
def git_checkout(url, version, path):
    # Relative paths are resolved against the current job's tmpdir.
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
# Start a tar subprocess that extracts data written to its stdin.
# decompress_flag selects the compression filter: 'j' (bzip2), 'z' (gzip),
# or '' (none) — see the call sites in tarball_extract().
def tar_extractor(path, decompress_flag):
    return subprocess.Popen(["tar",
                             ("-x%sf" % decompress_flag),
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted.  If the top level of the tarball contained just one
    file or directory, return the absolute path of that single

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions into the same directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    already_have_it = False
    # The ".locator" symlink records which tarball was last extracted here;
    # if it matches, skip the extraction entirely.
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
            os.unlink(os.path.join(path, '.locator'))
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        # Choose the tar decompression filter from the file extension.
        # NOTE(review): the dots in "tar.bz2"/"tar.gz" are unescaped, so
        # they match any character — harmless in practice, but imprecise.
        for f in CollectionReader(tarball).all_files():
            if re.search('\.(tbz|tar.bz2)$', f.name()):
                p = tar_extractor(path, 'j')
            elif re.search('\.(tgz|tar.gz)$', f.name()):
                p = tar_extractor(path, 'z')
            elif re.search('\.tar$', f.name()):
                p = tar_extractor(path, '')
                raise arvados.errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
        if p.returncode != 0:
            raise arvados.errors.CommandFailedError(
                "tar exited %d" % p.returncode)
        # Record what we extracted, for the cache check above.
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted.  If the top level of the archive contained just one
    file or directory, return the absolute path of that single

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions into the same directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    already_have_it = False
    # The ".locator" symlink caches which archive was last extracted here.
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
            os.unlink(os.path.join(path, '.locator'))
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not re.search('\.zip$', f.name()):
                raise arvados.errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # unzip cannot read from a pipe, so copy the archive to a
            # local file first, then extract and delete it.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            p = subprocess.Popen(["unzip",
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            if p.returncode != 0:
                raise arvados.errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        # Record what we extracted, for the cache check above.
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
# NOTE(review): `files=[]` is a mutable default argument; the list is never
# mutated here (only files_got is), so it is harmless — but worth confirming.
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    # Use the bare hash portion of a locator as the cache key; otherwise
    # fall back to an md5 of the whole collection string.
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
        collection_hash = matches.group(1)
        collection_hash = hashlib.md5(collection).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions into the same directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    already_have_it = False
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True

    # emulate "rm -f" (i.e., if the file does not exist, we win)
        os.unlink(os.path.join(path, '.locator'))
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            # Extract a file if the caller asked for everything (files==[])
            # or named it (possibly by its decompressed name).
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
             [z.name() for z in CollectionReader(collection).all_files()]))
    # Record what we extracted, for the cache check above.
    os.symlink(collection_hash, os.path.join(path, '.locator'))
# Create a directory and any missing parents, like `mkdir -p`;
# an already-existing directory is not an error.
def mkdir_dash_p(path):
    if not os.path.isdir(path):
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
# NOTE(review): `files=[]` is a mutable default argument; it is only read,
# never mutated, so it is harmless here — but worth confirming.
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions into the same directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    for f in stream.all_files():
        # Extract a file if the caller asked for everything (files==[])
        # or named it (possibly by its decompressed name).
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    Return a list of file and directory names found under dirname.

    If base is not None, prepend "{base}/" to each returned name.

    If max_depth is None, descend into directories and return only the
    names of files found in the directory tree.

    If max_depth is a non-negative integer, stop descending into
    directories at the given depth, and at that point return directory

    If max_depth==0 (and base is None) this is equivalent to
    sorted(os.listdir(dirname)).
    # sorted() gives deterministic, lexicographic output order.
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path) and max_depth != 0:
            # Recurse; max_depth None propagates unlimited descent.
            allfiles += listdir_recursive(
                ent_path, base=ent_base,
                max_depth=(max_depth-1 if max_depth else None))
            allfiles += [ent_base]
def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        # Inclusive range check on the number of digits.
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    # HEX_RE (module level) anchors the whole string to hex digits only.
    return bool(good_len and HEX_RE.match(s))
# Fetch every item from a paged Arvados API list method by calling `fn`
# repeatedly with increasing offsets until items_available is exhausted.
# fn          -- an API list method (called as fn(offset=..., **kwargs))
# num_retries -- passed through to each .execute() call
def list_all(fn, num_retries=0, **kwargs):
    # Default limit to (effectively) api server's MAX_LIMIT
    kwargs.setdefault('limit', sys.maxsize)
    # Start optimistic; the first response sets the real total.
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items_available = c['items_available']
        # Advance past the page the server actually returned.
        offset = c['offset'] + len(c['items'])
def ca_certs_path(fallback=httplib2.CA_CERTS):
    """Return the path of the best available CA certs source.

    This function searches for various distribution sources of CA
    certificates, and returns the first it finds. If it doesn't find any,
    it returns the value of `fallback` (httplib2's CA certs by default).
    # Candidates are checked in order; first existing path wins.
    for ca_certs_path in [
        # Arvados-specific:
        '/etc/arvados/ca-certificates.crt',
        # Debian:
        '/etc/ssl/certs/ca-certificates.crt',
        # Red Hat:
        '/etc/pki/tls/certs/ca-bundle.crt',
        if os.path.exists(ca_certs_path):
407 def new_request_id():
409 # 2**104 > 36**20 > 2**103
410 n = random.getrandbits(104)
414 rid += chr(c+ord('0'))
416 rid += chr(c+ord('a')-10)