7593: Add arvados-cwl-runner to disambiguate if there is a conflict over what
[arvados.git] / sdk / python / arvados / util.py
1 import fcntl
2 import hashlib
3 import httplib2
4 import os
5 import re
6 import subprocess
7 import errno
8 import sys
9
10 import arvados
11 from arvados.collection import CollectionReader
12
13 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
14
15 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
16 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
17 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
18 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
19 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
20 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
21 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
22 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
23 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
24
25 def clear_tmpdir(path=None):
26     """
27     Ensure the given directory (or TASK_TMPDIR if none given)
28     exists and is empty.
29     """
30     if path is None:
31         path = arvados.current_task().tmpdir
32     if os.path.exists(path):
33         p = subprocess.Popen(['rm', '-rf', path])
34         stdout, stderr = p.communicate(None)
35         if p.returncode != 0:
36             raise Exception('rm -rf %s: %s' % (path, stderr))
37     os.mkdir(path)
38
39 def run_command(execargs, **kwargs):
40     kwargs.setdefault('stdin', subprocess.PIPE)
41     kwargs.setdefault('stdout', subprocess.PIPE)
42     kwargs.setdefault('stderr', sys.stderr)
43     kwargs.setdefault('close_fds', True)
44     kwargs.setdefault('shell', False)
45     p = subprocess.Popen(execargs, **kwargs)
46     stdoutdata, stderrdata = p.communicate(None)
47     if p.returncode != 0:
48         raise arvados.errors.CommandFailedError(
49             "run_command %s exit %d:\n%s" %
50             (execargs, p.returncode, stderrdata))
51     return stdoutdata, stderrdata
52
53 def git_checkout(url, version, path):
54     if not re.search('^/', path):
55         path = os.path.join(arvados.current_job().tmpdir, path)
56     if not os.path.exists(path):
57         run_command(["git", "clone", url, path],
58                     cwd=os.path.dirname(path))
59     run_command(["git", "checkout", version],
60                 cwd=path)
61     return path
62
63 def tar_extractor(path, decompress_flag):
64     return subprocess.Popen(["tar",
65                              "-C", path,
66                              ("-x%sf" % decompress_flag),
67                              "-"],
68                             stdout=None,
69                             stdin=subprocess.PIPE, stderr=sys.stderr,
70                             shell=False, close_fds=True)
71
72 def tarball_extract(tarball, path):
73     """Retrieve a tarball from Keep and extract it to a local
74     directory.  Return the absolute path where the tarball was
75     extracted. If the top level of the tarball contained just one
76     file or directory, return the absolute path of that single
77     item.
78
79     tarball -- collection locator
80     path -- where to extract the tarball: absolute, or relative to job tmp
81     """
82     if not re.search('^/', path):
83         path = os.path.join(arvados.current_job().tmpdir, path)
84     lockfile = open(path + '.lock', 'w')
85     fcntl.flock(lockfile, fcntl.LOCK_EX)
86     try:
87         os.stat(path)
88     except OSError:
89         os.mkdir(path)
90     already_have_it = False
91     try:
92         if os.readlink(os.path.join(path, '.locator')) == tarball:
93             already_have_it = True
94     except OSError:
95         pass
96     if not already_have_it:
97
98         # emulate "rm -f" (i.e., if the file does not exist, we win)
99         try:
100             os.unlink(os.path.join(path, '.locator'))
101         except OSError:
102             if os.path.exists(os.path.join(path, '.locator')):
103                 os.unlink(os.path.join(path, '.locator'))
104
105         for f in CollectionReader(tarball).all_files():
106             if re.search('\.(tbz|tar.bz2)$', f.name()):
107                 p = tar_extractor(path, 'j')
108             elif re.search('\.(tgz|tar.gz)$', f.name()):
109                 p = tar_extractor(path, 'z')
110             elif re.search('\.tar$', f.name()):
111                 p = tar_extractor(path, '')
112             else:
113                 raise arvados.errors.AssertionError(
114                     "tarball_extract cannot handle filename %s" % f.name())
115             while True:
116                 buf = f.read(2**20)
117                 if len(buf) == 0:
118                     break
119                 p.stdin.write(buf)
120             p.stdin.close()
121             p.wait()
122             if p.returncode != 0:
123                 lockfile.close()
124                 raise arvados.errors.CommandFailedError(
125                     "tar exited %d" % p.returncode)
126         os.symlink(tarball, os.path.join(path, '.locator'))
127     tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
128     lockfile.close()
129     if len(tld_extracts) == 1:
130         return os.path.join(path, tld_extracts[0])
131     return path
132
133 def zipball_extract(zipball, path):
134     """Retrieve a zip archive from Keep and extract it to a local
135     directory.  Return the absolute path where the archive was
136     extracted. If the top level of the archive contained just one
137     file or directory, return the absolute path of that single
138     item.
139
140     zipball -- collection locator
141     path -- where to extract the archive: absolute, or relative to job tmp
142     """
143     if not re.search('^/', path):
144         path = os.path.join(arvados.current_job().tmpdir, path)
145     lockfile = open(path + '.lock', 'w')
146     fcntl.flock(lockfile, fcntl.LOCK_EX)
147     try:
148         os.stat(path)
149     except OSError:
150         os.mkdir(path)
151     already_have_it = False
152     try:
153         if os.readlink(os.path.join(path, '.locator')) == zipball:
154             already_have_it = True
155     except OSError:
156         pass
157     if not already_have_it:
158
159         # emulate "rm -f" (i.e., if the file does not exist, we win)
160         try:
161             os.unlink(os.path.join(path, '.locator'))
162         except OSError:
163             if os.path.exists(os.path.join(path, '.locator')):
164                 os.unlink(os.path.join(path, '.locator'))
165
166         for f in CollectionReader(zipball).all_files():
167             if not re.search('\.zip$', f.name()):
168                 raise arvados.errors.NotImplementedError(
169                     "zipball_extract cannot handle filename %s" % f.name())
170             zip_filename = os.path.join(path, os.path.basename(f.name()))
171             zip_file = open(zip_filename, 'wb')
172             while True:
173                 buf = f.read(2**20)
174                 if len(buf) == 0:
175                     break
176                 zip_file.write(buf)
177             zip_file.close()
178
179             p = subprocess.Popen(["unzip",
180                                   "-q", "-o",
181                                   "-d", path,
182                                   zip_filename],
183                                  stdout=None,
184                                  stdin=None, stderr=sys.stderr,
185                                  shell=False, close_fds=True)
186             p.wait()
187             if p.returncode != 0:
188                 lockfile.close()
189                 raise arvados.errors.CommandFailedError(
190                     "unzip exited %d" % p.returncode)
191             os.unlink(zip_filename)
192         os.symlink(zipball, os.path.join(path, '.locator'))
193     tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
194     lockfile.close()
195     if len(tld_extracts) == 1:
196         return os.path.join(path, tld_extracts[0])
197     return path
198
199 def collection_extract(collection, path, files=[], decompress=True):
200     """Retrieve a collection from Keep and extract it to a local
201     directory.  Return the absolute path where the collection was
202     extracted.
203
204     collection -- collection locator
205     path -- where to extract: absolute, or relative to job tmp
206     """
207     matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
208     if matches:
209         collection_hash = matches.group(1)
210     else:
211         collection_hash = hashlib.md5(collection).hexdigest()
212     if not re.search('^/', path):
213         path = os.path.join(arvados.current_job().tmpdir, path)
214     lockfile = open(path + '.lock', 'w')
215     fcntl.flock(lockfile, fcntl.LOCK_EX)
216     try:
217         os.stat(path)
218     except OSError:
219         os.mkdir(path)
220     already_have_it = False
221     try:
222         if os.readlink(os.path.join(path, '.locator')) == collection_hash:
223             already_have_it = True
224     except OSError:
225         pass
226
227     # emulate "rm -f" (i.e., if the file does not exist, we win)
228     try:
229         os.unlink(os.path.join(path, '.locator'))
230     except OSError:
231         if os.path.exists(os.path.join(path, '.locator')):
232             os.unlink(os.path.join(path, '.locator'))
233
234     files_got = []
235     for s in CollectionReader(collection).all_streams():
236         stream_name = s.name()
237         for f in s.all_files():
238             if (files == [] or
239                 ((f.name() not in files_got) and
240                  (f.name() in files or
241                   (decompress and f.decompressed_name() in files)))):
242                 outname = f.decompressed_name() if decompress else f.name()
243                 files_got += [outname]
244                 if os.path.exists(os.path.join(path, stream_name, outname)):
245                     continue
246                 mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
247                 outfile = open(os.path.join(path, stream_name, outname), 'wb')
248                 for buf in (f.readall_decompressed() if decompress
249                             else f.readall()):
250                     outfile.write(buf)
251                 outfile.close()
252     if len(files_got) < len(files):
253         raise arvados.errors.AssertionError(
254             "Wanted files %s but only got %s from %s" %
255             (files, files_got,
256              [z.name() for z in CollectionReader(collection).all_files()]))
257     os.symlink(collection_hash, os.path.join(path, '.locator'))
258
259     lockfile.close()
260     return path
261
262 def mkdir_dash_p(path):
263     if not os.path.isdir(path):
264         try:
265             os.makedirs(path)
266         except OSError as e:
267             if e.errno == errno.EEXIST and os.path.isdir(path):
268                 # It is not an error if someone else creates the
269                 # directory between our exists() and makedirs() calls.
270                 pass
271             else:
272                 raise
273
274 def stream_extract(stream, path, files=[], decompress=True):
275     """Retrieve a stream from Keep and extract it to a local
276     directory.  Return the absolute path where the stream was
277     extracted.
278
279     stream -- StreamReader object
280     path -- where to extract: absolute, or relative to job tmp
281     """
282     if not re.search('^/', path):
283         path = os.path.join(arvados.current_job().tmpdir, path)
284     lockfile = open(path + '.lock', 'w')
285     fcntl.flock(lockfile, fcntl.LOCK_EX)
286     try:
287         os.stat(path)
288     except OSError:
289         os.mkdir(path)
290
291     files_got = []
292     for f in stream.all_files():
293         if (files == [] or
294             ((f.name() not in files_got) and
295              (f.name() in files or
296               (decompress and f.decompressed_name() in files)))):
297             outname = f.decompressed_name() if decompress else f.name()
298             files_got += [outname]
299             if os.path.exists(os.path.join(path, outname)):
300                 os.unlink(os.path.join(path, outname))
301             mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
302             outfile = open(os.path.join(path, outname), 'wb')
303             for buf in (f.readall_decompressed() if decompress
304                         else f.readall()):
305                 outfile.write(buf)
306             outfile.close()
307     if len(files_got) < len(files):
308         raise arvados.errors.AssertionError(
309             "Wanted files %s but only got %s from %s" %
310             (files, files_got, [z.name() for z in stream.all_files()]))
311     lockfile.close()
312     return path
313
314 def listdir_recursive(dirname, base=None, max_depth=None):
315     """listdir_recursive(dirname, base, max_depth)
316
317     Return a list of file and directory names found under dirname.
318
319     If base is not None, prepend "{base}/" to each returned name.
320
321     If max_depth is None, descend into directories and return only the
322     names of files found in the directory tree.
323
324     If max_depth is a non-negative integer, stop descending into
325     directories at the given depth, and at that point return directory
326     names instead.
327
328     If max_depth==0 (and base is None) this is equivalent to
329     sorted(os.listdir(dirname)).
330     """
331     allfiles = []
332     for ent in sorted(os.listdir(dirname)):
333         ent_path = os.path.join(dirname, ent)
334         ent_base = os.path.join(base, ent) if base else ent
335         if os.path.isdir(ent_path) and max_depth != 0:
336             allfiles += listdir_recursive(
337                 ent_path, base=ent_base,
338                 max_depth=(max_depth-1 if max_depth else None))
339         else:
340             allfiles += [ent_base]
341     return allfiles
342
343 def is_hex(s, *length_args):
344     """is_hex(s[, length[, max_length]]) -> boolean
345
346     Return True if s is a string of hexadecimal digits.
347     If one length argument is given, the string must contain exactly
348     that number of digits.
349     If two length arguments are given, the string must contain a number of
350     digits between those two lengths, inclusive.
351     Return False otherwise.
352     """
353     num_length_args = len(length_args)
354     if num_length_args > 2:
355         raise arvados.errors.ArgumentError(
356             "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
357     elif num_length_args == 2:
358         good_len = (length_args[0] <= len(s) <= length_args[1])
359     elif num_length_args == 1:
360         good_len = (len(s) == length_args[0])
361     else:
362         good_len = True
363     return bool(good_len and HEX_RE.match(s))
364
365 def list_all(fn, num_retries=0, **kwargs):
366     items = []
367     offset = 0
368     items_available = sys.maxint
369     while len(items) < items_available:
370         c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
371         items += c['items']
372         items_available = c['items_available']
373         offset = c['offset'] + len(c['items'])
374     return items
375
376 def ca_certs_path(fallback=httplib2.CA_CERTS):
377     """Return the path of the best available CA certs source.
378
379     This function searches for various distribution sources of CA
380     certificates, and returns the first it finds.  If it doesn't find any,
381     it returns the value of `fallback` (httplib2's CA certs by default).
382     """
383     for ca_certs_path in [
384         # Debian:
385         '/etc/ssl/certs/ca-certificates.crt',
386         # Red Hat:
387         '/etc/pki/tls/certs/ca-bundle.crt',
388         ]:
389         if os.path.exists(ca_certs_path):
390             return ca_certs_path
391     return fallback