1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
15 from arvados.collection import CollectionReader
# A string consisting only of hex digits (any length, either case).
HEX_RE = re.compile(r'^[0-9a-fA-F]+$')

# Keep block locator: 32-hex-digit MD5, "+<size>", then optional "+hint"
# fields (e.g. a "+A..." permission signature).
keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
# Like keep_locator_pattern, but requires a "+A..." signature hint.
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
# Bare portable data hash: md5 + "+size", no hints.
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
# Arvados UUIDs: 5-char cluster prefix, 5-char type infix, 15-char tail.
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
# One manifest stream per line: stream name, one or more block locators,
# then one or more "pos:size:filename" file tokens.
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        p = subprocess.Popen(['rm', '-rf', path])
        stdout, stderr = p.communicate(None)
        # Only raise if rm actually failed; the visible code raised
        # unconditionally.  (stderr is None here since it is not piped.)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    # Recreate the directory so callers get an existing, empty tmpdir.
    os.mkdir(path)
def run_command(execargs, **kwargs):
    """Run an external command and return (stdout, stderr).

    Arguments:
    execargs -- argv list for the child process
    kwargs -- passed through to subprocess.Popen; stdin/stdout default to
              pipes, stderr defaults to this process's stderr.

    Raises arvados.errors.CommandFailedError if the command exits nonzero.
    """
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    # The visible code raised unconditionally; only fail on nonzero exit.
    if p.returncode != 0:
        raise arvados.errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata
def git_checkout(url, version, path):
    """Clone a git repository (if not already cloned) and check out the
    requested version.

    Arguments:
    url -- repository URL to clone from
    version -- commit-ish (branch, tag, or SHA) to check out
    path -- working directory: absolute, or relative to job tmp

    Returns the absolute path of the working directory.
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    # The visible code ran checkout without a working directory and
    # returned nothing; run it inside the clone and return the path.
    run_command(["git", "checkout", version],
                cwd=path)
    return path
def tar_extractor(path, decompress_flag):
    """Start a tar child process that extracts a stream from its stdin
    into `path`, and return the Popen object.

    decompress_flag -- '' for plain tar, 'j' for bzip2, 'z' for gzip
    """
    # The visible code gave tar neither a target directory nor an input:
    # "-C path" extracts into path, and "-" reads the archive from stdin.
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory. Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single item.

    Arguments:
    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions with an exclusive lock on a
    # sibling ".lock" file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # A ".locator" symlink inside path records which tarball was
    # extracted there last, so a repeat request is a no-op.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))
        for f in CollectionReader(tarball).all_files():
            # Choose tar's decompression flag from the file extension.
            if re.search('\.(tbz|tar.bz2)$', f.name()):
                p = tar_extractor(path, 'j')
            elif re.search('\.(tgz|tar.gz)$', f.name()):
                p = tar_extractor(path, 'z')
            elif re.search('\.tar$', f.name()):
                p = tar_extractor(path, '')
            else:
                raise arvados.errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            # Stream the archive from Keep into tar's stdin in 1 MiB chunks.
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory. Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single item.

    Arguments:
    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions with an exclusive lock on a
    # sibling ".lock" file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # A ".locator" symlink inside path records which archive was
    # extracted there last, so a repeat request is a no-op.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))
        for f in CollectionReader(zipball).all_files():
            if not re.search('\.zip$', f.name()):
                raise arvados.errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # unzip cannot read from a pipe, so spool the archive to a
            # local file first, then extract and delete it.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()
            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory. Return the absolute path where the collection was
    extracted.

    Arguments:
    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    files -- if non-empty, extract only files with these names
             (default [] is never mutated, so the shared default is safe)
    decompress -- if True, store files under their decompressed names
    """
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        # Not a locator: derive a cache key from the text itself.
        # NOTE(review): hashlib.md5 requires bytes on Python 3, so
        # `collection` may need .encode() here -- confirm with callers.
        collection_hash = hashlib.md5(collection).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions with an exclusive lock on a
    # sibling ".lock" file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            # Extract everything when files == [], otherwise only the
            # requested (and not yet extracted) names.
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
    lockfile.close()
    return path
def mkdir_dash_p(path):
    """Create `path` and any missing parent directories (like mkdir -p).

    Succeeds silently if the directory already exists; re-raises any
    other OSError.
    """
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
                pass
            else:
                raise
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory. Return the absolute path where the stream was
    extracted.

    Arguments:
    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    files -- if non-empty, extract only files with these names
             (default [] is never mutated, so the shared default is safe)
    decompress -- if True, store files under their decompressed names
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions with an exclusive lock on a
    # sibling ".lock" file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        # Extract everything when files == [], otherwise only the
        # requested (and not yet extracted) names.
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path
def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    Return a list of file and directory names found under dirname.

    If base is not None, prepend "{base}/" to each returned name.

    If max_depth is None, descend into directories and return only the
    names of files found in the directory tree.

    If max_depth is a non-negative integer, stop descending into
    directories at the given depth, and at that point return directory
    names instead.

    If max_depth==0 (and base is None) this is equivalent to
    sorted(os.listdir(dirname)).
    """
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path) and max_depth != 0:
            # Recurse, decrementing the remaining depth (None means
            # unlimited at every level).
            allfiles += listdir_recursive(
                ent_path, base=ent_base,
                max_depth=(max_depth-1 if max_depth else None))
        else:
            allfiles += [ent_base]
    return allfiles
def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        # No length constraint: any number of digits is acceptable.
        # (The visible code lacked this branch, leaving good_len unbound
        # for zero-argument calls.)
        good_len = True
    return bool(good_len and HEX_RE.match(s))
def list_all(fn, num_retries=0, **kwargs):
    """Fetch every page of an Arvados list API call and return all items.

    Arguments:
    fn -- a list API method; called as fn(offset=..., **kwargs), it must
          return a request object whose .execute() yields a page dict
          with 'items', 'items_available', and 'offset' keys
    num_retries -- passed through to each .execute() call
    """
    # Default limit to (effectively) api server's MAX_LIMIT
    kwargs.setdefault('limit', sys.maxsize)
    items = []
    offset = 0
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items += c['items']
        items_available = c['items_available']
        # Advance past the page the server actually returned.
        offset = c['offset'] + len(c['items'])
    return items
def ca_certs_path(fallback=httplib2.CA_CERTS):
    """Return the path of the best available CA certs source.

    This function searches for various distribution sources of CA
    certificates, and returns the first it finds. If it doesn't find any,
    it returns the value of `fallback` (httplib2's CA certs by default).
    """
    for ca_certs_path in [
        # Arvados-specific bundle, if installed:
        '/etc/arvados/ca-certificates.crt',
        # Debian-style location:
        '/etc/ssl/certs/ca-certificates.crt',
        # Red Hat-style location:
        '/etc/pki/tls/certs/ca-bundle.crt',
        ]:
        if os.path.exists(ca_certs_path):
            # The visible code checked existence but never returned;
            # return the first bundle found.
            return ca_certs_path
    return fallback