9701: Better bufferblock id generation.
[arvados.git] / sdk/python/arvados/arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13 import collections
14 import uuid
15
16 from .errors import KeepWriteError, AssertionError, ArgumentError
17 from .keep import KeepLocator
18 from ._normalize_stream import normalize_stream
19 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
20 from .retry import retry_method
21
22 MOD = "mod"
23 WRITE = "write"
24
25 _logger = logging.getLogger('arvados.arvfile')
26
27 def split(path):
28     """split(path) -> streamname, filename
29
30     Separate the stream name and file name in a /-separated stream path and
31     return a tuple (stream_name, file_name).  If no stream name is available,
32     assume '.'.
33
34     """
35     try:
36         stream_name, file_name = path.rsplit('/', 1)
37     except ValueError:  # No / in string
38         stream_name, file_name = '.', path
39     return stream_name, file_name
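# Illustrative doctest-style example of the split() behaviour above (the file
# names are hypothetical):
#
#   >>> split('logs/stderr.txt')
#   ('logs', 'stderr.txt')
#   >>> split('stderr.txt')
#   ('.', 'stderr.txt')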
40
41 class _FileLikeObjectBase(object):
42     def __init__(self, name, mode):
43         self.name = name
44         self.mode = mode
45         self.closed = False
46
47     @staticmethod
48     def _before_close(orig_func):
49         @functools.wraps(orig_func)
50         def before_close_wrapper(self, *args, **kwargs):
51             if self.closed:
52                 raise ValueError("I/O operation on closed stream file")
53             return orig_func(self, *args, **kwargs)
54         return before_close_wrapper
55
56     def __enter__(self):
57         return self
58
59     def __exit__(self, exc_type, exc_value, traceback):
60         try:
61             self.close()
62         except Exception:
63             if exc_type is None:
64                 raise
65
66     def close(self):
67         self.closed = True
68
69
70 class ArvadosFileReaderBase(_FileLikeObjectBase):
71     def __init__(self, name, mode, num_retries=None):
72         super(ArvadosFileReaderBase, self).__init__(name, mode)
73         self._filepos = 0L
74         self.num_retries = num_retries
75         self._readline_cache = (None, None)
76
77     def __iter__(self):
78         while True:
79             data = self.readline()
80             if not data:
81                 break
82             yield data
83
84     def decompressed_name(self):
85         return re.sub(r'\.(bz2|gz)$', '', self.name)
86
87     @_FileLikeObjectBase._before_close
88     def seek(self, pos, whence=os.SEEK_SET):
89         if whence == os.SEEK_CUR:
90             pos += self._filepos
91         elif whence == os.SEEK_END:
92             pos += self.size()
93         self._filepos = min(max(pos, 0L), self.size())
94
95     def tell(self):
96         return self._filepos
97
98     @_FileLikeObjectBase._before_close
99     @retry_method
100     def readall(self, size=2**20, num_retries=None):
101         while True:
102             data = self.read(size, num_retries=num_retries)
103             if data == '':
104                 break
105             yield data
106
107     @_FileLikeObjectBase._before_close
108     @retry_method
109     def readline(self, size=float('inf'), num_retries=None):
110         cache_pos, cache_data = self._readline_cache
111         if self.tell() == cache_pos:
112             data = [cache_data]
113             self._filepos += len(cache_data)
114         else:
115             data = ['']
116         data_size = len(data[-1])
117         while (data_size < size) and ('\n' not in data[-1]):
118             next_read = self.read(2 ** 20, num_retries=num_retries)
119             if not next_read:
120                 break
121             data.append(next_read)
122             data_size += len(next_read)
123         data = ''.join(data)
124         try:
125             nextline_index = data.index('\n') + 1
126         except ValueError:
127             nextline_index = len(data)
128         nextline_index = min(nextline_index, size)
129         self._filepos -= len(data) - nextline_index
130         self._readline_cache = (self.tell(), data[nextline_index:])
131         return data[:nextline_index]
132
133     @_FileLikeObjectBase._before_close
134     @retry_method
135     def decompress(self, decompress, size, num_retries=None):
136         for segment in self.readall(size, num_retries=num_retries):
137             data = decompress(segment)
138             if data:
139                 yield data
140
141     @_FileLikeObjectBase._before_close
142     @retry_method
143     def readall_decompressed(self, size=2**20, num_retries=None):
144         self.seek(0)
145         if self.name.endswith('.bz2'):
146             dc = bz2.BZ2Decompressor()
147             return self.decompress(dc.decompress, size,
148                                    num_retries=num_retries)
149         elif self.name.endswith('.gz'):
150             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
151             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
152                                    size, num_retries=num_retries)
153         else:
154             return self.readall(size, num_retries=num_retries)
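    # Illustrative sketch of streaming a file's decompressed content, assuming
    # "reader" is an open reader for a file whose name ends in ".gz" or ".bz2"
    # (the decompressor is chosen from the file name, as above) and
    # "handle_chunk" is a placeholder for caller code:
    #
    #   for chunk in reader.readall_decompressed():
    #       handle_chunk(chunk)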
155
156     @_FileLikeObjectBase._before_close
157     @retry_method
158     def readlines(self, sizehint=float('inf'), num_retries=None):
159         data = []
160         data_size = 0
161         for s in self.readall(num_retries=num_retries):
162             data.append(s)
163             data_size += len(s)
164             if data_size >= sizehint:
165                 break
166         return ''.join(data).splitlines(True)
167
168     def size(self):
169         raise NotImplementedError()
170
171     def read(self, size, num_retries=None):
172         raise NotImplementedError()
173
174     def readfrom(self, start, size, num_retries=None):
175         raise NotImplementedError()
176
177
178 class StreamFileReader(ArvadosFileReaderBase):
179     class _NameAttribute(str):
180         # The Python file API provides a plain .name attribute.
181         # The older SDK provided a name() method.
182         # This class provides both, for maximum compatibility.
183         def __call__(self):
184             return self
185
186     def __init__(self, stream, segments, name):
187         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
188         self._stream = stream
189         self.segments = segments
190
191     def stream_name(self):
192         return self._stream.name()
193
194     def size(self):
195         n = self.segments[-1]
196         return n.range_start + n.range_size
197
198     @_FileLikeObjectBase._before_close
199     @retry_method
200     def read(self, size, num_retries=None):
201         """Read up to 'size' bytes from the stream, starting at the current file position"""
202         if size == 0:
203             return ''
204
205         data = ''
206         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
207         if available_chunks:
208             lr = available_chunks[0]
209             data = self._stream.readfrom(lr.locator+lr.segment_offset,
210                                           lr.segment_size,
211                                           num_retries=num_retries)
212
213         self._filepos += len(data)
214         return data
215
216     @_FileLikeObjectBase._before_close
217     @retry_method
218     def readfrom(self, start, size, num_retries=None):
219         """Read up to 'size' bytes from the stream, starting at 'start'"""
220         if size == 0:
221             return ''
222
223         data = []
224         for lr in locators_and_ranges(self.segments, start, size):
225             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
226                                               num_retries=num_retries))
227         return ''.join(data)
228
229     def as_manifest(self):
230         segs = []
231         for r in self.segments:
232             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
233         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
234
235
236 def synchronized(orig_func):
237     @functools.wraps(orig_func)
238     def synchronized_wrapper(self, *args, **kwargs):
239         with self.lock:
240             return orig_func(self, *args, **kwargs)
241     return synchronized_wrapper
242
243
244 class StateChangeError(Exception):
245     def __init__(self, message, state, nextstate):
246         super(StateChangeError, self).__init__(message)
247         self.state = state
248         self.nextstate = nextstate
249
250 class _BufferBlock(object):
251     """A stand-in for a Keep block that is in the process of being written.
252
253     Writers can append to it, get the size, and compute the Keep locator.
254     There are three valid states:
255
256     WRITABLE
257       Can append to block.
258
259     PENDING
260       Block is in the process of being uploaded to Keep, append is an error.
261
262     COMMITTED
263       The block has been written to Keep; its internal buffer has been
264       released, so fetching the block now goes through the Keep client
265       (since we discarded the internal copy), and identifiers referring to
266       the BufferBlock can be replaced with the block locator.
267
268     """
269
270     WRITABLE = 0
271     PENDING = 1
272     COMMITTED = 2
273     ERROR = 3
274
275     def __init__(self, blockid, starting_capacity, owner):
276         """
277         :blockid:
278           the identifier for this block
279
280         :starting_capacity:
281           the initial buffer capacity
282
283         :owner:
284           ArvadosFile that owns this block
285
286         """
287         self.blockid = blockid
288         self.buffer_block = bytearray(starting_capacity)
289         self.buffer_view = memoryview(self.buffer_block)
290         self.write_pointer = 0
291         self._state = _BufferBlock.WRITABLE
292         self._locator = None
293         self.owner = owner
294         self.lock = threading.Lock()
295         self.wait_for_commit = threading.Event()
296         self.error = None
297
298     @synchronized
299     def append(self, data):
300         """Append some data to the buffer.
301
302         Only valid if the block is in WRITABLE state.  Implements an expanding
303         buffer, doubling capacity as needed to accommodate all the data.
304
305         """
306         if self._state == _BufferBlock.WRITABLE:
307             while (self.write_pointer+len(data)) > len(self.buffer_block):
308                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
309                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
310                 self.buffer_block = new_buffer_block
311                 self.buffer_view = memoryview(self.buffer_block)
312             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
313             self.write_pointer += len(data)
314             self._locator = None
315         else:
316             raise AssertionError("Buffer block is not writable")
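    # Illustrative sketch of the expanding buffer above (values are examples):
    #
    #   bb = _BufferBlock("bufferblock0", starting_capacity=4, owner=None)
    #   bb.append("hello world")   # capacity doubles 4 -> 8 -> 16 to fit 11 bytes
    #   bb.size()                  # 11, the number of bytes written
    #   bb.locator()               # "<md5 of contents>+11"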
317
318     STATE_TRANSITIONS = frozenset([
319             (WRITABLE, PENDING),
320             (PENDING, COMMITTED),
321             (PENDING, ERROR),
322             (ERROR, PENDING)])
323
324     @synchronized
325     def set_state(self, nextstate, val=None):
326         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
327             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
328         self._state = nextstate
329
330         if self._state == _BufferBlock.PENDING:
331             self.wait_for_commit.clear()
332
333         if self._state == _BufferBlock.COMMITTED:
334             self._locator = val
335             self.buffer_view = None
336             self.buffer_block = None
337             self.wait_for_commit.set()
338
339         if self._state == _BufferBlock.ERROR:
340             self.error = val
341             self.wait_for_commit.set()
342
343     @synchronized
344     def state(self):
345         return self._state
346
347     def size(self):
348         """The amount of data written to the buffer."""
349         return self.write_pointer
350
351     @synchronized
352     def locator(self):
353         """The Keep locator for this buffer's contents."""
354         if self._locator is None:
355             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
356         return self._locator
357
358     @synchronized
359     def clone(self, new_blockid, owner):
360         if self._state == _BufferBlock.COMMITTED:
361             raise AssertionError("Cannot duplicate committed buffer block")
362         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
363         bufferblock.append(self.buffer_view[0:self.size()])
364         return bufferblock
365
366     @synchronized
367     def clear(self):
368         self.owner = None
369         self.buffer_block = None
370         self.buffer_view = None
371
372
373 class NoopLock(object):
374     def __enter__(self):
375         return self
376
377     def __exit__(self, exc_type, exc_value, traceback):
378         pass
379
380     def acquire(self, blocking=False):
381         pass
382
383     def release(self):
384         pass
385
386
387 def must_be_writable(orig_func):
388     @functools.wraps(orig_func)
389     def must_be_writable_wrapper(self, *args, **kwargs):
390         if not self.writable():
391             raise IOError(errno.EROFS, "Collection is read-only.")
392         return orig_func(self, *args, **kwargs)
393     return must_be_writable_wrapper
394
395
396 class _BlockManager(object):
397     """BlockManager handles buffer blocks.
398
399     Also handles background block uploads, and background block prefetch for a
400     Collection of ArvadosFiles.
401
402     """
403
404     DEFAULT_PUT_THREADS = 2
405     DEFAULT_GET_THREADS = 2
406
407     def __init__(self, keep, copies=None):
408         """keep: KeepClient object to use"""
409         self._keep = keep
410         self._bufferblocks = collections.OrderedDict()
411         self._put_queue = None
412         self._put_threads = None
413         self._prefetch_queue = None
414         self._prefetch_threads = None
415         self.lock = threading.Lock()
416         self.prefetch_enabled = True
417         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
418         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
419         self.copies = copies
420
421     @synchronized
422     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
423         """Allocate a new, empty bufferblock in WRITABLE state and return it.
424
425         :blockid:
426           optional block identifier, otherwise one will be automatically assigned
427
428         :starting_capacity:
429           optional capacity, otherwise will use default capacity
430
431         :owner:
432           ArvadosFile that owns this block
433
434         """
435         if blockid is None:
436             blockid = "%s" % uuid.uuid4()
437         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
438         self._bufferblocks[bufferblock.blockid] = bufferblock
439         return bufferblock
440
441     @synchronized
442     def dup_block(self, block, owner):
443         """Create a new bufferblock initialized with the content of an existing bufferblock.
444
445         :block:
446           the buffer block to copy.
447
448         :owner:
449           ArvadosFile that owns the new block
450
451         """
452         new_blockid = "bufferblock%i" % len(self._bufferblocks)
453         bufferblock = block.clone(new_blockid, owner)
454         self._bufferblocks[bufferblock.blockid] = bufferblock
455         return bufferblock
456
457     @synchronized
458     def is_bufferblock(self, locator):
459         return locator in self._bufferblocks
460
461     def _commit_bufferblock_worker(self):
462         """Background uploader thread."""
463
464         while True:
465             try:
466                 bufferblock = self._put_queue.get()
467                 if bufferblock is None:
468                     return
469
470                 if self.copies is None:
471                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
472                 else:
473                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
474                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
475
476             except Exception as e:
477                 bufferblock.set_state(_BufferBlock.ERROR, e)
478             finally:
479                 if self._put_queue is not None:
480                     self._put_queue.task_done()
481
482     @synchronized
483     def start_put_threads(self):
484         if self._put_threads is None:
485             # Start uploader threads.
486
487             # If we don't limit the Queue size, the upload queue can quickly
488             # grow to take up gigabytes of RAM if the writing process is
489             # generating data more quickly than it can be sent to the Keep
490             # servers.
491             #
492             # With two upload threads and a queue size of 2, this means up to 4
493             # blocks pending.  If they are full 64 MiB blocks, that means up to
494             # 256 MiB of internal buffering, which is the same size as the
495             # default download block cache in KeepClient.
496             self._put_queue = Queue.Queue(maxsize=2)
497
498             self._put_threads = []
499             for i in xrange(0, self.num_put_threads):
500                 thread = threading.Thread(target=self._commit_bufferblock_worker)
501                 self._put_threads.append(thread)
502                 thread.daemon = True
503                 thread.start()
504
505     def _block_prefetch_worker(self):
506         """The background downloader thread."""
507         while True:
508             try:
509                 b = self._prefetch_queue.get()
510                 if b is None:
511                     return
512                 self._keep.get(b)
513             except Exception:
514                 pass
515
516     @synchronized
517     def start_get_threads(self):
518         if self._prefetch_threads is None:
519             self._prefetch_queue = Queue.Queue()
520             self._prefetch_threads = []
521             for i in xrange(0, self.num_get_threads):
522                 thread = threading.Thread(target=self._block_prefetch_worker)
523                 self._prefetch_threads.append(thread)
524                 thread.daemon = True
525                 thread.start()
526
527
528     @synchronized
529     def stop_threads(self):
530         """Shut down and wait for background upload and download threads to finish."""
531
532         if self._put_threads is not None:
533             for t in self._put_threads:
534                 self._put_queue.put(None)
535             for t in self._put_threads:
536                 t.join()
537         self._put_threads = None
538         self._put_queue = None
539
540         if self._prefetch_threads is not None:
541             for t in self._prefetch_threads:
542                 self._prefetch_queue.put(None)
543             for t in self._prefetch_threads:
544                 t.join()
545         self._prefetch_threads = None
546         self._prefetch_queue = None
547
548     def __enter__(self):
549         return self
550
551     def __exit__(self, exc_type, exc_value, traceback):
552         self.stop_threads()
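    # Illustrative usage sketch, assuming "keep_client" is a configured
    # KeepClient instance; exiting the "with" block calls stop_threads() so the
    # background upload/prefetch workers shut down:
    #
    #   with _BlockManager(keep_client) as bm:
    #       bb = bm.alloc_bufferblock()
    #       bb.append("some data")
    #       bm.commit_bufferblock(bb, sync=True)   # uploads and sets the locator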
553
554     @synchronized
555     def repack_small_blocks(self, force=False, sync=False):
556         """Packs small blocks together before uploading"""
557         # Search for blocks that are ready to be packed together before being committed to Keep.
558         small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner and b.owner.closed()]
559         if len(small_blocks) <= 1:
560             # Not enough small blocks for repacking
561             return
562
563         # Check whether there are enough small blocks to fill a full-size block
564         pending_write_size = sum([b.size() for b in small_blocks])
565         if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
566             new_bb = _BufferBlock("%s" % uuid.uuid4(), 2**14, None)
567             self._bufferblocks[new_bb.blockid] = new_bb
568             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
569                 bb = small_blocks.pop(0)
570                 arvfile = bb.owner
571                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
572                 arvfile.set_segments([Range(new_bb.blockid,
573                                             0,
574                                             bb.size(),
575                                             new_bb.write_pointer - bb.size())])
576                 bb.clear()
577                 del self._bufferblocks[bb.blockid]
578             self.commit_bufferblock(new_bb, sync=sync)
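    # Illustrative sketch of the repacking above (sizes are hypothetical): two
    # closed files of 3 MiB and 5 MiB are copied into one new buffer block, and
    # each owner's segments are rewritten to point at its slice of that block:
    #
    #   f1 segments -> [Range(new_bb.blockid, 0, 3 MiB, segment_offset=0)]
    #   f2 segments -> [Range(new_bb.blockid, 0, 5 MiB, segment_offset=3 MiB)]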
579
580     def commit_bufferblock(self, block, sync):
581         """Initiate a background upload of a bufferblock.
582
583         :block:
584           The block object to upload
585
586         :sync:
587           If `sync` is True, upload the block synchronously.
588           If `sync` is False, upload the block asynchronously.  This will
589           return immediately unless the upload queue is at capacity, in
590           which case it will wait on an upload queue slot.
591
592         """
593         try:
594             # Mark the block as PENDING so as to disallow any more appends.
595             block.set_state(_BufferBlock.PENDING)
596         except StateChangeError as e:
597             if e.state == _BufferBlock.PENDING:
598                 if sync:
599                     block.wait_for_commit.wait()
600                 else:
601                     return
602             if block.state() == _BufferBlock.COMMITTED:
603                 return
604             elif block.state() == _BufferBlock.ERROR:
605                 raise block.error
606             else:
607                 raise
608
609         if sync:
610             try:
611                 if self.copies is None:
612                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
613                 else:
614                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
615                 block.set_state(_BufferBlock.COMMITTED, loc)
616             except Exception as e:
617                 block.set_state(_BufferBlock.ERROR, e)
618                 raise
619         else:
620             self.start_put_threads()
621             self._put_queue.put(block)
622
623     @synchronized
624     def get_bufferblock(self, locator):
625         return self._bufferblocks.get(locator)
626
627     @synchronized
628     def delete_bufferblock(self, locator):
629         bb = self._bufferblocks[locator]
630         bb.clear()
631         del self._bufferblocks[locator]
632
633     def get_block_contents(self, locator, num_retries, cache_only=False):
634         """Fetch a block.
635
636         First checks to see if the locator is a BufferBlock and return that, if
637         not, passes the request through to KeepClient.get().
638
639         """
640         with self.lock:
641             if locator in self._bufferblocks:
642                 bufferblock = self._bufferblocks[locator]
643                 if bufferblock.state() != _BufferBlock.COMMITTED:
644                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
645                 else:
646                     locator = bufferblock._locator
647         if cache_only:
648             return self._keep.get_from_cache(locator)
649         else:
650             return self._keep.get(locator, num_retries=num_retries)
651
652     def commit_all(self):
653         """Commit all outstanding buffer blocks.
654
655         This is a synchronous call, and will not return until all buffer blocks
656         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
657
658         """
659         self.repack_small_blocks(force=True, sync=True)
660
661         with self.lock:
662             items = self._bufferblocks.items()
663
664         for k,v in items:
665             if v.state() != _BufferBlock.COMMITTED and v.owner:
666                 v.owner.flush(sync=False)
667
668         with self.lock:
669             if self._put_queue is not None:
670                 self._put_queue.join()
671
672                 err = []
673                 for k,v in items:
674                     if v.state() == _BufferBlock.ERROR:
675                         err.append((v.locator(), v.error))
676                 if err:
677                     raise KeepWriteError("Error writing some blocks", err, label="block")
678
679         for k,v in items:
680             # flush again with sync=True to remove committed bufferblocks from
681             # the segments.
682             if v.owner:
683                 v.owner.flush(sync=True)
684
685     def block_prefetch(self, locator):
686         """Initiate a background download of a block.
687
688         This assumes that the underlying KeepClient implements a block cache,
689         so repeated requests for the same block will not result in repeated
690         downloads (unless the block is evicted from the cache).  This method
691         does not block.
692
693         """
694
695         if not self.prefetch_enabled:
696             return
697
698         if self._keep.get_from_cache(locator) is not None:
699             return
700
701         with self.lock:
702             if locator in self._bufferblocks:
703                 return
704
705         self.start_get_threads()
706         self._prefetch_queue.put(locator)
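    # Illustrative sketch, assuming "bm" is this collection's _BlockManager and
    # "segments" is a list of Range objects for an upcoming read: queue each
    # block for background download so it is already cached when read.
    #
    #   for lr in locators_and_ranges(segments, offset, size):
    #       bm.block_prefetch(lr.locator)   # returns immediately, never blocks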
707
708
709 class ArvadosFile(object):
710     """Represent a file in a Collection.
711
712     ArvadosFile manages the underlying representation of a file in Keep as a
713     sequence of segments spanning a set of blocks, and implements random
714     read/write access.
715
716     This object may be accessed from multiple threads.
717
718     """
719
720     def __init__(self, parent, name, stream=[], segments=[]):
721         """
722         ArvadosFile constructor.
723
724         :stream:
725           a list of Range objects representing a block stream
726
727         :segments:
728           a list of Range objects representing segments
729         """
730         self.parent = parent
731         self.name = name
732         self._writers = set()
733         self._committed = False
734         self._segments = []
735         self.lock = parent.root_collection().lock
736         for s in segments:
737             self._add_segment(stream, s.locator, s.range_size)
738         self._current_bblock = None
739
740     def writable(self):
741         return self.parent.writable()
742
743     @synchronized
744     def segments(self):
745         return copy.copy(self._segments)
746
747     @synchronized
748     def clone(self, new_parent, new_name):
749         """Make a copy of this file."""
750         cp = ArvadosFile(new_parent, new_name)
751         cp.replace_contents(self)
752         return cp
753
754     @must_be_writable
755     @synchronized
756     def replace_contents(self, other):
757         """Replace segments of this file with segments from another `ArvadosFile` object."""
758
759         map_loc = {}
760         self._segments = []
761         for other_segment in other.segments():
762             new_loc = other_segment.locator
763             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
764                 if other_segment.locator not in map_loc:
765                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
766                     if bufferblock.state() != _BufferBlock.WRITABLE:
767                         map_loc[other_segment.locator] = bufferblock.locator()
768                     else:
769                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
770                 new_loc = map_loc[other_segment.locator]
771
772             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
773
774         self._committed = False
775
776     def __eq__(self, other):
777         if other is self:
778             return True
779         if not isinstance(other, ArvadosFile):
780             return False
781
782         othersegs = other.segments()
783         with self.lock:
784             if len(self._segments) != len(othersegs):
785                 return False
786             for i in xrange(0, len(othersegs)):
787                 seg1 = self._segments[i]
788                 seg2 = othersegs[i]
789                 loc1 = seg1.locator
790                 loc2 = seg2.locator
791
792                 if self.parent._my_block_manager().is_bufferblock(loc1):
793                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
794
795                 if other.parent._my_block_manager().is_bufferblock(loc2):
796                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
797
798                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
799                     seg1.range_start != seg2.range_start or
800                     seg1.range_size != seg2.range_size or
801                     seg1.segment_offset != seg2.segment_offset):
802                     return False
803
804         return True
805
806     def __ne__(self, other):
807         return not self.__eq__(other)
808
809     @synchronized
810     def set_segments(self, segs):
811         self._segments = segs
812
813     @synchronized
814     def set_committed(self):
815         """Set committed flag to True"""
816         self._committed = True
817
818     @synchronized
819     def committed(self):
820         """Get whether this is committed or not."""
821         return self._committed
822
823     @synchronized
824     def add_writer(self, writer):
825         """Add an ArvadosFileWriter reference to the list of writers"""
826         if isinstance(writer, ArvadosFileWriter):
827             self._writers.add(writer)
828
829     @synchronized
830     def remove_writer(self, writer):
831         """
832         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
833         and do some block maintenance tasks.
834         """
835         self._writers.remove(writer)
836
837         if self.size() > config.KEEP_BLOCK_SIZE / 2:
838             # File writer closed, not small enough for repacking
839             self.flush()
840         elif self.closed():
841             # All writers closed and size is adequate for repacking
842             self.parent._my_block_manager().repack_small_blocks()
843
844     def closed(self):
845         """
846         Get whether this is closed or not. When the writers list is empty, the file
847         is considered closed.
848         """
849         return len(self._writers) == 0
850
851     @must_be_writable
852     @synchronized
853     def truncate(self, size):
854         """Shrink the size of the file.
855
856         If `size` is less than the size of the file, the file contents after
857         `size` will be discarded.  If `size` is greater than the current size
858         of the file, an IOError will be raised.
859
860         """
861         if size < self.size():
862             new_segs = []
863             for r in self._segments:
864                 range_end = r.range_start+r.range_size
865                 if r.range_start >= size:
866                     # segment is past the truncate size, all done
867                     break
868                 elif size < range_end:
869                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
870                     nr.segment_offset = r.segment_offset
871                     new_segs.append(nr)
872                     break
873                 else:
874                     new_segs.append(r)
875
876             self._segments = new_segs
877             self._committed = False
878         elif size > self.size():
879             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
880
881     def readfrom(self, offset, size, num_retries, exact=False):
882         """Read up to `size` bytes from the file starting at `offset`.
883
884         :exact:
885          If False (default), return less data than requested if the read
886          crosses a block boundary and the next block isn't cached.  If True,
887          only return less data than requested when hitting EOF.
888         """
889
890         with self.lock:
891             if size == 0 or offset >= self.size():
892                 return ''
893             readsegs = locators_and_ranges(self._segments, offset, size)
894             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
895
896         locs = set()
897         data = []
898         for lr in readsegs:
899             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
900             if block:
901                 blockview = memoryview(block)
902                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
903                 locs.add(lr.locator)
904             else:
905                 break
906
907         for lr in prefetch:
908             if lr.locator not in locs:
909                 self.parent._my_block_manager().block_prefetch(lr.locator)
910                 locs.add(lr.locator)
911
912         return ''.join(data)
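    # Illustrative sketch of the "exact" flag above, assuming "af" is an
    # ArvadosFile: with exact=False a read that crosses into an uncached block
    # may return fewer bytes than requested, so callers that need the whole
    # range should pass exact=True.
    #
    #   data = af.readfrom(0, 1024, num_retries=2, exact=True)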
913
914     def _repack_writes(self, num_retries):
915         """Check whether the buffer block holds more data than its segments reference.
916
917         This happens when a buffered write overwrites a file range written in
918         a previous buffered write.  Re-pack the buffer block for efficiency
919         and to avoid leaking information.
920
921         """
922         segs = self._segments
923
924         # Sum up the segments to get the total bytes of the file referencing
925         # into the buffer block.
926         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
927         write_total = sum([s.range_size for s in bufferblock_segs])
928
929         if write_total < self._current_bblock.size():
930             # There is more data in the buffer block than is actually accounted for by segments, so
931             # re-pack by copying only the referenced data into a new buffer block.
932             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
933             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
934             for t in bufferblock_segs:
935                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
936                 t.segment_offset = new_bb.size() - t.range_size
937
938             self._current_bblock = new_bb
939
940     @must_be_writable
941     @synchronized
942     def writeto(self, offset, data, num_retries):
943         """Write `data` to the file starting at `offset`.
944
945         This will update existing bytes and/or extend the size of the file as
946         necessary.
947
948         """
949         if len(data) == 0:
950             return
951
952         if offset > self.size():
953             raise ArgumentError("Offset is past the end of the file")
954
955         if len(data) > config.KEEP_BLOCK_SIZE:
956             # Chunk it up into smaller writes
957             n = 0
958             dataview = memoryview(data)
959             while n < len(data):
960                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
961                 n += config.KEEP_BLOCK_SIZE
962             return
963
964         self._committed = False
965
966         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
967             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
968
969         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
970             self._repack_writes(num_retries)
971             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
972                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
973                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
974
975         self._current_bblock.append(data)
976
977         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
978
979         self.parent.notify(WRITE, self.parent, self.name, (self, self))
980
981         return len(data)
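    # Illustrative sketch of the buffering above, assuming "af" is a writable
    # ArvadosFile: writes append to the current buffer block until adding more
    # data would exceed KEEP_BLOCK_SIZE, at which point the block is committed
    # asynchronously and a fresh buffer block is allocated.
    #
    #   af.writeto(0, "x" * 1000, num_retries=0)      # starts a buffer block
    #   af.writeto(1000, "y" * 1000, num_retries=0)   # appends to the same block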
982
983     @synchronized
984     def flush(self, sync=True, num_retries=0):
985         """Flush the current bufferblock to Keep.
986
987         :sync:
988           If True, commit block synchronously, wait until buffer block has been written.
989           If False, commit block asynchronously, return immediately after putting block into
990           the keep put queue.
991         """
992         if self.committed():
993             return
994
995         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
996             if self._current_bblock.state() == _BufferBlock.WRITABLE:
997                 self._repack_writes(num_retries)
998             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
999
1000         if sync:
1001             to_delete = set()
1002             for s in self._segments:
1003                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1004                 if bb:
1005                     if bb.state() != _BufferBlock.COMMITTED:
1006                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1007                     to_delete.add(s.locator)
1008                     s.locator = bb.locator()
1009             for s in to_delete:
1010                self.parent._my_block_manager().delete_bufferblock(s)
1011
1012         self.parent.notify(MOD, self.parent, self.name, (self, self))
1013
1014     @must_be_writable
1015     @synchronized
1016     def add_segment(self, blocks, pos, size):
1017         """Add a segment to the end of the file.
1018
1019         `pos` and `size` reference a section of the stream described by
1020         `blocks` (a list of Range objects)
1021
1022         """
1023         self._add_segment(blocks, pos, size)
1024
1025     def _add_segment(self, blocks, pos, size):
1026         """Internal implementation of add_segment."""
1027         self._committed = False
1028         for lr in locators_and_ranges(blocks, pos, size):
1029             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1030             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1031             self._segments.append(r)
1032
1033     @synchronized
1034     def size(self):
1035         """Get the file size."""
1036         if self._segments:
1037             n = self._segments[-1]
1038             return n.range_start + n.range_size
1039         else:
1040             return 0
1041
1042     @synchronized
1043     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
1044         buf = ""
1045         filestream = []
1046         for segment in self._segments:
1047             loc = segment.locator
1048             if self.parent._my_block_manager().is_bufferblock(loc):
1049                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1050             if portable_locators:
1051                 loc = KeepLocator(loc).stripped()
1052             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1053                                  segment.segment_offset, segment.range_size))
1054         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
1055         buf += "\n"
1056         return buf
1057
1058     @must_be_writable
1059     @synchronized
1060     def _reparent(self, newparent, newname):
1061         self._committed = False
1062         self.flush(sync=True)
1063         self.parent.remove(self.name)
1064         self.parent = newparent
1065         self.name = newname
1066         self.lock = self.parent.root_collection().lock
1067
1068
1069 class ArvadosFileReader(ArvadosFileReaderBase):
1070     """Wraps ArvadosFile in a file-like object supporting reading only.
1071
1072     Be aware that this class is NOT thread safe as there is no locking around
1073     updating file pointer.
1074     updating the file pointer.
1075     """
1076
1077     def __init__(self, arvadosfile, num_retries=None):
1078         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1079         self.arvadosfile = arvadosfile
1080
1081     def size(self):
1082         return self.arvadosfile.size()
1083
1084     def stream_name(self):
1085         return self.arvadosfile.parent.stream_name()
1086
1087     @_FileLikeObjectBase._before_close
1088     @retry_method
1089     def read(self, size=None, num_retries=None):
1090         """Read up to `size` bytes from the file and return the result.
1091
1092         Starts at the current file position.  If `size` is None, read the
1093         entire remainder of the file.
1094         """
1095         if size is None:
1096             data = []
1097             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1098             while rd:
1099                 data.append(rd)
1100                 self._filepos += len(rd)
1101                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1102             return ''.join(data)
1103         else:
1104             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1105             self._filepos += len(data)
1106             return data
1107
1108     @_FileLikeObjectBase._before_close
1109     @retry_method
1110     def readfrom(self, offset, size, num_retries=None):
1111         """Read up to `size` bytes from the stream, starting at the specified file offset.
1112
1113         This method does not change the file position.
1114         """
1115         return self.arvadosfile.readfrom(offset, size, num_retries)
1116
1117     def flush(self):
1118         pass
1119
1120
1121 class ArvadosFileWriter(ArvadosFileReader):
1122     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1123
1124     Be aware that this class is NOT thread safe as there is no locking around
1125     updating the file pointer.
1126
1127     """
1128
1129     def __init__(self, arvadosfile, mode, num_retries=None):
1130         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1131         self.mode = mode
1132         self.arvadosfile.add_writer(self)
1133
1134     @_FileLikeObjectBase._before_close
1135     @retry_method
1136     def write(self, data, num_retries=None):
1137         if self.mode[0] == "a":
1138             self.arvadosfile.writeto(self.size(), data, num_retries)
1139         else:
1140             self.arvadosfile.writeto(self._filepos, data, num_retries)
1141             self._filepos += len(data)
1142         return len(data)
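    # Illustrative sketch, assuming "af" is a writable ArvadosFile: in append
    # mode every write lands at the current end of file, otherwise it lands at
    # the file pointer, which then advances.
    #
    #   writer = ArvadosFileWriter(af, "a")
    #   writer.write("first line\n")
    #   writer.write("second line\n")   # appended after the first write
    #   writer.close()                  # remove_writer() may flush or repack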
1143
1144     @_FileLikeObjectBase._before_close
1145     @retry_method
1146     def writelines(self, seq, num_retries=None):
1147         for s in seq:
1148             self.write(s, num_retries=num_retries)
1149
1150     @_FileLikeObjectBase._before_close
1151     def truncate(self, size=None):
1152         if size is None:
1153             size = self._filepos
1154         self.arvadosfile.truncate(size)
1155         if self._filepos > self.size():
1156             self._filepos = self.size()
1157
1158     @_FileLikeObjectBase._before_close
1159     def flush(self):
1160         self.arvadosfile.flush()
1161
1162     def close(self):
1163         if not self.closed:
1164             self.arvadosfile.remove_writer(self)
1165             super(ArvadosFileWriter, self).close()