8 from arvados.collection import *
# Regular expressions for recognizing Arvados identifiers.

# One or more hexadecimal digits, case-insensitive; used by is_hex().
HEX_RE = re.compile(r'^[0-9a-fA-F]+$')

# Keep block locator: 32-hex-digit MD5 hash, "+" size, then any number of
# additional "+hint" fields.
# NOTE(review): not anchored with ^/$, so a locator embedded in a longer
# string still matches — confirm callers want substring-match semantics.
keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
# Like keep_locator_pattern, but requiring a "+A..." permission-signature hint.
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
# Bare portable data hash: md5 hash plus size, no further hints required.
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
# Generic Arvados UUID: three alphanumeric segments of 5, 5 and 15 characters.
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
# Type-specific UUIDs fix the middle segment (presumably the object-type
# infix: 4zz18=collection, j7d0g=group, tpzed=user, o0j2j=link — verify
# against the Arvados schema).
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
# One manifest line per match: a stream name, one or more block locators,
# then one or more "start:length:filename" file tokens.  MULTILINE so the
# trailing $ matches at each end-of-line.
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        # rm -rf handles arbitrarily deep trees and odd permissions better
        # than shutil.rmtree did historically; check the exit status so a
        # partial removal is not silently ignored.
        p = subprocess.Popen(['rm', '-rf', path])
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)
def run_command(execargs, **kwargs):
    """Run an external command and return (stdout, stderr).

    execargs -- argv list for the child process (no shell interpretation)
    kwargs -- passed through to subprocess.Popen; sensible defaults are
    filled in below but any of them may be overridden by the caller.

    Raises errors.CommandFailedError if the command exits non-zero.
    """
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata
def git_checkout(url, version, path):
    """Clone a git repository (if not already present) and check out a
    specific version.  Return the absolute path of the working tree.

    url -- repository URL to clone from
    version -- commit/branch/tag to check out
    path -- working tree location: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path
def tar_extractor(path, decompress_flag):
    """Start a tar child process that extracts from its stdin into path.

    path -- directory to extract into (tar's -C option)
    decompress_flag -- '' for plain tar, 'j' for bzip2, 'z' for gzip

    Return the subprocess.Popen object; the caller writes the archive to
    its stdin and waits for it.
    """
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory. Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # A sibling lock file serializes concurrent extraction attempts of the
    # same collection into the same directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # The '.locator' symlink records which collection is already extracted
    # here, letting a repeat call return immediately.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            # Choose the decompression flag from the file extension.
            if re.search(r'\.(tbz|tar.bz2)$', f.name()):
                p = tar_extractor(path, 'j')
            elif re.search(r'\.(tgz|tar.gz)$', f.name()):
                p = tar_extractor(path, 'z')
            elif re.search(r'\.tar$', f.name()):
                p = tar_extractor(path, '')
            else:
                raise errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            # Stream the archive from Keep into tar's stdin in 1 MiB chunks.
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    # List comprehension rather than filter(): filter() returns a lazy
    # iterator on Python 3, which has no len().
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory. Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # A sibling lock file serializes concurrent extraction attempts of the
    # same collection into the same directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # The '.locator' symlink records which collection is already extracted
    # here, letting a repeat call return immediately.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not re.search(r'\.zip$', f.name()):
                raise errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # unzip cannot read from a pipe, so spool the archive from Keep
            # to a local file first, then extract and delete it.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    # List comprehension rather than filter(): filter() returns a lazy
    # iterator on Python 3, which has no len().
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory. Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    files -- if non-empty, extract only files with these names
             (note: `files` is never mutated, so the shared mutable
             default is safe here)
    decompress -- if True, store files under their decompressed names
    """
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        # Not a locator: derive a stable cache key from the argument.
        # encode() is required on Python 3, where md5 needs bytes.
        collection_hash = hashlib.md5(collection.encode()).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # A sibling lock file serializes concurrent extraction attempts of the
    # same collection into the same directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            # Take every file when no filter was given; otherwise take a
            # file the first time its (possibly decompressed) name matches.
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
    lockfile.close()
    return path
def mkdir_dash_p(path):
    """Create a directory and any missing parents, like `mkdir -p`.

    Succeeds silently if the directory already exists.
    """
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
                pass
            else:
                raise
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory. Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    files -- if non-empty, extract only files with these names
             (note: `files` is never mutated, so the shared mutable
             default is safe here)
    decompress -- if True, store files under their decompressed names
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # A sibling lock file serializes concurrent extractions into the
    # same directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        # Take every file when no filter was given; otherwise take a file
        # the first time its (possibly decompressed) name matches.
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path
def listdir_recursive(dirname, base=None):
    """Return a sorted list of all file paths under dirname, relative to
    dirname.

    base -- internal accumulator used during recursion: the relative
    prefix to prepend to entries found in subdirectories.
    """
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path):
            allfiles += listdir_recursive(ent_path, ent_base)
        else:
            allfiles += [ent_base]
    return allfiles
def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        # Use errors.ArgumentError rather than the bare (undefined) name
        # ArgumentError, matching the error style used elsewhere in this
        # module; the bare name would itself raise NameError.
        raise errors.ArgumentError("is_hex accepts up to 3 arguments ({} given)".
                                   format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        # No length constraint given: any number of digits is acceptable.
        good_len = True
    return bool(good_len and HEX_RE.match(s))
def list_all(fn, num_retries=0, **kwargs):
    """Fetch every item from a paginated Arvados list API.

    fn -- an API list method (e.g. api.collections().list); called
    repeatedly with an increasing offset until all items are collected.
    num_retries -- passed through to each .execute() call.
    kwargs -- additional filter/order/etc. arguments passed to fn.

    Return the concatenated list of items from every page.
    """
    items = []
    offset = 0
    # sys.maxsize (sys.maxint no longer exists on Python 3) is just a
    # sentinel meaning "unknown"; the first response replaces it with the
    # server-reported total.
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items += c['items']
        items_available = c['items_available']
        offset = c['offset'] + len(c['items'])
    return items