import errno
import fcntl
import hashlib
import os
import re
import subprocess
import sys

import arvados
from arvados import errors
from arvados.collection import *

HEX_RE = re.compile(r'^[0-9a-fA-F]+$')

keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)

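# Illustrative matches (the sample values below are made up): these
# patterns are unanchored, so anchor them yourself when validating
# whole values.
#
#   keep_locator_pattern.match('d41d8cd98f00b204e9800998ecf8427e+0')  # match
#   collection_uuid_pattern.match('zzzzz-4zz18-zzzzzzzzzzzzzzz')      # match
#   user_uuid_pattern.match('zzzzz-tpzed-zzzzzzzzzzzzzzz')            # match
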
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        p = subprocess.Popen(['rm', '-rf', path])
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)

def run_command(execargs, **kwargs):
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata

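# Example (a sketch): capture a command's stdout; a non-zero exit
# status raises errors.CommandFailedError instead of returning.
#
#   out, _ = run_command(['echo', 'hello'])   # out == 'hello\n'
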
def git_checkout(url, version, path):
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path

def tar_extractor(path, decompress_flag):
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)

def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted.  If the top level of the tarball contained just one
    file or directory, return the absolute path of that single item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            if re.search(r'\.(tbz|tar\.bz2)$', f.name()):
                p = tar_extractor(path, 'j')
            elif re.search(r'\.(tgz|tar\.gz)$', f.name()):
                p = tar_extractor(path, 'z')
            elif re.search(r'\.tar$', f.name()):
                p = tar_extractor(path, '')
            else:
                raise errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            # stream the tarball from Keep into tar's stdin
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path

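# Example (a sketch; the collection locator is hypothetical):
#
#   src = tarball_extract('d41d8cd98f00b204e9800998ecf8427e+0', 'source')
#   # returns <job tmpdir>/source, or the single top-level entry if
#   # the tarball unpacked to exactly one file or directory
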
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted.  If the top level of the archive contained just one
    file or directory, return the absolute path of that single item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not re.search(r'\.zip$', f.name()):
                raise errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # copy the archive out of Keep before unzipping: unzip
            # needs a seekable file, so it cannot read from a pipe
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path

def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    """
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        collection_hash = hashlib.md5(collection).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
    lockfile.close()
    return path

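# Example (a sketch; the locator and file name are hypothetical):
#
#   ref = collection_extract('d41d8cd98f00b204e9800998ecf8427e+0',
#                            'refdata', files=['genome.fa'], decompress=False)
#   # raises errors.AssertionError if genome.fa was not in the collection
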
def mkdir_dash_p(path):
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
                pass
            else:
                raise

def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path

def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    Return a list of file and directory names found under dirname.

    If base is not None, prepend "{base}/" to each returned name.

    If max_depth is None, descend into directories and return only the
    names of files found in the directory tree.

    If max_depth is a non-negative integer, stop descending into
    directories at the given depth, and at that point return directory
    names instead.

    If max_depth==0 (and base is None) this is equivalent to
    sorted(os.listdir(dirname)).
    """
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path) and max_depth != 0:
            allfiles += listdir_recursive(
                ent_path, base=ent_base,
                max_depth=(max_depth-1 if max_depth else None))
        else:
            allfiles += [ent_base]
    return allfiles

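# Example (a sketch): for a tree containing a/b/c.txt and a/d.txt,
#
#   listdir_recursive('a')               -> ['b/c.txt', 'd.txt']
#   listdir_recursive('a', max_depth=0)  -> ['b', 'd.txt']
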
def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise errors.ArgumentError("is_hex accepts up to 3 arguments ({} given)"
                                   .format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        good_len = True
    return bool(good_len and HEX_RE.match(s))

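# Examples:
#
#   is_hex('deadbeef')       -> True
#   is_hex('deadbeef', 8)    -> True
#   is_hex('deadbeef', 1, 4) -> False  (length 8 is outside 1..4)
#   is_hex('xyzzy')          -> False
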
def list_all(fn, num_retries=0, **kwargs):
    items = []
    offset = 0
    items_available = sys.maxsize  # sys.maxint is Python 2 only
    while len(items) < items_available:
        # page through the listing by advancing offset until every
        # available item has been fetched
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items += c['items']
        items_available = c['items_available']
        offset = c['offset'] + len(c['items'])
    return items

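# Example (a sketch; assumes an initialized API client):
#
#   api = arvados.api('v1')
#   every_link = list_all(api.links().list, num_retries=3)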