import errno
import fcntl
import hashlib
import os
import re
import subprocess
import sys

import arvados
from arvados import errors
from arvados.collection import CollectionReader

HEX_RE = re.compile(r'^[0-9a-fA-F]+$')

keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
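
# Illustrative example (not part of the original module): these patterns can
# be used to recognize locator types before deciding how to fetch an object.
# The locator below (MD5 of the empty string, zero length) is hypothetical.
#
#     loc = 'd41d8cd98f00b204e9800998ecf8427e+0'
#     if portable_data_hash_pattern.match(loc):
#         reader = CollectionReader(loc)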
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        p = subprocess.Popen(['rm', '-rf', path])
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)
def run_command(execargs, **kwargs):
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata
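
# Illustrative usage (hypothetical command): run a program and capture its
# stdout. stderr is inherited from the caller by default, so stderrdata is
# None here.
#
#     stdoutdata, stderrdata = run_command(['echo', 'hello'])
#     assert stdoutdata == 'hello\n'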
def git_checkout(url, version, path):
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path
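
# Illustrative usage (hypothetical URL and tag): clone a repository into the
# job's temporary space and check out a fixed version.
#
#     repo_dir = git_checkout('https://example.com/repo.git', 'v1.0', 'src')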
def tar_extractor(path, decompress_flag):
    # Return a tar process that extracts an archive streamed to its stdin
    # into the directory at path ("-" reads the archive from stdin).
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted.  If the top level of the tarball contained just one
    file or directory, return the absolute path of that single item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            if re.search(r'\.(tbz|tar\.bz2)$', f.name()):
                p = tar_extractor(path, 'j')
            elif re.search(r'\.(tgz|tar\.gz)$', f.name()):
                p = tar_extractor(path, 'z')
            elif re.search(r'\.tar$', f.name()):
                p = tar_extractor(path, '')
            else:
                raise errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            # stream the tarball from Keep into tar's stdin
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
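
# Illustrative usage (hypothetical locator): extract a tarball stored in
# Keep under the job's temporary directory.
#
#     src_dir = tarball_extract('c1b44b6dc41ef998a2076a3dab5d1e02+83', 'src')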
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted.  If the top level of the archive contained just one
    file or directory, return the absolute path of that single item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not re.search(r'\.zip$', f.name()):
                raise errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            # copy the archive out of Keep before handing it to unzip
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    """
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        collection_hash = hashlib.md5(collection).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
    lockfile.close()
    return path
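
# Illustrative usage (hypothetical locator and filename): extract a single
# file from a collection, decompressing it on the way out.
#
#     out_dir = collection_extract('c1b44b6dc41ef998a2076a3dab5d1e02+83',
#                                  'refs', files=['chr1.fa'], decompress=True)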
def mkdir_dash_p(path):
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            # It is not an error if someone else creates the
            # directory between our isdir() and makedirs() calls.
            if not (e.errno == errno.EEXIST and os.path.isdir(path)):
                raise
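
# Note: on Python 3, os.makedirs(path, exist_ok=True) could replace this
# helper; it is spelled out here because this module targets Python 2
# (note sys.maxint in list_all below).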
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path
def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    Return a list of file and directory names found under dirname.

    If base is not None, prepend "{base}/" to each returned name.

    If max_depth is None, descend into directories and return only the
    names of files found in the directory tree.

    If max_depth is a non-negative integer, stop descending into
    directories at the given depth, and at that point return directory
    names instead.

    If max_depth==0 (and base is None) this is equivalent to
    sorted(os.listdir(dirname)).
    """
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path) and max_depth != 0:
            allfiles += listdir_recursive(
                ent_path, base=ent_base,
                max_depth=(max_depth-1 if max_depth else None))
        else:
            allfiles += [ent_base]
    return allfiles
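
# Illustrative example: given a tree a/b/c.txt and a/d.txt,
#
#     listdir_recursive('a')               # -> ['b/c.txt', 'd.txt']
#     listdir_recursive('a', max_depth=0)  # -> ['b', 'd.txt']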
def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise errors.ArgumentError("is_hex accepts up to 3 arguments ({} given)"
                                   .format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        good_len = True
    return bool(good_len and HEX_RE.match(s))
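
# Illustrative examples:
#
#     is_hex('deadbeef')        # -> True
#     is_hex('deadbeef', 8)     # -> True  (exactly 8 digits)
#     is_hex('deadbeef', 1, 4)  # -> False (length not in [1, 4])
#     is_hex('zebra')           # -> False (not hexadecimal)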
def list_all(fn, num_retries=0, **kwargs):
    # Call the list method fn repeatedly, following offset pagination,
    # and return all of the items available.
    items = []
    offset = 0
    items_available = sys.maxint
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items += c['items']
        items_available = c['items_available']
        offset = c['offset'] + len(c['items'])
    return items
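
# Illustrative usage (assumes an API client created with arvados.api();
# the filter shown is hypothetical):
#
#     api = arvados.api('v1')
#     collections = list_all(api.collections().list,
#                            filters=[['name', 'like', 'demo%']])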