8 from arvados.collection import *
# Matches one or more hexadecimal digits, any case; used by is_hex() below.
HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.

    path -- directory to wipe and recreate; defaults to the current
    task's tmpdir.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        # Pipe stderr so a failure message carries the actual error text;
        # without stderr=PIPE, communicate() returns None for stderr and
        # the exception message would always say "... : None".
        p = subprocess.Popen(['rm', '-rf', path],
                             stderr=subprocess.PIPE)
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)
def run_command(execargs, **kwargs):
    """Run an external command, wait for it, and return (stdout, stderr).

    execargs -- argv list for the child process.
    kwargs -- forwarded to subprocess.Popen; defaults are supplied for
    stdin/stdout (pipes), stderr (inherited), close_fds and shell unless
    the caller overrides them.

    Raises errors.CommandFailedError if the command exits nonzero.
    """
    popen_opts = {
        'stdin': subprocess.PIPE,
        'stdout': subprocess.PIPE,
        'stderr': sys.stderr,
        'close_fds': True,
        'shell': False,
    }
    # Caller-supplied options take precedence over the defaults above.
    popen_opts.update(kwargs)
    proc = subprocess.Popen(execargs, **popen_opts)
    out, err = proc.communicate(None)
    if proc.returncode != 0:
        raise errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, proc.returncode, err))
    return out, err
def git_checkout(url, version, path):
    """Ensure a git work tree at `path` checked out at `version`.

    url -- repository to clone if `path` does not exist yet
    version -- committish to check out
    path -- work tree location: absolute, or relative to job tmp

    Returns the absolute work tree path.
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Clone only on first use; afterwards just switch the existing
    # work tree to the requested version.
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path
def tar_extractor(path, decompress_flag):
    """Start a tar child that extracts an archive streamed to its stdin
    into the directory `path`, and return the Popen object.

    decompress_flag -- 'j' (bzip2), 'z' (gzip) or '' (plain tar);
    spliced into the -x<flag>f option string.
    """
    tar_argv = ["tar",
                "-C", path,
                ("-x%sf" % decompress_flag),
                "-"]
    return subprocess.Popen(tar_argv,
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted.  If the top level of the tarball contained just one
    file or directory, return the absolute path of that single item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions of the same collection.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # The '.locator' symlink records which collection is already
    # extracted here, so repeat calls are cheap.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            # Raw strings: '\.' in a plain literal is an invalid escape
            # (DeprecationWarning on Python 3).
            if re.search(r'\.(tbz|tar.bz2)$', f.name()):
                p = tar_extractor(path, 'j')
            elif re.search(r'\.(tgz|tar.gz)$', f.name()):
                p = tar_extractor(path, 'z')
            elif re.search(r'\.tar$', f.name()):
                p = tar_extractor(path, '')
            else:
                raise errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            # Stream the archive from Keep into tar's stdin in 1 MiB chunks.
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    # A real list, not a lazy filter object, so len() works on Python 3.
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted.  If the top level of the archive contained just one
    file or directory, return the absolute path of that single item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions of the same collection.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # The '.locator' symlink records which collection is already
    # extracted here, so repeat calls are cheap.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            # Raw string: '\.' in a plain literal is an invalid escape
            # (DeprecationWarning on Python 3).
            if not re.search(r'\.zip$', f.name()):
                raise errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # unzip cannot read from a pipe, so spool the archive to a
            # local file first.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    # A real list, not a lazy filter object, so len() works on Python 3.
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    files -- if non-empty, extract only these named files
    decompress -- if True, store files under their decompressed names
    """
    # Use the bare content hash as the cache key; for non-locator
    # arguments, fall back to a digest of the argument itself.
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        # encode() is required on Python 3: md5 wants bytes, not str.
        collection_hash = hashlib.md5(collection.encode()).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions of the same collection.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
    lockfile.close()
    return path
def mkdir_dash_p(path):
    """Ensure directory `path` exists, like `mkdir -p`: create missing
    parents, and do not complain if the directory already exists."""
    # exist_ok=True covers the race where another process creates the
    # directory between our isdir() check and makedirs(); if `path`
    # exists but is not a directory, makedirs still raises, as before.
    if not os.path.isdir(path):
        os.makedirs(path, exist_ok=True)
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    files -- if non-empty, extract only these named files
    decompress -- if True, store files under their decompressed names
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions into the same directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path
def listdir_recursive(dirname, base=None):
    """Return the files under `dirname`, recursively, as a list of
    paths relative to `dirname`.  Entries are sorted at each level;
    `base` is the prefix used for recursive calls."""
    found = []
    for name in sorted(os.listdir(dirname)):
        full_path = os.path.join(dirname, name)
        rel_path = os.path.join(base, name) if base else name
        if os.path.isdir(full_path):
            found.extend(listdir_recursive(full_path, rel_path))
        else:
            found.append(rel_path)
    return found
def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        # Raise the SDK's own exception type, consistent with the other
        # raises in this module; a bare ArgumentError is an undefined
        # name and would surface as a NameError instead.
        raise errors.ArgumentError("is_hex accepts up to 3 arguments ({} given)".
                                   format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        good_len = True
    return bool(good_len and HEX_RE.match(s))