11 from arvados.collection import CollectionReader
# One or more hexadecimal digits (either case), anchored at both ends.
HEX_RE = re.compile(r'^[0-9a-fA-F]+$')

# 32 lowercase hex digits, "+<decimal>", then any number of "+<hint>" parts.
keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
# Same shape as keep_locator_pattern, but one of the "+..." parts must
# start with "A".
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
# 32 lowercase hex digits followed by "+<decimal>", with no further parts.
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
# Generic UUID shape: 5, 5 and 15 lowercase alphanumerics joined by "-".
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
# UUID shapes whose middle part is fixed; the object type each one stands
# for is taken from the variable name.
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
# One or more lines of the form
#   <name> <32 hex>+<size>[+hint...] ... <digits>:<digits>:<name> ...
# MULTILINE lets "$" match at the end of every line.
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.

    path -- directory to reset; when None, the tmpdir of
            arvados.current_task() is used

    An existing path is removed with "rm -rf" and the directory is then
    created afresh. Raises Exception (carrying rm's stderr) if rm exits
    non-zero.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        # Pipe stderr: without it communicate() returns None for stderr
        # and the failure message below would carry no diagnostics.
        p = subprocess.Popen(['rm', '-rf', path], stderr=subprocess.PIPE)
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)
def run_command(execargs, **kwargs):
    """Run a command to completion and return what it wrote.

    execargs -- the command, in any form subprocess.Popen accepts
    kwargs -- forwarded to subprocess.Popen. Unless overridden, stdin
              and stdout are pipes, stderr is this process's
              sys.stderr, close_fds is True and shell is False.

    Returns the (stdoutdata, stderrdata) pair from Popen.communicate().
    With the default stderr setting, stderrdata is None because the
    child writes straight to sys.stderr.

    Raises arvados.errors.CommandFailedError if the exit status is
    non-zero.
    """
    popen_defaults = (
        ('stdin', subprocess.PIPE),
        ('stdout', subprocess.PIPE),
        ('stderr', sys.stderr),
        ('close_fds', True),
        ('shell', False),
    )
    for option, value in popen_defaults:
        kwargs.setdefault(option, value)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    # Only a non-zero exit status is a failure; otherwise hand back
    # whatever was captured.
    if p.returncode != 0:
        raise arvados.errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata
def git_checkout(url, version, path):
    """Clone a git repository if needed, then check out a version.

    url -- repository passed to "git clone"
    version -- argument passed to "git checkout"
    path -- working tree location: absolute, or relative to job tmp

    The clone is skipped when path already exists. Returns the resolved
    working tree path. Failures surface as whatever run_command raises.
    """
    if not path.startswith('/'):
        path = os.path.join(arvados.current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    # Run the checkout from inside the working tree.
    run_command(["git", "checkout", version],
                cwd=path)
    return path
def tar_extractor(path, decompress_flag):
    """Start a "tar" subprocess that unpacks an archive read from stdin.

    path -- directory to extract into (given to tar as "-C path")
    decompress_flag -- letter spliced into tar's "-x<flag>f" option,
                       e.g. 'j', 'z', or '' for no decompression

    Returns the subprocess.Popen object. The caller is expected to
    write the archive bytes to its stdin, close stdin, and wait for the
    process to exit. tar's stdout is inherited and its stderr goes to
    sys.stderr.
    """
    tar_argv = [
        "tar",
        "-C", path,
        ("-x%sf" % decompress_flag),
        "-",  # archive comes from stdin
    ]
    return subprocess.Popen(tar_argv,
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp

    Extraction is skipped when path/.locator is already a symlink whose
    target equals the given locator. An exclusive flock on
    "<path>.lock" is held for the duration of the call.

    Raises arvados.errors.AssertionError for a file whose name has no
    recognized tar suffix, and arvados.errors.CommandFailedError if tar
    exits non-zero.
    """
    if not path.startswith('/'):
        path = os.path.join(arvados.current_job().tmpdir, path)
    locator_link = os.path.join(path, '.locator')
    lockfile = open(path + '.lock', 'w')
    try:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        if not os.path.exists(path):
            os.mkdir(path)
        try:
            already_have_it = (os.readlink(locator_link) == tarball)
        except OSError:
            already_have_it = False
        if not already_have_it:
            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(locator_link)
            except OSError:
                # The link target is a locator string, not a real file,
                # so the link is always dangling: lexists() is needed
                # to see it (exists() would follow it and say False).
                if os.path.lexists(locator_link):
                    os.unlink(locator_link)

            for f in CollectionReader(tarball).all_files():
                name = f.name()
                if name.endswith(('.tbz', '.tar.bz2')):
                    flag = 'j'
                elif name.endswith(('.tgz', '.tar.gz')):
                    flag = 'z'
                elif name.endswith('.tar'):
                    flag = ''
                else:
                    raise arvados.errors.AssertionError(
                        "tarball_extract cannot handle filename %s" % f.name())
                p = tar_extractor(path, flag)
                # Feed the archive to tar in 1 MiB pieces.
                # NOTE(review): assumes the file reader offers
                # read(size) returning an empty buffer at EOF -- confirm.
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    p.stdin.write(buf)
                p.stdin.close()
                p.wait()
                if p.returncode != 0:
                    raise arvados.errors.CommandFailedError(
                        "tar exited %d" % p.returncode)
            os.symlink(tarball, locator_link)
        tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    finally:
        # Closing the file releases the flock, on success and on error.
        lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp

    Extraction is skipped when path/.locator is already a symlink whose
    target equals the given locator. An exclusive flock on
    "<path>.lock" is held for the duration of the call.

    Raises arvados.errors.NotImplementedError for a file whose name
    does not end in ".zip", and arvados.errors.CommandFailedError if
    unzip exits non-zero.
    """
    if not path.startswith('/'):
        path = os.path.join(arvados.current_job().tmpdir, path)
    locator_link = os.path.join(path, '.locator')
    lockfile = open(path + '.lock', 'w')
    try:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        if not os.path.exists(path):
            os.mkdir(path)
        try:
            already_have_it = (os.readlink(locator_link) == zipball)
        except OSError:
            already_have_it = False
        if not already_have_it:
            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(locator_link)
            except OSError:
                # The link is always dangling (its target is a locator
                # string), so only lexists() can detect it.
                if os.path.lexists(locator_link):
                    os.unlink(locator_link)

            for f in CollectionReader(zipball).all_files():
                if not f.name().endswith('.zip'):
                    raise arvados.errors.NotImplementedError(
                        "zipball_extract cannot handle filename %s" % f.name())
                # unzip needs a seekable file, so spool the archive to
                # disk inside the target directory first.
                zip_filename = os.path.join(path, os.path.basename(f.name()))
                with open(zip_filename, 'wb') as zip_file:
                    # NOTE(review): assumes the file reader offers
                    # read(size) returning an empty buffer at EOF.
                    while True:
                        buf = f.read(2**20)
                        if len(buf) == 0:
                            break
                        zip_file.write(buf)

                # NOTE(review): the unzip options are not visible in
                # the listing; quiet + overwrite + target dir assumed.
                p = subprocess.Popen(["unzip",
                                      "-q", "-o",
                                      "-d", path,
                                      zip_filename],
                                     stdout=None,
                                     stdin=None, stderr=sys.stderr,
                                     shell=False, close_fds=True)
                p.wait()
                if p.returncode != 0:
                    raise arvados.errors.CommandFailedError(
                        "unzip exited %d" % p.returncode)
                os.unlink(zip_filename)
            os.symlink(zipball, locator_link)
        tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    finally:
        # Closing the file releases the flock, on success and on error.
        lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    files -- names of the files wanted; empty means every file
    decompress -- if true, write files under their decompressed names
                  with decompressed content, and also accept
                  decompressed names in `files`

    Files already present under path are left untouched. On success
    path/.locator is (re)created as a symlink to the collection hash.
    An exclusive flock on "<path>.lock" is held for the duration.

    Raises arvados.errors.AssertionError if fewer files were found than
    were asked for.
    """
    if files is None:
        files = []
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        # md5 needs bytes; encode text so this also works on Python 3.
        raw = (collection if isinstance(collection, bytes)
               else collection.encode('utf-8'))
        collection_hash = hashlib.md5(raw).hexdigest()
    if not path.startswith('/'):
        path = os.path.join(arvados.current_job().tmpdir, path)
    locator_link = os.path.join(path, '.locator')
    lockfile = open(path + '.lock', 'w')
    try:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        if not os.path.exists(path):
            os.mkdir(path)

        # The previous .locator value is not consulted: extraction
        # always runs, and files that already exist are skipped below.

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(locator_link)
        except OSError:
            # The link is always dangling (its target is a hash, not a
            # file), so only lexists() can detect it.
            if os.path.lexists(locator_link):
                os.unlink(locator_link)

        files_got = []
        for s in CollectionReader(collection).all_streams():
            stream_name = s.name()
            for f in s.all_files():
                wanted = (not files or
                          ((f.name() not in files_got) and
                           (f.name() in files or
                            (decompress and f.decompressed_name() in files))))
                if not wanted:
                    continue
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                out_path = os.path.join(path, stream_name, outname)
                if os.path.exists(out_path):
                    continue
                mkdir_dash_p(os.path.dirname(out_path))
                with open(out_path, 'wb') as outfile:
                    for buf in (f.readall_decompressed() if decompress
                                else f.readall()):
                        outfile.write(buf)
        if len(files_got) < len(files):
            raise arvados.errors.AssertionError(
                "Wanted files %s but only got %s from %s" %
                (files, files_got,
                 [z.name() for z in CollectionReader(collection).all_files()]))
        os.symlink(collection_hash, locator_link)
    finally:
        # Closing the file releases the flock, on success and on error.
        lockfile.close()
    return path
def mkdir_dash_p(path):
    """Create directory `path`, including missing parents ("mkdir -p").

    Does nothing if path is already a directory. Raises OSError if the
    directory cannot be created, e.g. because path exists but is not a
    directory.
    """
    if os.path.isdir(path):
        return
    try:
        os.makedirs(path)
    except OSError as e:
        # It is not an error if someone else creates the
        # directory between our exists() and makedirs() calls.
        if not (e.errno == errno.EEXIST and os.path.isdir(path)):
            raise
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    files -- names of the files wanted; empty means every file
    decompress -- if true, write files under their decompressed names
                  with decompressed content, and also accept
                  decompressed names in `files`

    A file that already exists at the destination is replaced. An
    exclusive flock on "<path>.lock" is held for the duration.

    Raises arvados.errors.AssertionError if fewer files were found than
    were asked for.
    """
    if files is None:
        files = []
    if not path.startswith('/'):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    try:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        if not os.path.exists(path):
            os.mkdir(path)

        files_got = []
        for f in stream.all_files():
            wanted = (not files or
                      ((f.name() not in files_got) and
                       (f.name() in files or
                        (decompress and f.decompressed_name() in files))))
            if not wanted:
                continue
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            out_path = os.path.join(path, outname)
            if os.path.exists(out_path):
                os.unlink(out_path)
            mkdir_dash_p(os.path.dirname(out_path))
            with open(out_path, 'wb') as outfile:
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
        if len(files_got) < len(files):
            raise arvados.errors.AssertionError(
                "Wanted files %s but only got %s from %s" %
                (files, files_got, [z.name() for z in stream.all_files()]))
    finally:
        # Closing the file releases the flock, on success and on error.
        lockfile.close()
    return path
def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    Return a list of file and directory names found under dirname.

    If base is not None, prepend "{base}/" to each returned name.

    If max_depth is None, descend into directories and return only the
    names of files found in the directory tree.

    If max_depth is a non-negative integer, stop descending into
    directories at the given depth, and at that point return directory
    names instead.

    If max_depth==0 (and base is None) this is equivalent to
    sorted(os.listdir(dirname)).
    """
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path) and max_depth != 0:
            # Here max_depth is either None (unlimited, stays None) or
            # at least 1 (one level is used up by this descent).
            allfiles += listdir_recursive(
                ent_path, base=ent_base,
                max_depth=(max_depth-1 if max_depth else None))
        else:
            allfiles += [ent_base]
    return allfiles
def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.

    Raises arvados.errors.ArgumentError if more than two length
    arguments are given.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        # No length constraint requested.
        good_len = True
    if not good_len:
        return False
    hex_match = HEX_RE.match(s)
    # "$" in HEX_RE also matches just before a trailing newline, so
    # require the match to cover the whole string.
    return bool(hex_match and hex_match.end() == len(s))
def list_all(fn, num_retries=0, **kwargs):
    """Collect every item of a paged listing into one list.

    fn -- callable taking offset=... plus the given keyword arguments
          and returning an object with an execute(num_retries=...)
          method; execute() must return a mapping with 'items',
          'items_available' and 'offset' keys
    num_retries -- forwarded to each execute() call
    kwargs -- forwarded to every fn() call; 'limit' defaults to
              sys.maxsize

    Pages are requested until as many items as 'items_available'
    reports have been gathered, or until a page comes back empty.
    Returns the list of items.
    """
    # Default limit to (effectively) api server's MAX_LIMIT
    kwargs.setdefault('limit', sys.maxsize)
    items = []
    offset = 0
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        page = c['items']
        if not page:
            # An empty page would leave offset unchanged and repeat the
            # same request forever if 'items_available' over-reports.
            break
        items += page
        items_available = c['items_available']
        offset = c['offset'] + len(page)
    return items
378 def ca_certs_path(fallback=httplib2.CA_CERTS):
379 """Return the path of the best available CA certs source.
381 This function searches for various distribution sources of CA
382 certificates, and returns the first it finds. If it doesn't find any,
383 it returns the value of `fallback` (httplib2's CA certs by default).
385 for ca_certs_path in [
387 '/etc/arvados/ca-certificates.crt',
389 '/etc/ssl/certs/ca-certificates.crt',
391 '/etc/pki/tls/certs/ca-bundle.crt',
393 if os.path.exists(ca_certs_path):