Merge branch '2934-limit-crunch-logs' of git.curoverse.com:arvados into 2934-limit...
[arvados.git] / sdk / python / arvados / util.py
1 import fcntl
2 import hashlib
3 import os
4 import re
5 import subprocess
6 import errno
7 import sys
8 from arvados.collection import *
9
10 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
11
12 def clear_tmpdir(path=None):
13     """
14     Ensure the given directory (or TASK_TMPDIR if none given)
15     exists and is empty.
16     """
17     if path == None:
18         path = arvados.current_task().tmpdir
19     if os.path.exists(path):
20         p = subprocess.Popen(['rm', '-rf', path])
21         stdout, stderr = p.communicate(None)
22         if p.returncode != 0:
23             raise Exception('rm -rf %s: %s' % (path, stderr))
24     os.mkdir(path)
25
26 def run_command(execargs, **kwargs):
27     kwargs.setdefault('stdin', subprocess.PIPE)
28     kwargs.setdefault('stdout', subprocess.PIPE)
29     kwargs.setdefault('stderr', sys.stderr)
30     kwargs.setdefault('close_fds', True)
31     kwargs.setdefault('shell', False)
32     p = subprocess.Popen(execargs, **kwargs)
33     stdoutdata, stderrdata = p.communicate(None)
34     if p.returncode != 0:
35         raise errors.CommandFailedError(
36             "run_command %s exit %d:\n%s" %
37             (execargs, p.returncode, stderrdata))
38     return stdoutdata, stderrdata
39
40 def git_checkout(url, version, path):
41     if not re.search('^/', path):
42         path = os.path.join(arvados.current_job().tmpdir, path)
43     if not os.path.exists(path):
44         run_command(["git", "clone", url, path],
45                     cwd=os.path.dirname(path))
46     run_command(["git", "checkout", version],
47                 cwd=path)
48     return path
49
50 def tar_extractor(path, decompress_flag):
51     return subprocess.Popen(["tar",
52                              "-C", path,
53                              ("-x%sf" % decompress_flag),
54                              "-"],
55                             stdout=None,
56                             stdin=subprocess.PIPE, stderr=sys.stderr,
57                             shell=False, close_fds=True)
58
59 def tarball_extract(tarball, path):
60     """Retrieve a tarball from Keep and extract it to a local
61     directory.  Return the absolute path where the tarball was
62     extracted. If the top level of the tarball contained just one
63     file or directory, return the absolute path of that single
64     item.
65
66     tarball -- collection locator
67     path -- where to extract the tarball: absolute, or relative to job tmp
68     """
69     if not re.search('^/', path):
70         path = os.path.join(arvados.current_job().tmpdir, path)
71     lockfile = open(path + '.lock', 'w')
72     fcntl.flock(lockfile, fcntl.LOCK_EX)
73     try:
74         os.stat(path)
75     except OSError:
76         os.mkdir(path)
77     already_have_it = False
78     try:
79         if os.readlink(os.path.join(path, '.locator')) == tarball:
80             already_have_it = True
81     except OSError:
82         pass
83     if not already_have_it:
84
85         # emulate "rm -f" (i.e., if the file does not exist, we win)
86         try:
87             os.unlink(os.path.join(path, '.locator'))
88         except OSError:
89             if os.path.exists(os.path.join(path, '.locator')):
90                 os.unlink(os.path.join(path, '.locator'))
91
92         for f in CollectionReader(tarball).all_files():
93             if re.search('\.(tbz|tar.bz2)$', f.name()):
94                 p = tar_extractor(path, 'j')
95             elif re.search('\.(tgz|tar.gz)$', f.name()):
96                 p = tar_extractor(path, 'z')
97             elif re.search('\.tar$', f.name()):
98                 p = tar_extractor(path, '')
99             else:
100                 raise errors.AssertionError(
101                     "tarball_extract cannot handle filename %s" % f.name())
102             while True:
103                 buf = f.read(2**20)
104                 if len(buf) == 0:
105                     break
106                 p.stdin.write(buf)
107             p.stdin.close()
108             p.wait()
109             if p.returncode != 0:
110                 lockfile.close()
111                 raise errors.CommandFailedError(
112                     "tar exited %d" % p.returncode)
113         os.symlink(tarball, os.path.join(path, '.locator'))
114     tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
115     lockfile.close()
116     if len(tld_extracts) == 1:
117         return os.path.join(path, tld_extracts[0])
118     return path
119
120 def zipball_extract(zipball, path):
121     """Retrieve a zip archive from Keep and extract it to a local
122     directory.  Return the absolute path where the archive was
123     extracted. If the top level of the archive contained just one
124     file or directory, return the absolute path of that single
125     item.
126
127     zipball -- collection locator
128     path -- where to extract the archive: absolute, or relative to job tmp
129     """
130     if not re.search('^/', path):
131         path = os.path.join(arvados.current_job().tmpdir, path)
132     lockfile = open(path + '.lock', 'w')
133     fcntl.flock(lockfile, fcntl.LOCK_EX)
134     try:
135         os.stat(path)
136     except OSError:
137         os.mkdir(path)
138     already_have_it = False
139     try:
140         if os.readlink(os.path.join(path, '.locator')) == zipball:
141             already_have_it = True
142     except OSError:
143         pass
144     if not already_have_it:
145
146         # emulate "rm -f" (i.e., if the file does not exist, we win)
147         try:
148             os.unlink(os.path.join(path, '.locator'))
149         except OSError:
150             if os.path.exists(os.path.join(path, '.locator')):
151                 os.unlink(os.path.join(path, '.locator'))
152
153         for f in CollectionReader(zipball).all_files():
154             if not re.search('\.zip$', f.name()):
155                 raise errors.NotImplementedError(
156                     "zipball_extract cannot handle filename %s" % f.name())
157             zip_filename = os.path.join(path, os.path.basename(f.name()))
158             zip_file = open(zip_filename, 'wb')
159             while True:
160                 buf = f.read(2**20)
161                 if len(buf) == 0:
162                     break
163                 zip_file.write(buf)
164             zip_file.close()
165             
166             p = subprocess.Popen(["unzip",
167                                   "-q", "-o",
168                                   "-d", path,
169                                   zip_filename],
170                                  stdout=None,
171                                  stdin=None, stderr=sys.stderr,
172                                  shell=False, close_fds=True)
173             p.wait()
174             if p.returncode != 0:
175                 lockfile.close()
176                 raise errors.CommandFailedError(
177                     "unzip exited %d" % p.returncode)
178             os.unlink(zip_filename)
179         os.symlink(zipball, os.path.join(path, '.locator'))
180     tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
181     lockfile.close()
182     if len(tld_extracts) == 1:
183         return os.path.join(path, tld_extracts[0])
184     return path
185
186 def collection_extract(collection, path, files=[], decompress=True):
187     """Retrieve a collection from Keep and extract it to a local
188     directory.  Return the absolute path where the collection was
189     extracted.
190
191     collection -- collection locator
192     path -- where to extract: absolute, or relative to job tmp
193     """
194     matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
195     if matches:
196         collection_hash = matches.group(1)
197     else:
198         collection_hash = hashlib.md5(collection).hexdigest()
199     if not re.search('^/', path):
200         path = os.path.join(arvados.current_job().tmpdir, path)
201     lockfile = open(path + '.lock', 'w')
202     fcntl.flock(lockfile, fcntl.LOCK_EX)
203     try:
204         os.stat(path)
205     except OSError:
206         os.mkdir(path)
207     already_have_it = False
208     try:
209         if os.readlink(os.path.join(path, '.locator')) == collection_hash:
210             already_have_it = True
211     except OSError:
212         pass
213
214     # emulate "rm -f" (i.e., if the file does not exist, we win)
215     try:
216         os.unlink(os.path.join(path, '.locator'))
217     except OSError:
218         if os.path.exists(os.path.join(path, '.locator')):
219             os.unlink(os.path.join(path, '.locator'))
220
221     files_got = []
222     for s in CollectionReader(collection).all_streams():
223         stream_name = s.name()
224         for f in s.all_files():
225             if (files == [] or
226                 ((f.name() not in files_got) and
227                  (f.name() in files or
228                   (decompress and f.decompressed_name() in files)))):
229                 outname = f.decompressed_name() if decompress else f.name()
230                 files_got += [outname]
231                 if os.path.exists(os.path.join(path, stream_name, outname)):
232                     continue
233                 mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
234                 outfile = open(os.path.join(path, stream_name, outname), 'wb')
235                 for buf in (f.readall_decompressed() if decompress
236                             else f.readall()):
237                     outfile.write(buf)
238                 outfile.close()
239     if len(files_got) < len(files):
240         raise errors.AssertionError(
241             "Wanted files %s but only got %s from %s" %
242             (files, files_got,
243              [z.name() for z in CollectionReader(collection).all_files()]))
244     os.symlink(collection_hash, os.path.join(path, '.locator'))
245
246     lockfile.close()
247     return path
248
249 def mkdir_dash_p(path):
250     if not os.path.isdir(path):
251         try:
252             os.makedirs(path)
253         except OSError as e:
254             if e.errno == errno.EEXIST and os.path.isdir(path):
255                 # It is not an error if someone else creates the
256                 # directory between our exists() and makedirs() calls.
257                 pass
258             else:
259                 raise
260
261 def stream_extract(stream, path, files=[], decompress=True):
262     """Retrieve a stream from Keep and extract it to a local
263     directory.  Return the absolute path where the stream was
264     extracted.
265
266     stream -- StreamReader object
267     path -- where to extract: absolute, or relative to job tmp
268     """
269     if not re.search('^/', path):
270         path = os.path.join(arvados.current_job().tmpdir, path)
271     lockfile = open(path + '.lock', 'w')
272     fcntl.flock(lockfile, fcntl.LOCK_EX)
273     try:
274         os.stat(path)
275     except OSError:
276         os.mkdir(path)
277
278     files_got = []
279     for f in stream.all_files():
280         if (files == [] or
281             ((f.name() not in files_got) and
282              (f.name() in files or
283               (decompress and f.decompressed_name() in files)))):
284             outname = f.decompressed_name() if decompress else f.name()
285             files_got += [outname]
286             if os.path.exists(os.path.join(path, outname)):
287                 os.unlink(os.path.join(path, outname))
288             mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
289             outfile = open(os.path.join(path, outname), 'wb')
290             for buf in (f.readall_decompressed() if decompress
291                         else f.readall()):
292                 outfile.write(buf)
293             outfile.close()
294     if len(files_got) < len(files):
295         raise errors.AssertionError(
296             "Wanted files %s but only got %s from %s" %
297             (files, files_got, [z.name() for z in stream.all_files()]))
298     lockfile.close()
299     return path
300
301 def listdir_recursive(dirname, base=None):
302     allfiles = []
303     for ent in sorted(os.listdir(dirname)):
304         ent_path = os.path.join(dirname, ent)
305         ent_base = os.path.join(base, ent) if base else ent
306         if os.path.isdir(ent_path):
307             allfiles += listdir_recursive(ent_path, ent_base)
308         else:
309             allfiles += [ent_base]
310     return allfiles
311
312 def is_hex(s, *length_args):
313     """is_hex(s[, length[, max_length]]) -> boolean
314
315     Return True if s is a string of hexadecimal digits.
316     If one length argument is given, the string must contain exactly
317     that number of digits.
318     If two length arguments are given, the string must contain a number of
319     digits between those two lengths, inclusive.
320     Return False otherwise.
321     """
322     num_length_args = len(length_args)
323     if num_length_args > 2:
324         raise ArgumentError("is_hex accepts up to 3 arguments ({} given)".
325                             format(1 + num_length_args))
326     elif num_length_args == 2:
327         good_len = (length_args[0] <= len(s) <= length_args[1])
328     elif num_length_args == 1:
329         good_len = (len(s) == length_args[0])
330     else:
331         good_len = True
332     return bool(good_len and HEX_RE.match(s))