Retrieve manifest_text from API server. If that fails, emit a warning
[arvados.git] / sdk / python / arvados / util.py
import errno
import fcntl
import hashlib
import os
import re
import subprocess
import sys
7
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.

    path -- directory to clear; defaults to the current task's tmpdir.

    Raises Exception if the 'rm -rf' child process exits non-zero.
    """
    if path is None:
        path = current_task().tmpdir
    if os.path.exists(path):
        # Capture stderr so a failure message contains the child's
        # actual error output (previously stderr was not piped, so the
        # message always interpolated None).
        p = subprocess.Popen(['rm', '-rf', path],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)
21
def run_command(execargs, **kwargs):
    """Run a child process and return its (stdout, stderr) data.

    execargs -- argument list for the command (no shell involved)
    kwargs -- passed through to subprocess.Popen; defaults below apply
        only where the caller did not supply a value.

    Raises errors.CommandFailedError when the command exits non-zero.
    """
    defaults = {
        'stdin': subprocess.PIPE,
        'stdout': subprocess.PIPE,
        'stderr': sys.stderr,
        'close_fds': True,
        'shell': False,
    }
    for opt, value in defaults.items():
        kwargs.setdefault(opt, value)
    proc = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = proc.communicate(None)
    if proc.returncode != 0:
        raise errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, proc.returncode, stderrdata))
    return stdoutdata, stderrdata
35
def git_checkout(url, version, path):
    """Clone a git repository (if not already present) and check out
    a given version in it.  Return the absolute path of the working
    tree.

    url -- git repository URL to clone
    version -- commit-ish (branch, tag, or SHA) to check out
    path -- working tree location: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    if not os.path.exists(path):
        # run_command is defined in this same module; the old
        # 'util.run_command' self-reference is unnecessary and breaks
        # unless a 'util' name happens to be bound at module level.
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path
45
def tar_extractor(path, decompress_flag):
    """Start a tar child process that extracts an archive streamed to
    its stdin into the directory *path*.

    decompress_flag -- '' for a plain tar stream, 'j' for bzip2,
        'z' for gzip (interpolated into tar's -x?f option).

    Returns the subprocess.Popen object; callers write archive bytes
    to its .stdin and close it to finish extraction.
    """
    tar_args = ["tar",
                "-C", path,
                ("-x%sf" % decompress_flag),
                "-"]
    return subprocess.Popen(tar_args,
                            stdin=subprocess.PIPE,
                            stdout=None,
                            stderr=sys.stderr,
                            shell=False,
                            close_fds=True)
54
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize extraction: concurrent callers sharing this path block
    # on an exclusive flock of '<path>.lock'.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        # The '.locator' symlink records which tarball was last
        # extracted here; if it matches, skip re-extraction entirely.
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            # Choose tar's decompression flag from the file extension.
            if re.search('\.(tbz|tar.bz2)$', f.name()):
                p = util.tar_extractor(path, 'j')
            elif re.search('\.(tgz|tar.gz)$', f.name()):
                p = util.tar_extractor(path, 'z')
            elif re.search('\.tar$', f.name()):
                p = util.tar_extractor(path, '')
            else:
                raise errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            while True:
                # Stream the archive into tar's stdin in 1 MiB chunks.
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    # NOTE(review): filter() returns an iterator on Python 3, so the
    # len() below would fail there -- this code assumes Python 2.
    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
115
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize extraction: concurrent callers sharing this path block
    # on an exclusive flock of '<path>.lock'.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        # The '.locator' symlink records which archive was last
        # extracted here; if it matches, skip re-extraction entirely.
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not re.search('\.zip$', f.name()):
                raise errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # Spool the archive to a local file first (unzip cannot read
            # from a pipe), then extract it and delete the spool file.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                # Copy from Keep in 1 MiB chunks.
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    # NOTE(review): filter() returns an iterator on Python 3, so the
    # len() below would fail there -- this code assumes Python 2.
    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
181
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    files -- if non-empty, extract only files with these names
    decompress -- write files decompressed, matching requested names
        against decompressed names as well
    """
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        # Not a bare locator; use a digest of the whole string as the
        # cache identity recorded in '.locator'.
        collection_hash = hashlib.md5(collection).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize extraction: concurrent callers sharing this path block
    # on an exclusive flock of '<path>.lock'.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass
    # NOTE(review): unlike tarball_extract/zipball_extract, the value of
    # already_have_it is never consulted below -- every call re-checks
    # files individually. Possibly intentional, possibly an oversight.

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            # Extract f when no explicit file list was given, or when f
            # (or its decompressed name) was requested and not yet seen.
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    # Already extracted on a previous run; skip it.
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))

    lockfile.close()
    return path
244
def mkdir_dash_p(path):
    """Create *path* and any missing parent directories, like
    `mkdir -p`.  Doing nothing if the directory already exists.
    """
    if os.path.isdir(path):
        return
    try:
        os.makedirs(path)
    except OSError as error:
        # It is not an error if someone else creates the directory
        # between our isdir() check and the makedirs() call.
        if error.errno != errno.EEXIST or not os.path.isdir(path):
            raise
256
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    files -- if non-empty, extract only files with these names
    decompress -- write files decompressed, matching requested names
        against decompressed names as well

    Raises errors.AssertionError if fewer files than requested were found.
    """
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize extraction: concurrent callers sharing this path block
    # on an exclusive flock of '<path>.lock'.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        # Extract f when no explicit file list was given, or when f
        # (or its decompressed name) was requested and not yet seen.
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            # mkdir_dash_p is defined in this same module; the old
            # 'util.mkdir_dash_p' self-reference is unnecessary and
            # breaks unless a 'util' name is bound at module level.
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path
296
def listdir_recursive(dirname, base=None):
    """Return a sorted list of the files under *dirname*, recursing
    into subdirectories.

    dirname -- directory to walk
    base -- path prefix joined onto each result (used internally for
        recursion; results are relative to the original dirname)

    Directories themselves are not listed, only the files they contain.
    """
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path):
            # Recurse directly; the old 'util.listdir_recursive'
            # self-reference relied on a module-level 'util' name that
            # may not be bound inside util.py itself.
            allfiles += listdir_recursive(ent_path, ent_base)
        else:
            allfiles += [ent_base]
    return allfiles