2752: Implement CollectionWriter with a work queue.
[arvados.git] / sdk / python / arvados / util.py
1 import fcntl
2 import hashlib
3 import os
4 import re
5 import subprocess
6 import errno
7 import sys
8 from arvados.collection import *
9
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.

    path -- directory to reset; when None, the tmpdir attribute of
            arvados.current_task() is used.

    Raises Exception if "rm -rf" exits non-zero (the message carries
    rm's own stderr output), and OSError if the directory cannot be
    re-created.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    # lexists() also reports dangling symlinks, which exists() would
    # miss and which would then make the mkdir() below fail.
    if os.path.lexists(path):
        # "--" keeps a path that starts with "-" from being parsed as
        # an option; stderr is piped so a failure can be reported.
        p = subprocess.Popen(['rm', '-rf', '--', path],
                             stderr=subprocess.PIPE)
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)
23
def run_command(execargs, **kwargs):
    """Run a command to completion and return (stdoutdata, stderrdata).

    execargs -- argument passed straight to subprocess.Popen
    kwargs -- extra Popen options; any of stdin, stdout, stderr,
              close_fds and shell that the caller leaves out get the
              defaults listed below.

    Raises errors.CommandFailedError when the exit status is non-zero.
    """
    fallback_options = (
        ('stdin', subprocess.PIPE),
        ('stdout', subprocess.PIPE),
        ('stderr', sys.stderr),
        ('close_fds', True),
        ('shell', False),
    )
    for option, value in fallback_options:
        kwargs.setdefault(option, value)

    child = subprocess.Popen(execargs, **kwargs)
    captured_out, captured_err = child.communicate(None)
    if child.returncode == 0:
        return captured_out, captured_err
    raise errors.CommandFailedError(
        "run_command %s exit %d:\n%s" %
        (execargs, child.returncode, captured_err))
37
def git_checkout(url, version, path):
    """Clone a git repository if needed, check out a version, and
    return the working directory path.

    url -- repository to clone when path does not exist yet
    version -- argument handed to "git checkout"
    path -- working directory: absolute, or relative to job tmp
    """
    is_absolute = re.search('^/', path)
    if not is_absolute:
        path = os.path.join(arvados.current_job().tmpdir, path)

    # Only clone when nothing exists at the destination yet.
    needs_clone = not os.path.exists(path)
    if needs_clone:
        clone_argv = ["git", "clone", url, path]
        run_command(clone_argv, cwd=os.path.dirname(path))

    checkout_argv = ["git", "checkout", version]
    run_command(checkout_argv, cwd=path)
    return path
47
def tar_extractor(path, decompress_flag):
    """Start a "tar" process that extracts an archive read from its
    stdin and return the Popen object.

    path -- directory handed to tar's -C option
    decompress_flag -- letter(s) spliced between "-x" and "f", e.g.
                       'j', 'z', or '' for no decompression

    The caller writes the archive to the returned process's stdin,
    closes it, and waits for the process.
    """
    extract_option = "-x%sf" % decompress_flag
    argv = ["tar", "-C", path, extract_option, "-"]
    popen_options = dict(
        stdout=None,
        stdin=subprocess.PIPE,
        stderr=sys.stderr,
        shell=False,
        close_fds=True,
    )
    return subprocess.Popen(argv, **popen_options)
56
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp

    Raises errors.AssertionError for a file whose name does not end in
    .tbz, .tar.bz2, .tgz, .tar.gz or .tar, and
    errors.CommandFailedError when tar exits non-zero.
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    locator_path = os.path.join(path, '.locator')
    lockfile = open(path + '.lock', 'w')
    # The lock is held for the whole extraction; try/finally releases
    # it on every exit path, including unexpected exceptions.
    try:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        # The .locator symlink records which tarball was last
        # extracted here; a match means the work is already done.
        already_have_it = False
        try:
            if os.readlink(locator_path) == tarball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(locator_path)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    raise

            for f in CollectionReader(tarball).all_files():
                name = f.name()
                if re.search(r'\.(tbz|tar\.bz2)$', name):
                    decompress_flag = 'j'
                elif re.search(r'\.(tgz|tar\.gz)$', name):
                    decompress_flag = 'z'
                elif re.search(r'\.tar$', name):
                    decompress_flag = ''
                else:
                    raise errors.AssertionError(
                        "tarball_extract cannot handle filename %s" % name)
                p = tar_extractor(path, decompress_flag)
                # Stream the archive into tar in 1 MiB pieces.
                while True:
                    buf = f.read(2**20)
                    if not buf:
                        break
                    p.stdin.write(buf)
                p.stdin.close()
                p.wait()
                if p.returncode != 0:
                    raise errors.CommandFailedError(
                        "tar exited %d" % p.returncode)
            os.symlink(tarball, locator_path)
        # A real list (not filter()) so len() and indexing also work on
        # Python 3, where filter() returns an iterator.
        tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    finally:
        lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
117
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp

    Raises errors.NotImplementedError for a file whose name does not
    end in .zip, and errors.CommandFailedError when unzip exits
    non-zero.
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    locator_path = os.path.join(path, '.locator')
    lockfile = open(path + '.lock', 'w')
    # The lock is held for the whole extraction; try/finally releases
    # it on every exit path, including unexpected exceptions.
    try:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        # The .locator symlink records which archive was last
        # extracted here; a match means the work is already done.
        already_have_it = False
        try:
            if os.readlink(locator_path) == zipball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(locator_path)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    raise

            for f in CollectionReader(zipball).all_files():
                if not re.search(r'\.zip$', f.name()):
                    raise errors.NotImplementedError(
                        "zipball_extract cannot handle filename %s" % f.name())
                # unzip needs a seekable file, so spool the archive to
                # disk (1 MiB at a time) before extracting it.
                zip_filename = os.path.join(path, os.path.basename(f.name()))
                with open(zip_filename, 'wb') as zip_file:
                    while True:
                        buf = f.read(2**20)
                        if not buf:
                            break
                        zip_file.write(buf)

                p = subprocess.Popen(["unzip",
                                      "-q", "-o",
                                      "-d", path,
                                      zip_filename],
                                     stdout=None,
                                     stdin=None, stderr=sys.stderr,
                                     shell=False, close_fds=True)
                p.wait()
                if p.returncode != 0:
                    raise errors.CommandFailedError(
                        "unzip exited %d" % p.returncode)
                os.unlink(zip_filename)
            os.symlink(zipball, locator_path)
        # A real list (not filter()) so len() and indexing also work on
        # Python 3, where filter() returns an iterator.
        tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    finally:
        lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
183
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    files -- names of the files to extract; an empty list means every
             file in the collection
    decompress -- when true, files are written under their
                  decompressed_name() using readall_decompressed()

    Files that already exist at the destination are left untouched.

    Raises errors.AssertionError when fewer files were matched than
    were requested in files.
    """
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        # Not a bare locator (presumably manifest text -- confirm with
        # callers): identify it by its MD5.  hashlib needs bytes on
        # Python 3, so text is encoded first.
        hash_input = collection
        if not isinstance(hash_input, bytes):
            hash_input = hash_input.encode('utf-8')
        collection_hash = hashlib.md5(hash_input).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    locator_path = os.path.join(path, '.locator')
    lockfile = open(path + '.lock', 'w')
    # The lock is held for the whole extraction; try/finally releases
    # it on every exit path, including unexpected exceptions.
    try:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(locator_path)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

        files_got = []
        for s in CollectionReader(collection).all_streams():
            stream_name = s.name()
            for f in s.all_files():
                if (files == [] or
                    ((f.name() not in files_got) and
                     (f.name() in files or
                      (decompress and f.decompressed_name() in files)))):
                    outname = f.decompressed_name() if decompress else f.name()
                    files_got += [outname]
                    outpath = os.path.join(path, stream_name, outname)
                    if os.path.exists(outpath):
                        continue
                    mkdir_dash_p(os.path.dirname(outpath))
                    # "with" closes the output file even if a read or
                    # write fails part-way through.
                    with open(outpath, 'wb') as outfile:
                        for buf in (f.readall_decompressed() if decompress
                                    else f.readall()):
                            outfile.write(buf)
        if len(files_got) < len(files):
            raise errors.AssertionError(
                "Wanted files %s but only got %s from %s" %
                (files, files_got,
                 [z.name() for z in CollectionReader(collection).all_files()]))
        # Only recorded after a complete, successful extraction.
        os.symlink(collection_hash, locator_path)
    finally:
        lockfile.close()
    return path
246
def mkdir_dash_p(path):
    """Create the directory path, including missing parents, like
    "mkdir -p".  Do nothing if it is already a directory.

    Raises OSError if creation fails for any reason other than the
    directory having appeared in the meantime.
    """
    if os.path.isdir(path):
        return
    try:
        os.makedirs(path)
    except OSError as err:
        # Losing a race with another creator is fine: the directory
        # exists now, which is all that was asked for.
        created_meanwhile = (err.errno == errno.EEXIST and
                             os.path.isdir(path))
        if not created_meanwhile:
            raise
258
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    files -- names of the files to extract; an empty list means every
             file in the stream
    decompress -- when true, files are written under their
                  decompressed_name() using readall_decompressed()

    Files that already exist at the destination are replaced.

    Raises errors.AssertionError when fewer files were matched than
    were requested in files.
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    lockfile = open(path + '.lock', 'w')
    # The lock is held for the whole extraction; try/finally releases
    # it on every exit path, including unexpected exceptions.
    try:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)

        files_got = []
        for f in stream.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                outpath = os.path.join(path, outname)
                if os.path.exists(outpath):
                    os.unlink(outpath)
                mkdir_dash_p(os.path.dirname(outpath))
                # "with" closes the output file even if a read or
                # write fails part-way through.
                with open(outpath, 'wb') as outfile:
                    for buf in (f.readall_decompressed() if decompress
                                else f.readall()):
                        outfile.write(buf)
        if len(files_got) < len(files):
            raise errors.AssertionError(
                "Wanted files %s but only got %s from %s" %
                (files, files_got, [z.name() for z in stream.all_files()]))
    finally:
        lockfile.close()
    return path
298
def listdir_recursive(dirname, base=None):
    """Return the non-directory entries found under dirname, as paths
    relative to dirname (prefixed with base when given).

    dirname -- directory to walk
    base -- optional prefix joined onto every returned path

    Entries are visited in sorted order at each level, and a
    directory's contents appear at the position of the directory's own
    name.  Directories themselves are never listed.
    """
    found = []
    for entry in sorted(os.listdir(dirname)):
        full_path = os.path.join(dirname, entry)
        relative = entry if not base else os.path.join(base, entry)
        if not os.path.isdir(full_path):
            found.append(relative)
            continue
        found.extend(listdir_recursive(full_path, relative))
    return found