2752: Add arvados.collections.ResumableCollectionWriter.
[arvados.git] / sdk / python / arvados / collection.py
import gflags
import httplib
import httplib2
import logging
import os
import pprint
import sys
import types
import subprocess
import json
import UserDict
import re
import hashlib
import string
import bz2
import zlib
import fcntl
import time
import threading

from collections import deque
from stat import *

import arvados  # needed for arvados.api() and the arvados.LOCATOR/BLOCKSIZE/OFFSET/SEGMENTSIZE indexes used below
from keep import *
from stream import *
import config
import errors
import util

def normalize_stream(s, stream):
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0L
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            elif segmentoffset == current_span[1]:
                # Adjacent segments coalesce into a single pos:size:name token.
                current_span[1] += segment[arvados.SEGMENTSIZE]
            else:
                stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if len(stream[f]) == 0:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens

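# A sketch of what normalize_stream() emits, assuming the LOCATOR, BLOCKSIZE,
# OFFSET and SEGMENTSIZE tuple indexes (0, 1, 2, 3) used for the segment
# lists built by normalize() below.  One file "f" made from a single 3-byte
# segment spanning a whole 3-byte block (the locator here is md5("foo") plus
# a size hint) yields the stream name, the block locator, then one
# position:size:name token:
#
#   segment = ['acbd18db4cc2f85cedef654fccc4a4d8+3', 3, 0, 3]
#   normalize_stream('.', {'f': [segment]})
#   # => ['.', 'acbd18db4cc2f85cedef654fccc4a4d8+3', '0:3:f']
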
def normalize(collection):
    streams = {}
    for s in collection.all_streams():
        for f in s.all_files():
            filestream = s.name() + "/" + f.name()
            r = filestream.rindex("/")
            streamname = filestream[:r]
            filename = filestream[r+1:]
            if streamname not in streams:
                streams[streamname] = {}
            if filename not in streams[streamname]:
                streams[streamname][filename] = []
            for seg in f.segments:
                streams[streamname][filename].extend(s.locators_and_ranges(seg[0], seg[1]))

    normalized_streams = []
    sortedstreams = list(streams.keys())
    sortedstreams.sort()
    for s in sortedstreams:
        normalized_streams.append(normalize_stream(s, streams[s]))
    return normalized_streams

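# For example (a sketch): a file reached as "./sub/c.txt" is filed under
# streamname "./sub" with filename "c.txt", and each of its segments is
# translated by StreamReader.locators_and_ranges() into the
# [locator, blocksize, offset, segmentsize] lists that normalize_stream()
# expects.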

class CollectionReader(object):
    def __init__(self, manifest_locator_or_text):
        if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _populate(self):
        if self._streams is not None:
            return
        if not self._manifest_text:
            try:
                c = arvados.api('v1').collections().get(
                    uuid=self._manifest_locator).execute()
                self._manifest_text = c['manifest_text']
            except Exception as e:
                logging.warning("API lookup failed for collection %s (%s: %s)" %
                                (self._manifest_locator, type(e), str(e)))
                self._manifest_text = Keep.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
        self._streams = normalize(self)

        # Now regenerate the manifest text based on the normalized streams.
        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])

    def all_streams(self):
        self._populate()
        resp = []
        for s in self._streams:
            resp.append(StreamReader(s))
        return resp

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self):
        self._populate()
        return self._manifest_text

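# Minimal read-side usage, as a sketch (the locator below is hypothetical;
# either a manifest locator or raw manifest text is accepted):
#
#   reader = CollectionReader('acbd18db4cc2f85cedef654fccc4a4d8+3')
#   print reader.manifest_text()
#   for f in reader.all_files():
#       print f.name()
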
class CollectionWriter(object):
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self):
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()

    def _do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break
            self.checkpoint_state()

    def checkpoint_state(self):
        # Subclasses can implement this method to, e.g., report or record state.
        pass

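    # A sketch of the subclass hook (this class is hypothetical, not part of
    # the SDK): record progress after every unit of queued work and every
    # flushed block.
    #
    #   class LoggingCollectionWriter(CollectionWriter):
    #       def checkpoint_state(self):
    #           logging.info("buffered %d bytes so far", self._data_buffer_len)
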
    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        self._queue_dirents(stream_name, make_dirents(path))

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self._do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self._do_queued_work()

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])
            self.checkpoint_state()

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if len(self._current_stream_locators) == 0:
                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
            self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
                                        self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        return Keep.put(self.manifest_text())

    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        # Run the assembled text back through CollectionReader to normalize it.
        return CollectionReader(manifest).manifest_text()

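    # Each manifest line is "<stream name> <block locators...>
    # <pos:size:filename...>".  As a sketch, writing 3 bytes to a file "f"
    # in the default '.' stream yields a manifest like:
    #
    #   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:f
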
    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret

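# Typical write-side usage, as a sketch (the paths are hypothetical;
# finish() stores the manifest itself in Keep and returns its locator):
#
#   writer = CollectionWriter()
#   writer.write_directory_tree('/tmp/example', max_manifest_depth=1)
#   writer.start_new_stream('extra')
#   writer.start_new_file('greeting.txt')
#   writer.write('hello\n')
#   manifest_locator = writer.finish()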

class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__()

    @classmethod
    def from_state(cls, state):
        writer = cls()
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        writer._do_queued_work()
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

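    # Checkpoint/resume round trip, as a sketch (the state file path is
    # hypothetical).  json.dump's default=list serializes the deque
    # attributes as lists, and from_state() coerces them back to deques:
    #
    #   writer = ResumableCollectionWriter()
    #   writer.write_file('/tmp/example.dat')
    #   with open('/tmp/state.json', 'w') as f:
    #       json.dump(writer.dump_state(), f, default=list)
    #   # ...later, in a new process...
    #   with open('/tmp/state.json') as f:
    #       writer = ResumableCollectionWriter.from_state(json.load(f))
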
    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as error:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, error))
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)