import errno
import fcntl
import hashlib
import os
import re
import subprocess
import sys

# Explicit imports so the module is self-contained; the code below
# relies on the "arvados" and "errors" names from the SDK package.
import arvados
from arvados import errors
from arvados.collection import *

HEX_RE = re.compile(r'^[0-9a-fA-F]+$')

portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')

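# A quick illustration (not part of the original module): the patterns
# above are unanchored, so use match()/search() accordingly, e.g.
#
#   user_uuid_pattern.match('zzzzz-tpzed-0123456789abcde')        # match
#   collection_uuid_pattern.match('zzzzz-4zz18-0123456789abcde')  # match
#   collection_uuid_pattern.match('zzzzz-tpzed-0123456789abcde')  # None
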
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        p = subprocess.Popen(['rm', '-rf', path])
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)

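# Example (a sketch; requires a running task so arvados.current_task()
# can supply TASK_TMPDIR):
#
#   clear_tmpdir()                # wipe and recreate the task tmpdir
#   clear_tmpdir('/tmp/scratch')  # or any explicit path
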
def run_command(execargs, **kwargs):
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata

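# Example usage (a sketch, assuming a POSIX "ls" on PATH); any Popen
# keyword argument can be overridden via **kwargs:
#
#   stdout, stderr = run_command(['ls', '-l', '/tmp'])
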
def git_checkout(url, version, path):
    """Clone url into path (unless already present) and check out
    version.  Return the absolute path of the working tree."""
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path

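# Example (hypothetical URL and tag; a relative path requires a job
# context for arvados.current_job()):
#
#   src_dir = git_checkout('https://example.com/repo.git', 'v1.0', 'src')
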
def tar_extractor(path, decompress_flag):
    """Start a tar process that reads a (possibly compressed) tarball
    from stdin and extracts it into the given directory."""
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)

def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted.  If the top level of the tarball contained just one
    file or directory, return the absolute path of that single item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Serialize extraction under an exclusive lock so concurrent tasks
    # do not clobber each other.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            if re.search(r'\.(tbz|tar.bz2)$', f.name()):
                p = tar_extractor(path, 'j')
            elif re.search(r'\.(tgz|tar.gz)$', f.name()):
                p = tar_extractor(path, 'z')
            elif re.search(r'\.tar$', f.name()):
                p = tar_extractor(path, '')
            else:
                raise errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            # feed the file from Keep to tar's stdin in 1 MiB chunks
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path

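# Example (hypothetical locator; a sketch assuming a job context):
#
#   src = tarball_extract('0123456789abcdef0123456789abcdef+1234',
#                         'bwa-src')
#
# Repeated calls with the same locator are cheap: the ".locator"
# symlink marks the directory as already extracted.
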
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted.  If the top level of the archive contained just one
    file or directory, return the absolute path of that single item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not re.search(r'\.zip$', f.name()):
                raise errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # copy the archive out of Keep, then unzip it in place
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path

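# Example (hypothetical locator; a sketch assuming a job context):
#
#   tool_dir = zipball_extract('0123456789abcdef0123456789abcdef+99',
#                              'tool')
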
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    """
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        collection_hash = hashlib.md5(collection).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
    lockfile.close()
    return path

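# Example (hypothetical locator and file names; a sketch assuming a job
# context):
#
#   ref_dir = collection_extract(
#       '0123456789abcdef0123456789abcdef+1234', 'refs',
#       files=['chr1.fa'], decompress=True)
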
def mkdir_dash_p(path):
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            # It is not an error if someone else creates the
            # directory between our exists() and makedirs() calls.
            if e.errno != errno.EEXIST or not os.path.isdir(path):
                raise

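# Behaves like "mkdir -p": creates intermediate directories as needed
# and succeeds if the directory already exists, e.g.:
#
#   mkdir_dash_p('/tmp/a/b/c')
#   mkdir_dash_p('/tmp/a/b/c')  # second call is a harmless no-op
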
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path

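# Example (hypothetical locator and file name; a sketch assuming a job
# context):
#
#   for s in CollectionReader('0123456789abcdef0123456789abcdef+1234'
#                             ).all_streams():
#       stream_extract(s, 'data', files=['sample.fastq'])
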
def listdir_recursive(dirname, base=None):
    """Return a sorted list of files under dirname, recursing into
    subdirectories.  Paths are relative to dirname (prefixed with
    base, if given)."""
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path):
            allfiles += listdir_recursive(ent_path, ent_base)
        else:
            allfiles += [ent_base]
    return allfiles

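# Example: for a tree containing a/1.txt and b/2.txt,
# listdir_recursive('/path/to/tree') returns ['a/1.txt', 'b/2.txt'].
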
def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(
                1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        good_len = True
    return bool(good_len and HEX_RE.match(s))

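# Examples:
#
#   is_hex('deadbeef')         # True
#   is_hex('deadbeef', 8)      # True: exactly 8 digits
#   is_hex('deadbeef', 1, 4)   # False: 8 digits is outside [1, 4]
#   is_hex('xyz')              # False: not hexadecimal
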
def list_all(fn, **kwargs):
    """Call an API list method repeatedly, page by page, and return
    the accumulated list of all available items."""
    items = []
    offset = 0
    items_available = sys.maxint
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute()
        items += c['items']
        items_available = c['items_available']
        offset = c['offset'] + len(c['items'])
    return items

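# Example (a sketch; assumes an initialized API client):
#
#   api = arvados.api('v1')
#   every_collection = list_all(api.collections().list)
#
# Note: sys.maxint makes this Python 2 code; a Python 3 port would use
# sys.maxsize or float('inf') as the initial sentinel.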