Handle zero-length streams that contain zero-length files. refs #3084
[arvados.git] / sdk / python / arvados / collection.py
import gflags
import httplib
import httplib2
import logging
import os
import pprint
import sys
import types
import subprocess
import json
import UserDict
import re
import hashlib
import string
import bz2
import zlib
import fcntl
import time
import threading

from collections import deque
from stat import *

import arvados   # for the arvados.LOCATOR/BLOCKSIZE/... constants and arvados.api()
from keep import *
from stream import *
import config
import errors
import util

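# normalize_stream() returns a flat list of manifest tokens for one stream:
# the stream name, then each data block locator exactly once, then one
# "position:size:filename" token per contiguous file extent.  Illustrative
# sketch (a fake 3-byte block, not a real stored locator):
#
#   normalize_stream('.', {'foo.txt': <segments covering one 3-byte block>})
#   => ['.', 'acbd18db4cc2f85cedef654fccc4a4d8+3', '0:3:foo.txt']
#
# A zero-length file yields a "0:0:filename" token, and a stream with no
# data blocks gets the empty-block locator so the manifest line stays valid.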
def normalize_stream(s, stream):
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0L
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            elif segmentoffset == current_span[1]:
                current_span[1] += segment[arvados.SEGMENTSIZE]
            else:
                stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if len(stream[f]) == 0:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens

def normalize(collection):
    streams = {}
    for s in collection.all_streams():
        for f in s.all_files():
            filestream = s.name() + "/" + f.name()
            r = filestream.rindex("/")
            streamname = filestream[:r]
            filename = filestream[r+1:]
            if streamname not in streams:
                streams[streamname] = {}
            if filename not in streams[streamname]:
                streams[streamname][filename] = []
            for seg in f.segments:
                streams[streamname][filename].extend(s.locators_and_ranges(seg[0], seg[1]))

    normalized_streams = []
    sortedstreams = list(streams.keys())
    sortedstreams.sort()
    for s in sortedstreams:
        normalized_streams.append(normalize_stream(s, streams[s]))
    return normalized_streams


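# Illustrative CollectionReader usage (a minimal sketch, assuming a
# configured API client or reachable Keep service; size() is assumed to be
# provided by the StreamFileReader objects yielded from all_files()):
#
#   reader = CollectionReader(locator_or_manifest_text)
#   for f in reader.all_files():
#       print f.name(), f.size()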
class CollectionReader(object):
    def __init__(self, manifest_locator_or_text):
        if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _populate(self):
        if self._streams is not None:
            return
        if not self._manifest_text:
            try:
                c = arvados.api('v1').collections().get(
                    uuid=self._manifest_locator).execute()
                self._manifest_text = c['manifest_text']
            except Exception as e:
                # Fall back to fetching the manifest text directly from Keep.
                logging.warning("API lookup failed for collection %s (%s: %s)" %
                                (self._manifest_locator, type(e), str(e)))
                self._manifest_text = Keep.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
        self._streams = normalize(self)

        # now regenerate the manifest text based on the normalized streams

        #print "normalizing", self._manifest_text
        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
        #print "result", self._manifest_text


    def all_streams(self):
        self._populate()
        resp = []
        for s in self._streams:
            resp.append(StreamReader(s))
        return resp

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False):
        self._populate()
        if strip:
            m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
            return m
        else:
            return self._manifest_text

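# Illustrative CollectionWriter usage (a minimal sketch; 'foo.txt' and
# 'bar.txt' are hypothetical local files):
#
#   cwriter = CollectionWriter()
#   cwriter.write_file('foo.txt')    # a path, or any object with .read()
#   cwriter.write_file('bar.txt')
#   locator = cwriter.finish()       # stores the stripped manifest in Keep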
class CollectionWriter(object):
    KEEP_BLOCK_SIZE = 2**26    # 64 MiB

    def __init__(self):
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        self._queue_dirents(stream_name, make_dirents(path))

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

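    # Illustrative write_directory_tree() behavior ('/tmp/data' is a
    # hypothetical path): with the default max_manifest_depth=-1 every
    # subdirectory becomes its own stream; with max_manifest_depth=0 the
    # whole tree is flattened into the single 'data' stream.
    #
    #   cwriter.write_directory_tree('/tmp/data', stream_name='data')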
    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

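    # write() accepts either a string or any iterable of strings, so these
    # buffer the same bytes (illustrative):
    #
    #   cwriter.write('foobar')
    #   cwriter.write(['foo', 'bar'])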
    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                       self._current_stream_length - self._current_file_pos,
                                       self._current_file_name]]
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if len(self._current_stream_locators) == 0:
                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
            self._finished_streams += [[self._current_stream_name,
                                       self._current_stream_locators,
                                       self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Send the stripped manifest to Keep, to ensure that we use the
        # same UUID regardless of what hints are used on the collection.
        return Keep.put(self.stripped_manifest())

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all permission
        hints removed from the locators in the manifest.
        """
        raw = self.manifest_text()
        clean = ''
        for line in raw.split("\n"):
            fields = line.split()
            if len(fields) > 0:
                locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
                             for x in fields[1:-1] ]
                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
        return clean

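    # For example, a signed locator like (illustrative signature and
    # timestamp):
    #   acbd18db4cc2f85cedef654fccc4a4d8+3+A0f1e2d3c4b5a6978@53bed294
    # is stripped down to:
    #   acbd18db4cc2f85cedef654fccc4a4d8+3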
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        #print 'writer',manifest
        #print 'after reader',CollectionReader(manifest).manifest_text()

        # Round-trip through CollectionReader to return the normalized form.
        return CollectionReader(manifest).manifest_text()

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


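# Illustrative checkpoint/resume sketch (a minimal sketch, assuming the
# caller persists the state dict somewhere, e.g. serialized as JSON;
# 'input.dat' is a hypothetical path):
#
#   writer = ResumableCollectionWriter()
#   writer.write_file('input.dat')
#   state = writer.dump_state()
#   ...process dies and restarts...
#   writer = ResumableCollectionWriter.from_state(state)
#   writer.do_queued_work()          # finish any interrupted file
#   locator = writer.finish()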
class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__()

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)