8 from arvados.collection import *
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        # Shell out to rm -rf: handles deep trees and odd permissions
        # more robustly than a hand-rolled shutil.rmtree error handler.
        p = subprocess.Popen(['rm', '-rf', path])
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            # stderr is not captured (inherited fd), so it will be None here;
            # the message still identifies the failing path.
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)
def run_command(execargs, **kwargs):
    """Run an external command and return (stdoutdata, stderrdata).

    execargs -- argv list for the command (shell=False by default)
    kwargs -- passed through to subprocess.Popen; defaults are set for
    stdin (PIPE), stdout (PIPE), stderr (inherited), close_fds and shell,
    and any caller-supplied value overrides them.

    Raises errors.CommandFailedError if the command exits nonzero.
    """
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata
def git_checkout(url, version, path):
    """Ensure a git working tree at path is checked out at version.

    url -- repository to clone if path does not exist yet
    version -- commit, branch, or tag to check out
    path -- working tree location: absolute, or relative to job tmp

    Return the absolute path of the working tree.
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path
def tar_extractor(path, decompress_flag):
    """Start "tar -C path -x<flag>f -" and return its Popen object.

    The archive is streamed into the returned process's stdin pipe.

    decompress_flag -- 'j' (bzip2), 'z' (gzip), or '' (uncompressed)
    """
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted.  If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions of the same collection.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # The .locator symlink records which collection is already extracted here.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            # Choose the tar decompression flag from the file name.
            if re.search(r'\.(tbz|tar\.bz2)$', f.name()):
                p = tar_extractor(path, 'j')
            elif re.search(r'\.(tgz|tar\.gz)$', f.name()):
                p = tar_extractor(path, 'z')
            elif re.search(r'\.tar$', f.name()):
                p = tar_extractor(path, '')
            else:
                raise errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            # Stream the archive from Keep into tar's stdin in 1 MiB chunks.
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    # List comprehension (not filter()) so len() works on Python 3 too.
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted.  If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions of the same collection.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # The .locator symlink records which collection is already extracted here.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not re.search(r'\.zip$', f.name()):
                raise errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # unzip cannot read from a pipe, so stage the archive on disk.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    # List comprehension (not filter()) so len() works on Python 3 too.
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
def collection_extract(collection, path, files=None, decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    files -- list of file names wanted; empty/None means all files
    decompress -- if True, write files decompressed and under their
    decompressed names
    """
    # None sentinel instead of a mutable [] default argument.
    if files is None:
        files = []
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        # Not a bare locator: derive a stable cache key from the argument.
        # encode() is required on Python 3, where md5 needs bytes.
        collection_hash = hashlib.md5(collection.encode()).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions of the same collection.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # NOTE(review): already_have_it is computed but individual files are
    # still re-checked below via os.path.exists before rewriting them.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
    lockfile.close()
    return path
def mkdir_dash_p(path):
    """Create directory path and any missing parents, like "mkdir -p".

    It is not an error if path already exists, and a concurrent creator
    racing with us is tolerated.
    """
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
                pass
            else:
                raise
def stream_extract(stream, path, files=None, decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    files -- list of file names wanted; empty/None means all files
    decompress -- if True, write files decompressed and under their
    decompressed names
    """
    # None sentinel instead of a mutable [] default argument.
    if files is None:
        files = []
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize concurrent extractions into the same directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path
299 def listdir_recursive(dirname, base=None):
301 for ent in sorted(os.listdir(dirname)):
302 ent_path = os.path.join(dirname, ent)
303 ent_base = os.path.join(base, ent) if base else ent
304 if os.path.isdir(ent_path):
305 allfiles += listdir_recursive(ent_path, ent_base)
307 allfiles += [ent_base]