8 from arvados.collection import *
# Matches one or more hexadecimal digits (either case), anchored to the
# whole string; used by is_hex() below.
HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        # Delegate recursive deletion to rm -rf; only raise if it fails.
        p = subprocess.Popen(['rm', '-rf', path])
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)
def run_command(execargs, **kwargs):
    """Run an external command and return (stdoutdata, stderrdata).

    execargs -- argv list for the command (shell=False by default)
    kwargs -- passed through to subprocess.Popen; stdin/stdout default
    to pipes, stderr defaults to this process's stderr.

    Raises errors.CommandFailedError if the command exits nonzero.
    """
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    # communicate() waits for exit and returns None for any stream that
    # is not a pipe, so it is safe whether or not the caller overrode
    # stdout/stderr.
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata
def git_checkout(url, version, path):
    """Clone a git repository (if not already present) and check out
    the given version.  Return the absolute checkout path.

    url -- git repository URL
    version -- commit/branch/tag to check out
    path -- checkout location: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    # Always run checkout, even on a pre-existing clone, so the working
    # tree matches the requested version.
    run_command(["git", "checkout", version],
                cwd=path)
    return path
def tar_extractor(path, decompress_flag):
    """Start a tar subprocess that extracts data fed to its stdin into
    directory `path`.  Return the Popen object (stdin is a pipe).

    decompress_flag -- '' for plain tar, 'j' for bzip2, 'z' for gzip.
    """
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions of the same collection.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # The '.locator' symlink records which tarball was extracted here.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            # Choose the tar decompression flag from the file extension.
            if re.search(r'\.(tbz|tar.bz2)$', f.name()):
                p = tar_extractor(path, 'j')
            elif re.search(r'\.(tgz|tar.gz)$', f.name()):
                p = tar_extractor(path, 'z')
            elif re.search(r'\.tar$', f.name()):
                p = tar_extractor(path, '')
            else:
                raise errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            # Stream the tarball from Keep into tar's stdin in 1 MiB chunks.
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    # list() is required: filter() returns an iterator on Python 3.
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions of the same collection.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # The '.locator' symlink records which archive was extracted here.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not re.search(r'\.zip$', f.name()):
                raise errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # unzip cannot read from a pipe, so spool the archive to disk
            # first, then delete it after extraction.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    # list() is required: filter() returns an iterator on Python 3.
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    files -- if non-empty, extract only these named files (default []
             means all files; the list is never mutated here)
    decompress -- if True, store files under their decompressed names
    """
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        # Not a bare locator: derive a stable cache key from the text.
        # .encode() is needed on Python 3, where md5 requires bytes.
        collection_hash = hashlib.md5(collection.encode()).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions of the same collection.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # The '.locator' symlink records which collection was extracted here.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                # Skip files that are already present on disk.
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
    lockfile.close()
    return path
def mkdir_dash_p(path):
    """Create directory `path` and any missing parents (like `mkdir -p`).

    A concurrent creation of the same directory is not an error.
    """
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
                pass
            else:
                raise
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    files -- if non-empty, extract only these named files (default []
             means all files; the list is never mutated here)
    decompress -- if True, store files under their decompressed names
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions into the same directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            # Replace any stale copy of the output file.
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path
def listdir_recursive(dirname, base=None):
    """Return a sorted list of the files under `dirname`, recursively,
    as paths relative to `dirname`.

    base -- prefix to prepend to each returned path (used internally
    for recursion; callers normally omit it).
    """
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path):
            allfiles += listdir_recursive(ent_path, ent_base)
        else:
            allfiles += [ent_base]
    return allfiles
def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        # Bare `ArgumentError` was an undefined name; use the errors
        # module like the rest of this file.
        raise errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(
                1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        # No length constraint given: any length is acceptable.
        good_len = True
    return bool(good_len and HEX_RE.match(s))