3351: Do not use resume cache at all if --no-resume flag is given.
[arvados.git] / sdk / python / arvados / util.py
1 import fcntl
2 import hashlib
3 import os
4 import re
5 import subprocess
6 import errno
7 import sys
8 from arvados.collection import *
9
10 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
11
12 def clear_tmpdir(path=None):
13     """
14     Ensure the given directory (or TASK_TMPDIR if none given)
15     exists and is empty.
16     """
17     if path == None:
18         path = arvados.current_task().tmpdir
19     if os.path.exists(path):
20         p = subprocess.Popen(['rm', '-rf', path])
21         stdout, stderr = p.communicate(None)
22         if p.returncode != 0:
23             raise Exception('rm -rf %s: %s' % (path, stderr))
24     os.mkdir(path)
25
26 def run_command(execargs, **kwargs):
27     kwargs.setdefault('stdin', subprocess.PIPE)
28     kwargs.setdefault('stdout', subprocess.PIPE)
29     kwargs.setdefault('stderr', sys.stderr)
30     kwargs.setdefault('close_fds', True)
31     kwargs.setdefault('shell', False)
32     p = subprocess.Popen(execargs, **kwargs)
33     if kwargs['stdout'] == subprocess.PIPE:
34         stdoutdata, stderrdata = p.communicate(None)
35     else:
36         p.wait()
37     if p.returncode != 0:
38         raise errors.CommandFailedError(
39             "run_command %s exit %d:\n%s" %
40             (execargs, p.returncode, stderrdata))
41     return stdoutdata, stderrdata
42
43 def git_checkout(url, version, path):
44     if not re.search('^/', path):
45         path = os.path.join(arvados.current_job().tmpdir, path)
46     if not os.path.exists(path):
47         run_command(["git", "clone", url, path],
48                     cwd=os.path.dirname(path))
49     run_command(["git", "checkout", version],
50                 cwd=path)
51     return path
52
53 def tar_extractor(path, decompress_flag):
54     return subprocess.Popen(["tar",
55                              "-C", path,
56                              ("-x%sf" % decompress_flag),
57                              "-"],
58                             stdout=None,
59                             stdin=subprocess.PIPE, stderr=sys.stderr,
60                             shell=False, close_fds=True)
61
62 def tarball_extract(tarball, path):
63     """Retrieve a tarball from Keep and extract it to a local
64     directory.  Return the absolute path where the tarball was
65     extracted. If the top level of the tarball contained just one
66     file or directory, return the absolute path of that single
67     item.
68
69     tarball -- collection locator
70     path -- where to extract the tarball: absolute, or relative to job tmp
71     """
72     if not re.search('^/', path):
73         path = os.path.join(arvados.current_job().tmpdir, path)
74     lockfile = open(path + '.lock', 'w')
75     fcntl.flock(lockfile, fcntl.LOCK_EX)
76     try:
77         os.stat(path)
78     except OSError:
79         os.mkdir(path)
80     already_have_it = False
81     try:
82         if os.readlink(os.path.join(path, '.locator')) == tarball:
83             already_have_it = True
84     except OSError:
85         pass
86     if not already_have_it:
87
88         # emulate "rm -f" (i.e., if the file does not exist, we win)
89         try:
90             os.unlink(os.path.join(path, '.locator'))
91         except OSError:
92             if os.path.exists(os.path.join(path, '.locator')):
93                 os.unlink(os.path.join(path, '.locator'))
94
95         for f in CollectionReader(tarball).all_files():
96             if re.search('\.(tbz|tar.bz2)$', f.name()):
97                 p = tar_extractor(path, 'j')
98             elif re.search('\.(tgz|tar.gz)$', f.name()):
99                 p = tar_extractor(path, 'z')
100             elif re.search('\.tar$', f.name()):
101                 p = tar_extractor(path, '')
102             else:
103                 raise errors.AssertionError(
104                     "tarball_extract cannot handle filename %s" % f.name())
105             while True:
106                 buf = f.read(2**20)
107                 if len(buf) == 0:
108                     break
109                 p.stdin.write(buf)
110             p.stdin.close()
111             p.wait()
112             if p.returncode != 0:
113                 lockfile.close()
114                 raise errors.CommandFailedError(
115                     "tar exited %d" % p.returncode)
116         os.symlink(tarball, os.path.join(path, '.locator'))
117     tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
118     lockfile.close()
119     if len(tld_extracts) == 1:
120         return os.path.join(path, tld_extracts[0])
121     return path
122
123 def zipball_extract(zipball, path):
124     """Retrieve a zip archive from Keep and extract it to a local
125     directory.  Return the absolute path where the archive was
126     extracted. If the top level of the archive contained just one
127     file or directory, return the absolute path of that single
128     item.
129
130     zipball -- collection locator
131     path -- where to extract the archive: absolute, or relative to job tmp
132     """
133     if not re.search('^/', path):
134         path = os.path.join(arvados.current_job().tmpdir, path)
135     lockfile = open(path + '.lock', 'w')
136     fcntl.flock(lockfile, fcntl.LOCK_EX)
137     try:
138         os.stat(path)
139     except OSError:
140         os.mkdir(path)
141     already_have_it = False
142     try:
143         if os.readlink(os.path.join(path, '.locator')) == zipball:
144             already_have_it = True
145     except OSError:
146         pass
147     if not already_have_it:
148
149         # emulate "rm -f" (i.e., if the file does not exist, we win)
150         try:
151             os.unlink(os.path.join(path, '.locator'))
152         except OSError:
153             if os.path.exists(os.path.join(path, '.locator')):
154                 os.unlink(os.path.join(path, '.locator'))
155
156         for f in CollectionReader(zipball).all_files():
157             if not re.search('\.zip$', f.name()):
158                 raise errors.NotImplementedError(
159                     "zipball_extract cannot handle filename %s" % f.name())
160             zip_filename = os.path.join(path, os.path.basename(f.name()))
161             zip_file = open(zip_filename, 'wb')
162             while True:
163                 buf = f.read(2**20)
164                 if len(buf) == 0:
165                     break
166                 zip_file.write(buf)
167             zip_file.close()
168
169             p = subprocess.Popen(["unzip",
170                                   "-q", "-o",
171                                   "-d", path,
172                                   zip_filename],
173                                  stdout=None,
174                                  stdin=None, stderr=sys.stderr,
175                                  shell=False, close_fds=True)
176             p.wait()
177             if p.returncode != 0:
178                 lockfile.close()
179                 raise errors.CommandFailedError(
180                     "unzip exited %d" % p.returncode)
181             os.unlink(zip_filename)
182         os.symlink(zipball, os.path.join(path, '.locator'))
183     tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
184     lockfile.close()
185     if len(tld_extracts) == 1:
186         return os.path.join(path, tld_extracts[0])
187     return path
188
189 def collection_extract(collection, path, files=[], decompress=True):
190     """Retrieve a collection from Keep and extract it to a local
191     directory.  Return the absolute path where the collection was
192     extracted.
193
194     collection -- collection locator
195     path -- where to extract: absolute, or relative to job tmp
196     """
197     matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
198     if matches:
199         collection_hash = matches.group(1)
200     else:
201         collection_hash = hashlib.md5(collection).hexdigest()
202     if not re.search('^/', path):
203         path = os.path.join(arvados.current_job().tmpdir, path)
204     lockfile = open(path + '.lock', 'w')
205     fcntl.flock(lockfile, fcntl.LOCK_EX)
206     try:
207         os.stat(path)
208     except OSError:
209         os.mkdir(path)
210     already_have_it = False
211     try:
212         if os.readlink(os.path.join(path, '.locator')) == collection_hash:
213             already_have_it = True
214     except OSError:
215         pass
216
217     # emulate "rm -f" (i.e., if the file does not exist, we win)
218     try:
219         os.unlink(os.path.join(path, '.locator'))
220     except OSError:
221         if os.path.exists(os.path.join(path, '.locator')):
222             os.unlink(os.path.join(path, '.locator'))
223
224     files_got = []
225     for s in CollectionReader(collection).all_streams():
226         stream_name = s.name()
227         for f in s.all_files():
228             if (files == [] or
229                 ((f.name() not in files_got) and
230                  (f.name() in files or
231                   (decompress and f.decompressed_name() in files)))):
232                 outname = f.decompressed_name() if decompress else f.name()
233                 files_got += [outname]
234                 if os.path.exists(os.path.join(path, stream_name, outname)):
235                     continue
236                 mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
237                 outfile = open(os.path.join(path, stream_name, outname), 'wb')
238                 for buf in (f.readall_decompressed() if decompress
239                             else f.readall()):
240                     outfile.write(buf)
241                 outfile.close()
242     if len(files_got) < len(files):
243         raise errors.AssertionError(
244             "Wanted files %s but only got %s from %s" %
245             (files, files_got,
246              [z.name() for z in CollectionReader(collection).all_files()]))
247     os.symlink(collection_hash, os.path.join(path, '.locator'))
248
249     lockfile.close()
250     return path
251
252 def mkdir_dash_p(path):
253     if not os.path.isdir(path):
254         try:
255             os.makedirs(path)
256         except OSError as e:
257             if e.errno == errno.EEXIST and os.path.isdir(path):
258                 # It is not an error if someone else creates the
259                 # directory between our exists() and makedirs() calls.
260                 pass
261             else:
262                 raise
263
264 def stream_extract(stream, path, files=[], decompress=True):
265     """Retrieve a stream from Keep and extract it to a local
266     directory.  Return the absolute path where the stream was
267     extracted.
268
269     stream -- StreamReader object
270     path -- where to extract: absolute, or relative to job tmp
271     """
272     if not re.search('^/', path):
273         path = os.path.join(arvados.current_job().tmpdir, path)
274     lockfile = open(path + '.lock', 'w')
275     fcntl.flock(lockfile, fcntl.LOCK_EX)
276     try:
277         os.stat(path)
278     except OSError:
279         os.mkdir(path)
280
281     files_got = []
282     for f in stream.all_files():
283         if (files == [] or
284             ((f.name() not in files_got) and
285              (f.name() in files or
286               (decompress and f.decompressed_name() in files)))):
287             outname = f.decompressed_name() if decompress else f.name()
288             files_got += [outname]
289             if os.path.exists(os.path.join(path, outname)):
290                 os.unlink(os.path.join(path, outname))
291             mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
292             outfile = open(os.path.join(path, outname), 'wb')
293             for buf in (f.readall_decompressed() if decompress
294                         else f.readall()):
295                 outfile.write(buf)
296             outfile.close()
297     if len(files_got) < len(files):
298         raise errors.AssertionError(
299             "Wanted files %s but only got %s from %s" %
300             (files, files_got, [z.name() for z in stream.all_files()]))
301     lockfile.close()
302     return path
303
304 def listdir_recursive(dirname, base=None):
305     allfiles = []
306     for ent in sorted(os.listdir(dirname)):
307         ent_path = os.path.join(dirname, ent)
308         ent_base = os.path.join(base, ent) if base else ent
309         if os.path.isdir(ent_path):
310             allfiles += listdir_recursive(ent_path, ent_base)
311         else:
312             allfiles += [ent_base]
313     return allfiles
314
315 def is_hex(s, *length_args):
316     """is_hex(s[, length[, max_length]]) -> boolean
317
318     Return True if s is a string of hexadecimal digits.
319     If one length argument is given, the string must contain exactly
320     that number of digits.
321     If two length arguments are given, the string must contain a number of
322     digits between those two lengths, inclusive.
323     Return False otherwise.
324     """
325     num_length_args = len(length_args)
326     if num_length_args > 2:
327         raise ArgumentError("is_hex accepts up to 3 arguments ({} given)".
328                             format(1 + num_length_args))
329     elif num_length_args == 2:
330         good_len = (length_args[0] <= len(s) <= length_args[1])
331     elif num_length_args == 1:
332         good_len = (len(s) == length_args[0])
333     else:
334         good_len = True
335     return bool(good_len and HEX_RE.match(s))