9701: Simplifying small bufferblock query
[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13
14 from .errors import KeepWriteError, AssertionError, ArgumentError
15 from .keep import KeepLocator
16 from ._normalize_stream import normalize_stream
17 from ._ranges import locators_and_ranges, replace_range, Range
18 from .retry import retry_method
19
20 MOD = "mod"
21 WRITE = "write"
22
23 _logger = logging.getLogger('arvados.arvfile')
24
25 def split(path):
26     """split(path) -> streamname, filename
27
28     Separate the stream name and file name in a /-separated stream path and
29     return a tuple (stream_name, file_name).  If no stream name is available,
30     assume '.'.
31
32     """
33     try:
34         stream_name, file_name = path.rsplit('/', 1)
35     except ValueError:  # No / in string
36         stream_name, file_name = '.', path
37     return stream_name, file_name
38
39 class _FileLikeObjectBase(object):
40     def __init__(self, name, mode):
41         self.name = name
42         self.mode = mode
43         self.closed = False
44
45     @staticmethod
46     def _before_close(orig_func):
47         @functools.wraps(orig_func)
48         def before_close_wrapper(self, *args, **kwargs):
49             if self.closed:
50                 raise ValueError("I/O operation on closed stream file")
51             return orig_func(self, *args, **kwargs)
52         return before_close_wrapper
53
54     def __enter__(self):
55         return self
56
57     def __exit__(self, exc_type, exc_value, traceback):
58         try:
59             self.close()
60         except Exception:
61             if exc_type is None:
62                 raise
63
64     def close(self):
65         self.closed = True
66
67
68 class ArvadosFileReaderBase(_FileLikeObjectBase):
69     def __init__(self, name, mode, num_retries=None):
70         super(ArvadosFileReaderBase, self).__init__(name, mode)
71         self._filepos = 0L
72         self.num_retries = num_retries
73         self._readline_cache = (None, None)
74
75     def __iter__(self):
76         while True:
77             data = self.readline()
78             if not data:
79                 break
80             yield data
81
82     def decompressed_name(self):
83         return re.sub('\.(bz2|gz)$', '', self.name)
84
85     @_FileLikeObjectBase._before_close
86     def seek(self, pos, whence=os.SEEK_SET):
87         if whence == os.SEEK_CUR:
88             pos += self._filepos
89         elif whence == os.SEEK_END:
90             pos += self.size()
91         self._filepos = min(max(pos, 0L), self.size())
92
93     def tell(self):
94         return self._filepos
95
96     @_FileLikeObjectBase._before_close
97     @retry_method
98     def readall(self, size=2**20, num_retries=None):
99         while True:
100             data = self.read(size, num_retries=num_retries)
101             if data == '':
102                 break
103             yield data
104
105     @_FileLikeObjectBase._before_close
106     @retry_method
107     def readline(self, size=float('inf'), num_retries=None):
108         cache_pos, cache_data = self._readline_cache
109         if self.tell() == cache_pos:
110             data = [cache_data]
111             self._filepos += len(cache_data)
112         else:
113             data = ['']
114         data_size = len(data[-1])
115         while (data_size < size) and ('\n' not in data[-1]):
116             next_read = self.read(2 ** 20, num_retries=num_retries)
117             if not next_read:
118                 break
119             data.append(next_read)
120             data_size += len(next_read)
121         data = ''.join(data)
122         try:
123             nextline_index = data.index('\n') + 1
124         except ValueError:
125             nextline_index = len(data)
126         nextline_index = min(nextline_index, size)
127         self._filepos -= len(data) - nextline_index
128         self._readline_cache = (self.tell(), data[nextline_index:])
129         return data[:nextline_index]
130
131     @_FileLikeObjectBase._before_close
132     @retry_method
133     def decompress(self, decompress, size, num_retries=None):
134         for segment in self.readall(size, num_retries=num_retries):
135             data = decompress(segment)
136             if data:
137                 yield data
138
139     @_FileLikeObjectBase._before_close
140     @retry_method
141     def readall_decompressed(self, size=2**20, num_retries=None):
142         self.seek(0)
143         if self.name.endswith('.bz2'):
144             dc = bz2.BZ2Decompressor()
145             return self.decompress(dc.decompress, size,
146                                    num_retries=num_retries)
147         elif self.name.endswith('.gz'):
148             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
149             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
150                                    size, num_retries=num_retries)
151         else:
152             return self.readall(size, num_retries=num_retries)
153
154     @_FileLikeObjectBase._before_close
155     @retry_method
156     def readlines(self, sizehint=float('inf'), num_retries=None):
157         data = []
158         data_size = 0
159         for s in self.readall(num_retries=num_retries):
160             data.append(s)
161             data_size += len(s)
162             if data_size >= sizehint:
163                 break
164         return ''.join(data).splitlines(True)
165
166     def size(self):
167         raise NotImplementedError()
168
169     def read(self, size, num_retries=None):
170         raise NotImplementedError()
171
172     def readfrom(self, start, size, num_retries=None):
173         raise NotImplementedError()
174
175
176 class StreamFileReader(ArvadosFileReaderBase):
177     class _NameAttribute(str):
178         # The Python file API provides a plain .name attribute.
179         # Older SDK provided a name() method.
180         # This class provides both, for maximum compatibility.
181         def __call__(self):
182             return self
183
184     def __init__(self, stream, segments, name):
185         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
186         self._stream = stream
187         self.segments = segments
188
189     def stream_name(self):
190         return self._stream.name()
191
192     def size(self):
193         n = self.segments[-1]
194         return n.range_start + n.range_size
195
196     @_FileLikeObjectBase._before_close
197     @retry_method
198     def read(self, size, num_retries=None):
199         """Read up to 'size' bytes from the stream, starting at the current file position"""
200         if size == 0:
201             return ''
202
203         data = ''
204         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
205         if available_chunks:
206             lr = available_chunks[0]
207             data = self._stream.readfrom(lr.locator+lr.segment_offset,
208                                           lr.segment_size,
209                                           num_retries=num_retries)
210
211         self._filepos += len(data)
212         return data
213
214     @_FileLikeObjectBase._before_close
215     @retry_method
216     def readfrom(self, start, size, num_retries=None):
217         """Read up to 'size' bytes from the stream, starting at 'start'"""
218         if size == 0:
219             return ''
220
221         data = []
222         for lr in locators_and_ranges(self.segments, start, size):
223             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
224                                               num_retries=num_retries))
225         return ''.join(data)
226
227     def as_manifest(self):
228         segs = []
229         for r in self.segments:
230             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
231         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
232
233
234 def synchronized(orig_func):
235     @functools.wraps(orig_func)
236     def synchronized_wrapper(self, *args, **kwargs):
237         with self.lock:
238             return orig_func(self, *args, **kwargs)
239     return synchronized_wrapper
240
241
242 class StateChangeError(Exception):
243     def __init__(self, message, state, nextstate):
244         super(StateChangeError, self).__init__(message)
245         self.state = state
246         self.nextstate = nextstate
247
248 class _BufferBlock(object):
249     """A stand-in for a Keep block that is in the process of being written.
250
251     Writers can append to it, get the size, and compute the Keep locator.
252     There are three valid states:
253
254     WRITABLE
255       Can append to block.
256
257     PENDING
258       Block is in the process of being uploaded to Keep, append is an error.
259
260     COMMITTED
261       The block has been written to Keep, its internal buffer has been
262       released, fetching the block will fetch it via keep client (since we
263       discarded the internal copy), and identifiers referring to the BufferBlock
264       can be replaced with the block locator.
265
266     """
267
268     WRITABLE = 0
269     PENDING = 1
270     COMMITTED = 2
271     ERROR = 3
272
273     def __init__(self, blockid, starting_capacity, owner):
274         """
275         :blockid:
276           the identifier for this block
277
278         :starting_capacity:
279           the initial buffer capacity
280
281         :owner:
282           ArvadosFile that owns this block
283
284         """
285         self.blockid = blockid
286         self.buffer_block = bytearray(starting_capacity)
287         self.buffer_view = memoryview(self.buffer_block)
288         self.write_pointer = 0
289         self._state = _BufferBlock.WRITABLE
290         self._locator = None
291         self.owner = owner
292         self.lock = threading.Lock()
293         self.wait_for_commit = threading.Event()
294         self.error = None
295
296     @synchronized
297     def append(self, data):
298         """Append some data to the buffer.
299
300         Only valid if the block is in WRITABLE state.  Implements an expanding
301         buffer, doubling capacity as needed to accomdate all the data.
302
303         """
304         if self._state == _BufferBlock.WRITABLE:
305             while (self.write_pointer+len(data)) > len(self.buffer_block):
306                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
307                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
308                 self.buffer_block = new_buffer_block
309                 self.buffer_view = memoryview(self.buffer_block)
310             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
311             self.write_pointer += len(data)
312             self._locator = None
313         else:
314             raise AssertionError("Buffer block is not writable")
315
316     STATE_TRANSITIONS = frozenset([
317             (WRITABLE, PENDING),
318             (PENDING, COMMITTED),
319             (PENDING, ERROR),
320             (ERROR, PENDING)])
321
322     @synchronized
323     def set_state(self, nextstate, val=None):
324         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
325             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
326         self._state = nextstate
327
328         if self._state == _BufferBlock.PENDING:
329             self.wait_for_commit.clear()
330
331         if self._state == _BufferBlock.COMMITTED:
332             self._locator = val
333             self.buffer_view = None
334             self.buffer_block = None
335             self.wait_for_commit.set()
336
337         if self._state == _BufferBlock.ERROR:
338             self.error = val
339             self.wait_for_commit.set()
340
341     @synchronized
342     def state(self):
343         return self._state
344
345     def size(self):
346         """The amount of data written to the buffer."""
347         return self.write_pointer
348
349     @synchronized
350     def locator(self):
351         """The Keep locator for this buffer's contents."""
352         if self._locator is None:
353             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
354         return self._locator
355
356     @synchronized
357     def clone(self, new_blockid, owner):
358         if self._state == _BufferBlock.COMMITTED:
359             raise AssertionError("Cannot duplicate committed buffer block")
360         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
361         bufferblock.append(self.buffer_view[0:self.size()])
362         return bufferblock
363
364     @synchronized
365     def clear(self):
366         self.owner = None
367         self.buffer_block = None
368         self.buffer_view = None
369
370
371 class NoopLock(object):
372     def __enter__(self):
373         return self
374
375     def __exit__(self, exc_type, exc_value, traceback):
376         pass
377
378     def acquire(self, blocking=False):
379         pass
380
381     def release(self):
382         pass
383
384
385 def must_be_writable(orig_func):
386     @functools.wraps(orig_func)
387     def must_be_writable_wrapper(self, *args, **kwargs):
388         if not self.writable():
389             raise IOError(errno.EROFS, "Collection is read-only.")
390         return orig_func(self, *args, **kwargs)
391     return must_be_writable_wrapper
392
393
394 class _BlockManager(object):
395     """BlockManager handles buffer blocks.
396
397     Also handles background block uploads, and background block prefetch for a
398     Collection of ArvadosFiles.
399
400     """
401
402     DEFAULT_PUT_THREADS = 2
403     DEFAULT_GET_THREADS = 2
404
405     def __init__(self, keep, copies=None):
406         """keep: KeepClient object to use"""
407         self._keep = keep
408         self._bufferblocks = {}
409         self._put_queue = None
410         self._put_threads = None
411         self._prefetch_queue = None
412         self._prefetch_threads = None
413         self.lock = threading.Lock()
414         self.prefetch_enabled = True
415         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
416         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
417         self.copies = copies
418
419     @synchronized
420     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
421         """Allocate a new, empty bufferblock in WRITABLE state and return it.
422
423         :blockid:
424           optional block identifier, otherwise one will be automatically assigned
425
426         :starting_capacity:
427           optional capacity, otherwise will use default capacity
428
429         :owner:
430           ArvadosFile that owns this block
431
432         """
433         if blockid is None:
434             blockid = "bufferblock%i" % len(self._bufferblocks)
435         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
436         self._bufferblocks[bufferblock.blockid] = bufferblock
437         return bufferblock
438
439     @synchronized
440     def dup_block(self, block, owner):
441         """Create a new bufferblock initialized with the content of an existing bufferblock.
442
443         :block:
444           the buffer block to copy.
445
446         :owner:
447           ArvadosFile that owns the new block
448
449         """
450         new_blockid = "bufferblock%i" % len(self._bufferblocks)
451         bufferblock = block.clone(new_blockid, owner)
452         self._bufferblocks[bufferblock.blockid] = bufferblock
453         return bufferblock
454
455     @synchronized
456     def is_bufferblock(self, locator):
457         return locator in self._bufferblocks
458
459     def _commit_bufferblock_worker(self):
460         """Background uploader thread."""
461
462         while True:
463             try:
464                 bufferblock = self._put_queue.get()
465                 if bufferblock is None:
466                     return
467
468                 if self.copies is None:
469                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
470                 else:
471                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
472                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
473
474             except Exception as e:
475                 bufferblock.set_state(_BufferBlock.ERROR, e)
476             finally:
477                 if self._put_queue is not None:
478                     self._put_queue.task_done()
479
480     @synchronized
481     def start_put_threads(self):
482         if self._put_threads is None:
483             # Start uploader threads.
484
485             # If we don't limit the Queue size, the upload queue can quickly
486             # grow to take up gigabytes of RAM if the writing process is
487             # generating data more quickly than it can be send to the Keep
488             # servers.
489             #
490             # With two upload threads and a queue size of 2, this means up to 4
491             # blocks pending.  If they are full 64 MiB blocks, that means up to
492             # 256 MiB of internal buffering, which is the same size as the
493             # default download block cache in KeepClient.
494             self._put_queue = Queue.Queue(maxsize=2)
495
496             self._put_threads = []
497             for i in xrange(0, self.num_put_threads):
498                 thread = threading.Thread(target=self._commit_bufferblock_worker)
499                 self._put_threads.append(thread)
500                 thread.daemon = True
501                 thread.start()
502
503     def _block_prefetch_worker(self):
504         """The background downloader thread."""
505         while True:
506             try:
507                 b = self._prefetch_queue.get()
508                 if b is None:
509                     return
510                 self._keep.get(b)
511             except Exception:
512                 pass
513
514     @synchronized
515     def start_get_threads(self):
516         if self._prefetch_threads is None:
517             self._prefetch_queue = Queue.Queue()
518             self._prefetch_threads = []
519             for i in xrange(0, self.num_get_threads):
520                 thread = threading.Thread(target=self._block_prefetch_worker)
521                 self._prefetch_threads.append(thread)
522                 thread.daemon = True
523                 thread.start()
524
525
526     @synchronized
527     def stop_threads(self):
528         """Shut down and wait for background upload and download threads to finish."""
529
530         if self._put_threads is not None:
531             for t in self._put_threads:
532                 self._put_queue.put(None)
533             for t in self._put_threads:
534                 t.join()
535         self._put_threads = None
536         self._put_queue = None
537
538         if self._prefetch_threads is not None:
539             for t in self._prefetch_threads:
540                 self._prefetch_queue.put(None)
541             for t in self._prefetch_threads:
542                 t.join()
543         self._prefetch_threads = None
544         self._prefetch_queue = None
545
546     def __enter__(self):
547         return self
548
549     def __exit__(self, exc_type, exc_value, traceback):
550         self.stop_threads()
551
552     @synchronized
553     def repack_small_blocks(self, force=False, sync=False):
554         """Packs small blocks together before uploading"""
555         # Search blocks ready for getting packed together before being committed to Keep.
556         small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
557         if len(small_blocks) <= 1:
558             # Not enough small blocks for repacking
559             return
560
561         # Check if there are enough small blocks for filling up one in full
562         pending_write_size = sum([b.size() for b in small_blocks])
563         if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
564             new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None)
565             self._bufferblocks[new_bb.blockid] = new_bb
566             size = 0
567             while len(small_blocks) > 0 and (size + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
568                 bb = small_blocks.pop(0)
569                 size += bb.size()
570                 arvfile = bb.owner
571                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
572                 arvfile.set_segments([Range(new_bb.blockid, 0, bb.size(), size-bb.size())])
573                 bb.clear()
574                 del self._bufferblocks[bb.blockid]
575             self.commit_bufferblock(new_bb, sync=sync)
576
577     def commit_bufferblock(self, block, sync):
578         """Initiate a background upload of a bufferblock.
579
580         :block:
581           The block object to upload
582
583         :sync:
584           If `sync` is True, upload the block synchronously.
585           If `sync` is False, upload the block asynchronously.  This will
586           return immediately unless the upload queue is at capacity, in
587           which case it will wait on an upload queue slot.
588
589         """
590         try:
591             # Mark the block as PENDING so to disallow any more appends.
592             block.set_state(_BufferBlock.PENDING)
593         except StateChangeError as e:
594             if e.state == _BufferBlock.PENDING:
595                 if sync:
596                     block.wait_for_commit.wait()
597                 else:
598                     return
599             if block.state() == _BufferBlock.COMMITTED:
600                 return
601             elif block.state() == _BufferBlock.ERROR:
602                 raise block.error
603             else:
604                 raise
605
606         if sync:
607             try:
608                 if self.copies is None:
609                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
610                 else:
611                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
612                 block.set_state(_BufferBlock.COMMITTED, loc)
613             except Exception as e:
614                 block.set_state(_BufferBlock.ERROR, e)
615                 raise
616         else:
617             self.start_put_threads()
618             self._put_queue.put(block)
619
620     @synchronized
621     def get_bufferblock(self, locator):
622         return self._bufferblocks.get(locator)
623
624     @synchronized
625     def delete_bufferblock(self, locator):
626         bb = self._bufferblocks[locator]
627         bb.clear()
628         del self._bufferblocks[locator]
629
630     def get_block_contents(self, locator, num_retries, cache_only=False):
631         """Fetch a block.
632
633         First checks to see if the locator is a BufferBlock and return that, if
634         not, passes the request through to KeepClient.get().
635
636         """
637         with self.lock:
638             if locator in self._bufferblocks:
639                 bufferblock = self._bufferblocks[locator]
640                 if bufferblock.state() != _BufferBlock.COMMITTED:
641                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
642                 else:
643                     locator = bufferblock._locator
644         if cache_only:
645             return self._keep.get_from_cache(locator)
646         else:
647             return self._keep.get(locator, num_retries=num_retries)
648
649     def commit_all(self):
650         """Commit all outstanding buffer blocks.
651
652         This is a synchronous call, and will not return until all buffer blocks
653         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
654
655         """
656         self.repack_small_blocks(force=True, sync=True)
657
658         with self.lock:
659             items = self._bufferblocks.items()
660
661         for k,v in items:
662             if v.state() != _BufferBlock.COMMITTED and v.owner:
663                 v.owner.flush(sync=False)
664
665         with self.lock:
666             if self._put_queue is not None:
667                 self._put_queue.join()
668
669                 err = []
670                 for k,v in items:
671                     if v.state() == _BufferBlock.ERROR:
672                         err.append((v.locator(), v.error))
673                 if err:
674                     raise KeepWriteError("Error writing some blocks", err, label="block")
675
676         for k,v in items:
677             # flush again with sync=True to remove committed bufferblocks from
678             # the segments.
679             if v.owner:
680                 v.owner.flush(sync=True)
681
682     def block_prefetch(self, locator):
683         """Initiate a background download of a block.
684
685         This assumes that the underlying KeepClient implements a block cache,
686         so repeated requests for the same block will not result in repeated
687         downloads (unless the block is evicted from the cache.)  This method
688         does not block.
689
690         """
691
692         if not self.prefetch_enabled:
693             return
694
695         if self._keep.get_from_cache(locator) is not None:
696             return
697
698         with self.lock:
699             if locator in self._bufferblocks:
700                 return
701
702         self.start_get_threads()
703         self._prefetch_queue.put(locator)
704
705
706 class ArvadosFile(object):
707     """Represent a file in a Collection.
708
709     ArvadosFile manages the underlying representation of a file in Keep as a
710     sequence of segments spanning a set of blocks, and implements random
711     read/write access.
712
713     This object may be accessed from multiple threads.
714
715     """
716
717     def __init__(self, parent, name, stream=[], segments=[]):
718         """
719         ArvadosFile constructor.
720
721         :stream:
722           a list of Range objects representing a block stream
723
724         :segments:
725           a list of Range objects representing segments
726         """
727         self.parent = parent
728         self.name = name
729         self._writers = set()
730         self._committed = False
731         self._segments = []
732         self.lock = parent.root_collection().lock
733         for s in segments:
734             self._add_segment(stream, s.locator, s.range_size)
735         self._current_bblock = None
736
737     def writable(self):
738         return self.parent.writable()
739
740     @synchronized
741     def segments(self):
742         return copy.copy(self._segments)
743
744     @synchronized
745     def clone(self, new_parent, new_name):
746         """Make a copy of this file."""
747         cp = ArvadosFile(new_parent, new_name)
748         cp.replace_contents(self)
749         return cp
750
751     @must_be_writable
752     @synchronized
753     def replace_contents(self, other):
754         """Replace segments of this file with segments from another `ArvadosFile` object."""
755
756         map_loc = {}
757         self._segments = []
758         for other_segment in other.segments():
759             new_loc = other_segment.locator
760             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
761                 if other_segment.locator not in map_loc:
762                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
763                     if bufferblock.state() != _BufferBlock.WRITABLE:
764                         map_loc[other_segment.locator] = bufferblock.locator()
765                     else:
766                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
767                 new_loc = map_loc[other_segment.locator]
768
769             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
770
771         self._committed = False
772
773     def __eq__(self, other):
774         if other is self:
775             return True
776         if not isinstance(other, ArvadosFile):
777             return False
778
779         othersegs = other.segments()
780         with self.lock:
781             if len(self._segments) != len(othersegs):
782                 return False
783             for i in xrange(0, len(othersegs)):
784                 seg1 = self._segments[i]
785                 seg2 = othersegs[i]
786                 loc1 = seg1.locator
787                 loc2 = seg2.locator
788
789                 if self.parent._my_block_manager().is_bufferblock(loc1):
790                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
791
792                 if other.parent._my_block_manager().is_bufferblock(loc2):
793                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
794
795                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
796                     seg1.range_start != seg2.range_start or
797                     seg1.range_size != seg2.range_size or
798                     seg1.segment_offset != seg2.segment_offset):
799                     return False
800
801         return True
802
803     def __ne__(self, other):
804         return not self.__eq__(other)
805
806     @synchronized
807     def set_segments(self, segs):
808         self._segments = segs
809
810     @synchronized
811     def set_committed(self):
812         """Set committed flag to True"""
813         self._committed = True
814
815     @synchronized
816     def committed(self):
817         """Get whether this is committed or not."""
818         return self._committed
819
820     @synchronized
821     def add_writer(self, writer):
822         """Add an ArvadosFileWriter reference to the list of writers"""
823         if isinstance(writer, ArvadosFileWriter):
824             self._writers.add(writer)
825
826     @synchronized
827     def remove_writer(self, writer):
828         """
829         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
830         and do some block maintenance tasks.
831         """
832         self._writers.remove(writer)
833
834         if self.size() > config.KEEP_BLOCK_SIZE / 2:
835             # File writer closed, not small enough for repacking
836             self.flush()
837         elif self.closed():
838             # All writers closed and size is adequate for repacking
839             self.parent._my_block_manager().repack_small_blocks()
840
841     def closed(self):
842         """
843         Get whether this is closed or not. When the writers list is empty, the file
844         is supposed to be closed.
845         """
846         return len(self._writers) == 0
847
848     @must_be_writable
849     @synchronized
850     def truncate(self, size):
851         """Shrink the size of the file.
852
853         If `size` is less than the size of the file, the file contents after
854         `size` will be discarded.  If `size` is greater than the current size
855         of the file, an IOError will be raised.
856
857         """
858         if size < self.size():
859             new_segs = []
860             for r in self._segments:
861                 range_end = r.range_start+r.range_size
862                 if r.range_start >= size:
863                     # segment is past the trucate size, all done
864                     break
865                 elif size < range_end:
866                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
867                     nr.segment_offset = r.segment_offset
868                     new_segs.append(nr)
869                     break
870                 else:
871                     new_segs.append(r)
872
873             self._segments = new_segs
874             self._committed = False
875         elif size > self.size():
876             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
877
878     def readfrom(self, offset, size, num_retries, exact=False):
879         """Read up to `size` bytes from the file starting at `offset`.
880
881         :exact:
882          If False (default), return less data than requested if the read
883          crosses a block boundary and the next block isn't cached.  If True,
884          only return less data than requested when hitting EOF.
885         """
886
887         with self.lock:
888             if size == 0 or offset >= self.size():
889                 return ''
890             readsegs = locators_and_ranges(self._segments, offset, size)
891             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
892
893         locs = set()
894         data = []
895         for lr in readsegs:
896             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
897             if block:
898                 blockview = memoryview(block)
899                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
900                 locs.add(lr.locator)
901             else:
902                 break
903
904         for lr in prefetch:
905             if lr.locator not in locs:
906                 self.parent._my_block_manager().block_prefetch(lr.locator)
907                 locs.add(lr.locator)
908
909         return ''.join(data)
910
911     def _repack_writes(self, num_retries):
912         """Test if the buffer block has more data than actual segments.
913
914         This happens when a buffered write over-writes a file range written in
915         a previous buffered write.  Re-pack the buffer block for efficiency
916         and to avoid leaking information.
917
918         """
919         segs = self._segments
920
921         # Sum up the segments to get the total bytes of the file referencing
922         # into the buffer block.
923         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
924         write_total = sum([s.range_size for s in bufferblock_segs])
925
926         if write_total < self._current_bblock.size():
927             # There is more data in the buffer block than is actually accounted for by segments, so
928             # re-pack into a new buffer by copying over to a new buffer block.
929             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
930             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
931             for t in bufferblock_segs:
932                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
933                 t.segment_offset = new_bb.size() - t.range_size
934
935             self._current_bblock = new_bb
936
937     @must_be_writable
938     @synchronized
939     def writeto(self, offset, data, num_retries):
940         """Write `data` to the file starting at `offset`.
941
942         This will update existing bytes and/or extend the size of the file as
943         necessary.
944
945         """
946         if len(data) == 0:
947             return
948
949         if offset > self.size():
950             raise ArgumentError("Offset is past the end of the file")
951
952         if len(data) > config.KEEP_BLOCK_SIZE:
953             # Chunk it up into smaller writes
954             n = 0
955             dataview = memoryview(data)
956             while n < len(data):
957                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
958                 n += config.KEEP_BLOCK_SIZE
959             return
960
961         self._committed = False
962
963         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
964             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
965
966         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
967             self._repack_writes(num_retries)
968             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
969                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
970                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
971
972         self._current_bblock.append(data)
973
974         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
975
976         self.parent.notify(WRITE, self.parent, self.name, (self, self))
977
978         return len(data)
979
980     @synchronized
981     def flush(self, sync=True, num_retries=0):
982         """Flush the current bufferblock to Keep.
983
984         :sync:
985           If True, commit block synchronously, wait until buffer block has been written.
986           If False, commit block asynchronously, return immediately after putting block into
987           the keep put queue.
988         """
989         if self.committed():
990             return
991
992         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
993             if self._current_bblock.state() == _BufferBlock.WRITABLE:
994                 self._repack_writes(num_retries)
995             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
996
997         if sync:
998             to_delete = set()
999             for s in self._segments:
1000                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1001                 if bb:
1002                     if bb.state() != _BufferBlock.COMMITTED:
1003                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1004                     to_delete.add(s.locator)
1005                     s.locator = bb.locator()
1006             for s in to_delete:
1007                self.parent._my_block_manager().delete_bufferblock(s)
1008
1009         self.parent.notify(MOD, self.parent, self.name, (self, self))
1010
1011     @must_be_writable
1012     @synchronized
1013     def add_segment(self, blocks, pos, size):
1014         """Add a segment to the end of the file.
1015
1016         `pos` and `offset` reference a section of the stream described by
1017         `blocks` (a list of Range objects)
1018
1019         """
1020         self._add_segment(blocks, pos, size)
1021
1022     def _add_segment(self, blocks, pos, size):
1023         """Internal implementation of add_segment."""
1024         self._committed = False
1025         for lr in locators_and_ranges(blocks, pos, size):
1026             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1027             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1028             self._segments.append(r)
1029
1030     @synchronized
1031     def size(self):
1032         """Get the file size."""
1033         if self._segments:
1034             n = self._segments[-1]
1035             return n.range_start + n.range_size
1036         else:
1037             return 0
1038
1039     @synchronized
1040     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
1041         buf = ""
1042         filestream = []
1043         for segment in self.segments:
1044             loc = segment.locator
1045             if loc.startswith("bufferblock"):
1046                 loc = self._bufferblocks[loc].calculate_locator()
1047             if portable_locators:
1048                 loc = KeepLocator(loc).stripped()
1049             filestream.append(LocatorAndRange(loc, locator_block_size(loc),
1050                                  segment.segment_offset, segment.range_size))
1051         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
1052         buf += "\n"
1053         return buf
1054
1055     @must_be_writable
1056     @synchronized
1057     def _reparent(self, newparent, newname):
1058         self._committed = False
1059         self.flush(sync=True)
1060         self.parent.remove(self.name)
1061         self.parent = newparent
1062         self.name = newname
1063         self.lock = self.parent.root_collection().lock
1064
1065
1066 class ArvadosFileReader(ArvadosFileReaderBase):
1067     """Wraps ArvadosFile in a file-like object supporting reading only.
1068
1069     Be aware that this class is NOT thread safe as there is no locking around
1070     updating file pointer.
1071
1072     """
1073
1074     def __init__(self, arvadosfile, num_retries=None):
1075         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1076         self.arvadosfile = arvadosfile
1077
1078     def size(self):
1079         return self.arvadosfile.size()
1080
1081     def stream_name(self):
1082         return self.arvadosfile.parent.stream_name()
1083
1084     @_FileLikeObjectBase._before_close
1085     @retry_method
1086     def read(self, size=None, num_retries=None):
1087         """Read up to `size` bytes from the file and return the result.
1088
1089         Starts at the current file position.  If `size` is None, read the
1090         entire remainder of the file.
1091         """
1092         if size is None:
1093             data = []
1094             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1095             while rd:
1096                 data.append(rd)
1097                 self._filepos += len(rd)
1098                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1099             return ''.join(data)
1100         else:
1101             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1102             self._filepos += len(data)
1103             return data
1104
1105     @_FileLikeObjectBase._before_close
1106     @retry_method
1107     def readfrom(self, offset, size, num_retries=None):
1108         """Read up to `size` bytes from the stream, starting at the specified file offset.
1109
1110         This method does not change the file position.
1111         """
1112         return self.arvadosfile.readfrom(offset, size, num_retries)
1113
1114     def flush(self):
1115         pass
1116
1117
1118 class ArvadosFileWriter(ArvadosFileReader):
1119     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1120
1121     Be aware that this class is NOT thread safe as there is no locking around
1122     updating file pointer.
1123
1124     """
1125
1126     def __init__(self, arvadosfile, mode, num_retries=None):
1127         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1128         self.mode = mode
1129         self.arvadosfile.add_writer(self)
1130
1131     @_FileLikeObjectBase._before_close
1132     @retry_method
1133     def write(self, data, num_retries=None):
1134         if self.mode[0] == "a":
1135             self.arvadosfile.writeto(self.size(), data, num_retries)
1136         else:
1137             self.arvadosfile.writeto(self._filepos, data, num_retries)
1138             self._filepos += len(data)
1139         return len(data)
1140
1141     @_FileLikeObjectBase._before_close
1142     @retry_method
1143     def writelines(self, seq, num_retries=None):
1144         for s in seq:
1145             self.write(s, num_retries=num_retries)
1146
1147     @_FileLikeObjectBase._before_close
1148     def truncate(self, size=None):
1149         if size is None:
1150             size = self._filepos
1151         self.arvadosfile.truncate(size)
1152         if self._filepos > self.size():
1153             self._filepos = self.size()
1154
1155     @_FileLikeObjectBase._before_close
1156     def flush(self):
1157         self.arvadosfile.flush()
1158
1159     def close(self):
1160         if not self.closed:
1161             self.arvadosfile.remove_writer(self)
1162             super(ArvadosFileWriter, self).close()