8 from arvados.collection import *
# Matches a nonempty string of hex digits (this pattern is fully
# anchored with ^ and $).
HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
# A Keep locator: 32-hex-digit md5, "+<size>", then zero or more
# "+<hint>" fields.
keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
# md5 plus size only, with no trailing hint fields.
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
# Arvados UUIDs: <site prefix>-<object type infix>-<identifier>.
# NOTE(review): the UUID patterns below are unanchored, so they also
# match inside longer strings -- callers must anchor them (or use
# match() plus an end check) if they need an exact match.
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')  # collections
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')  # groups
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')  # users
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')  # links
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.

    Arguments:
    path -- directory to clear and recreate; defaults to the current
            task's tmpdir.

    Raises Exception if the 'rm -rf' child exits nonzero.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        # Capture stderr so a failure message actually reaches the
        # exception text; without the pipe, communicate() returns None
        # for stderr and the message is useless.
        p = subprocess.Popen(['rm', '-rf', path], stderr=subprocess.PIPE)
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)
def run_command(execargs, **kwargs):
    """Run an external command and return (stdoutdata, stderrdata).

    By default the child's stdin and stdout are pipes, stderr is
    inherited from this process, and no shell is used.  Any Popen
    keyword argument can be overridden through **kwargs.

    Arguments:
    execargs -- argument list for the command (argv-style, no shell
                interpretation)

    Raises errors.CommandFailedError if the command exits nonzero.
    """
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    # Guard on the exit status: without this check the error would be
    # raised unconditionally, even for successful commands.
    if p.returncode != 0:
        raise errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata
def git_checkout(url, version, path):
    """Clone a git repository (if not already cloned) and check out a
    given version.  Return the absolute path of the working tree.

    Arguments:
    url -- repository URL to clone
    version -- committish to check out
    path -- working tree location: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    # The checkout must run inside the working tree; the visible block
    # was truncated before this cwd argument and the return statement.
    run_command(["git", "checkout", version],
                cwd=path)
    return path
def tar_extractor(path, decompress_flag):
    """Return a tar child process that extracts its stdin into *path*.

    Arguments:
    path -- directory to extract into (tar's -C option)
    decompress_flag -- '' for plain tar, 'j' for bzip2, 'z' for gzip

    The caller writes archive bytes to the returned process's stdin
    ("-f -" makes tar read the archive from stdin).
    """
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory. Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    # Relative paths are taken under the job's tmpdir.
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Exclusive flock on <path>.lock serializes concurrent extraction
    # attempts on the same target directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    already_have_it = False
    # The ".locator" symlink inside the target records which tarball was
    # last extracted there; if it matches, skip re-extraction.
    # NOTE(review): a try/except around this readlink appears to be
    # elided in this view -- readlink raises OSError when the symlink is
    # absent; confirm against the full source.
    if os.readlink(os.path.join(path, '.locator')) == tarball:
        already_have_it = True
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        os.unlink(os.path.join(path, '.locator'))
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))
        # Choose the tar decompression flag from the file extension.
        for f in CollectionReader(tarball).all_files():
            if re.search('\.(tbz|tar.bz2)$', f.name()):
                p = tar_extractor(path, 'j')
            elif re.search('\.(tgz|tar.gz)$', f.name()):
                p = tar_extractor(path, 'z')
            elif re.search('\.tar$', f.name()):
                p = tar_extractor(path, '')
                # NOTE(review): an "else:" guard appears to be elided
                # before this raise (unsupported extensions should raise,
                # not the .tar branch) -- confirm against the full source.
                raise errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
        if p.returncode != 0:
            raise errors.CommandFailedError(
                "tar exited %d" % p.returncode)
        # Record what was extracted so the next call can skip the work.
        os.symlink(tarball, os.path.join(path, '.locator'))
    # NOTE(review): py2 filter() returns a list; under py3 this len()
    # call would fail on a filter object.
    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory. Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    # Relative paths are taken under the job's tmpdir.
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Exclusive flock on <path>.lock serializes concurrent extraction
    # attempts on the same target directory.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    already_have_it = False
    # ".locator" symlink caches which archive was last extracted here.
    # NOTE(review): a try/except around this readlink appears to be
    # elided in this view -- confirm against the full source.
    if os.readlink(os.path.join(path, '.locator')) == zipball:
        already_have_it = True
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        os.unlink(os.path.join(path, '.locator'))
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))
        for f in CollectionReader(zipball).all_files():
            if not re.search('\.zip$', f.name()):
                raise errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # The zip file is first written locally, because unzip cannot
            # read its archive from a pipe.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            # NOTE(review): the unzip argument list is truncated in this
            # view (the option/archive/target arguments are elided).
            p = subprocess.Popen(["unzip",
                                  stdin=None, stderr=sys.stderr,
                                  shell=False, close_fds=True)
            if p.returncode != 0:
                raise errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            # Remove the local copy of the archive after extraction.
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    # NOTE(review): py2 filter() returns a list; under py3 this len()
    # call would fail on a filter object.
    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory. Return the absolute path where the collection was

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    """
    # NOTE(review): mutable default argument files=[]; it is only read
    # here, never mutated, so this is safe -- but fragile.
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    # Cache key: the bare hash when the locator parses, otherwise an md5
    # of the whole string.  NOTE(review): the if/else between these two
    # assignments appears to be elided in this view.
    collection_hash = matches.group(1)
    collection_hash = hashlib.md5(collection).hexdigest()
    # Relative paths are taken under the job's tmpdir.
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Exclusive flock on <path>.lock serializes concurrent extractions.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    already_have_it = False
    # ".locator" symlink caches which collection was last extracted here.
    # NOTE(review): a try/except around this readlink appears to be
    # elided in this view -- confirm against the full source.
    if os.readlink(os.path.join(path, '.locator')) == collection_hash:
        already_have_it = True
    # emulate "rm -f" (i.e., if the file does not exist, we win)
    os.unlink(os.path.join(path, '.locator'))
    if os.path.exists(os.path.join(path, '.locator')):
        os.unlink(os.path.join(path, '.locator'))
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            # Extract a file when either no explicit file list was given,
            # or its (possibly decompressed) name is on the list and not
            # yet fetched.  NOTE(review): the opening "if (files == [] or"
            # of this condition appears to be elided in this view.
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                # Ensure the stream's subdirectory exists before writing.
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
    # Fail loudly if any requested file was not present in the collection.
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
def mkdir_dash_p(path):
    """Create directory *path* and any missing parents, like `mkdir -p`.

    Succeeds silently if the directory already exists (including when
    another process creates it concurrently).
    """
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
                pass
            else:
                raise
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory. Return the absolute path where the stream was

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    """
    # NOTE(review): mutable default argument files=[]; it is only read
    # here, never mutated, so this is safe -- but fragile.
    # Relative paths are taken under the job's tmpdir.
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Exclusive flock on <path>.lock serializes concurrent extractions.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    for f in stream.all_files():
        # Extract a file when either no explicit file list was given, or
        # its (possibly decompressed) name is on the list and not yet
        # fetched.  NOTE(review): the opening "if (files == [] or" of
        # this condition appears to be elided in this view.
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            # Overwrite any stale copy of this file.
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
    # Fail loudly if any requested file was not present in the stream.
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
def listdir_recursive(dirname, base=None):
    """Return a sorted list of the files under dirname, recursively.

    Arguments:
    dirname -- directory to walk
    base -- relative path prefix used internally during recursion;
            callers normally omit it.

    Returned paths are relative to dirname.  Directories themselves are
    not listed, only the files inside them.
    """
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path):
            allfiles += listdir_recursive(ent_path, ent_base)
        else:
            # Only non-directories are recorded as files.
            allfiles += [ent_base]
    return allfiles
def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a string of hexadecimal digits.
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        # Use the SDK's errors module, consistent with the rest of this
        # file; a bare ArgumentError is not a builtin and would surface
        # as a NameError instead of the intended exception.
        raise errors.ArgumentError("is_hex accepts up to 3 arguments ({} given)".
                                   format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        # No length constraint given: any number of digits is acceptable.
        good_len = True
    return bool(good_len and HEX_RE.match(s))
def list_all(fn, num_retries=0, **kwargs):
    # Page through an Arvados "list" API method `fn`, advancing `offset`
    # after each response until every available item has been fetched.
    # NOTE(review): the initializations of `items` and `offset`, and the
    # final accumulation/return, are not visible in this view -- confirm
    # against the full source.
    items_available = sys.maxint  # Python 2 only; "unbounded" until the first response reports the real total
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items_available = c['items_available']
        # Continue from wherever the server says this page ended.
        offset = c['offset'] + len(c['items'])