1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
15 from arvados.collection import CollectionReader
HEX_RE = re.compile(r'^[0-9a-fA-F]+$')

# Keep locator: 32 hex digits (md5 of block), "+<size>", then any number of
# "+hint" fields.
keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
# As above, but additionally requiring a "+A..." permission-signature hint.
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
# Arvados UUID: 5-char cluster prefix, 5-char object-type code, 15-char id.
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
# One or more manifest stream lines: stream name, one or more block locators,
# one or more "pos:size:filename" file tokens, to end of line.
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.

    path -- directory to wipe and recreate; defaults to the current
    task's temporary directory.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        # Delegate removal to `rm -rf`: handles deep trees and odd
        # permissions the way a shell cleanup would.
        p = subprocess.Popen(['rm', '-rf', path])
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)
def run_command(execargs, **kwargs):
    """Run an external command and return its (stdout, stderr) data.

    execargs -- argument list for the command (no shell is used)
    kwargs -- passed through to subprocess.Popen; by default stdin and
    stdout are pipes, stderr is this process's stderr, close_fds=True,
    shell=False.

    Raises arvados.errors.CommandFailedError if the command exits
    nonzero.
    """
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise arvados.errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata
def git_checkout(url, version, path):
    """Clone a git repository (if not already cloned) and check out the
    requested version.  Return the absolute path of the checkout.

    url -- repository to clone
    version -- commit, branch, or tag to check out
    path -- where to clone: absolute, or relative to job tmp
    """
    if not re.search(r'^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path
def tar_extractor(path, decompress_flag):
    """Start a `tar` child that extracts an archive streamed to its
    stdin into directory `path`.  Return the subprocess.Popen object;
    the caller writes the archive to .stdin and waits for exit.

    decompress_flag -- 'j' for bzip2, 'z' for gzip, '' for plain tar.
    """
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted.  If the top level of the tarball contained just one
    file or directory, return the absolute path of that single item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    if not re.search(r'^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions of the same collection.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        # A '.locator' symlink records which collection was extracted here.
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            # Choose tar's decompression flag from the file extension.
            if re.search(r'\.(tbz|tar.bz2)$', f.name()):
                p = tar_extractor(path, 'j')
            elif re.search(r'\.(tgz|tar.gz)$', f.name()):
                p = tar_extractor(path, 'z')
            elif re.search(r'\.tar$', f.name()):
                p = tar_extractor(path, '')
            else:
                raise arvados.errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            # Stream the archive from Keep into tar's stdin in 1 MiB chunks.
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted.  If the top level of the archive contained just one
    file or directory, return the absolute path of that single item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    if not re.search(r'^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions of the same collection.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        # A '.locator' symlink records which collection was extracted here.
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not re.search(r'\.zip$', f.name()):
                raise arvados.errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # unzip cannot read from a pipe, so spool the archive to a
            # local file first, in 1 MiB chunks.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    files -- if non-empty, extract only files with these names
    (`files` is only read, never mutated, so the shared default is safe)
    decompress -- if true, store decompressed content under the
    decompressed file name
    """
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        # Not a locator: key the cache directory on a hash of the
        # argument instead.
        collection_hash = hashlib.md5(collection).hexdigest()
    if not re.search(r'^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions of the same collection.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        # A '.locator' symlink records which collection was extracted here.
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
    lockfile.close()
    return path
def mkdir_dash_p(path):
    """Create directory `path` and any missing parents, like
    `mkdir -p`.  Succeed silently if the directory already exists.
    """
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
                pass
            else:
                raise
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    files -- if non-empty, extract only files with these names
    (`files` is only read, never mutated, so the shared default is safe)
    decompress -- if true, store decompressed content under the
    decompressed file name
    """
    if not re.search(r'^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions into the same directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path
def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    Return a list of file and directory names found under dirname.

    If base is not None, prepend "{base}/" to each returned name.

    If max_depth is None, descend into directories and return only the
    names of files found in the directory tree.

    If max_depth is a non-negative integer, stop descending into
    directories at the given depth, and at that point return directory
    names instead.

    If max_depth==0 (and base is None) this is equivalent to
    sorted(os.listdir(dirname)).
    """
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path) and max_depth != 0:
            # Recurse, decrementing the depth budget (None means unlimited).
            allfiles += listdir_recursive(
                ent_path, base=ent_base,
                max_depth=(max_depth-1 if max_depth else None))
        else:
            allfiles += [ent_base]
    return allfiles
def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        # No length constraint given: any length is acceptable.
        good_len = True
    return bool(good_len and HEX_RE.match(s))
def list_all(fn, num_retries=0, **kwargs):
    """Page through an Arvados list API call and return every item.

    fn -- an API list method (called as fn(offset=..., **kwargs),
    returning a request object whose .execute() yields a page dict with
    'items', 'items_available', and 'offset' keys)
    num_retries -- passed through to each .execute() call
    kwargs -- extra arguments for fn
    """
    # Default limit to (effectively) api server's MAX_LIMIT
    kwargs.setdefault('limit', sys.maxsize)
    items = []
    offset = 0
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items += c['items']
        items_available = c['items_available']
        offset = c['offset'] + len(c['items'])
    return items
382 def ca_certs_path(fallback=httplib2.CA_CERTS):
383 """Return the path of the best available CA certs source.
385 This function searches for various distribution sources of CA
386 certificates, and returns the first it finds. If it doesn't find any,
387 it returns the value of `fallback` (httplib2's CA certs by default).
389 for ca_certs_path in [
391 '/etc/arvados/ca-certificates.crt',
393 '/etc/ssl/certs/ca-certificates.crt',
395 '/etc/pki/tls/certs/ca-bundle.crt',
397 if os.path.exists(ca_certs_path):