[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13 import collections
14 import uuid
15
16 from .errors import KeepWriteError, AssertionError, ArgumentError
17 from .keep import KeepLocator
18 from ._normalize_stream import normalize_stream
19 from ._ranges import locators_and_ranges, LocatorAndRange, replace_range, Range
20 from .retry import retry_method
21
22 MOD = "mod"
23 WRITE = "write"
24
25 _logger = logging.getLogger('arvados.arvfile')
26
27 def split(path):
28     """split(path) -> streamname, filename
29
30     Separate the stream name and file name in a /-separated stream path and
31     return a tuple (stream_name, file_name).  If no stream name is available,
32     assume '.'.
33
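    Examples (behavior follows directly from the implementation below):

        >>> split('path/to/file.txt')
        ('path/to', 'file.txt')
        >>> split('file.txt')
        ('.', 'file.txt')
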
34     """
35     try:
36         stream_name, file_name = path.rsplit('/', 1)
37     except ValueError:  # No / in string
38         stream_name, file_name = '.', path
39     return stream_name, file_name
40
41 class _FileLikeObjectBase(object):
42     def __init__(self, name, mode):
43         self.name = name
44         self.mode = mode
45         self.closed = False
46
47     @staticmethod
48     def _before_close(orig_func):
49         @functools.wraps(orig_func)
50         def before_close_wrapper(self, *args, **kwargs):
51             if self.closed:
52                 raise ValueError("I/O operation on closed stream file")
53             return orig_func(self, *args, **kwargs)
54         return before_close_wrapper
55
56     def __enter__(self):
57         return self
58
59     def __exit__(self, exc_type, exc_value, traceback):
60         try:
61             self.close()
62         except Exception:
63             if exc_type is None:
64                 raise
65
66     def close(self):
67         self.closed = True
68
69
70 class ArvadosFileReaderBase(_FileLikeObjectBase):
71     def __init__(self, name, mode, num_retries=None):
72         super(ArvadosFileReaderBase, self).__init__(name, mode)
73         self._filepos = 0L
74         self.num_retries = num_retries
75         self._readline_cache = (None, None)
76
77     def __iter__(self):
78         while True:
79             data = self.readline()
80             if not data:
81                 break
82             yield data
83
84     def decompressed_name(self):
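        # e.g. "reads.fastq.gz" -> "reads.fastq"; names without a .gz or .bz2
        # suffix are returned unchanged.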
85         return re.sub(r'\.(bz2|gz)$', '', self.name)
86
87     @_FileLikeObjectBase._before_close
88     def seek(self, pos, whence=os.SEEK_SET):
89         if whence == os.SEEK_CUR:
90             pos += self._filepos
91         elif whence == os.SEEK_END:
92             pos += self.size()
93         self._filepos = min(max(pos, 0L), self.size())
94
95     def tell(self):
96         return self._filepos
97
98     @_FileLikeObjectBase._before_close
99     @retry_method
100     def readall(self, size=2**20, num_retries=None):
101         while True:
102             data = self.read(size, num_retries=num_retries)
103             if data == '':
104                 break
105             yield data
106
107     @_FileLikeObjectBase._before_close
108     @retry_method
109     def readline(self, size=float('inf'), num_retries=None):
110         cache_pos, cache_data = self._readline_cache
111         if self.tell() == cache_pos:
112             data = [cache_data]
113             self._filepos += len(cache_data)
114         else:
115             data = ['']
116         data_size = len(data[-1])
117         while (data_size < size) and ('\n' not in data[-1]):
118             next_read = self.read(2 ** 20, num_retries=num_retries)
119             if not next_read:
120                 break
121             data.append(next_read)
122             data_size += len(next_read)
123         data = ''.join(data)
124         try:
125             nextline_index = data.index('\n') + 1
126         except ValueError:
127             nextline_index = len(data)
128         nextline_index = min(nextline_index, size)
129         self._filepos -= len(data) - nextline_index
130         self._readline_cache = (self.tell(), data[nextline_index:])
131         return data[:nextline_index]
132
133     @_FileLikeObjectBase._before_close
134     @retry_method
135     def decompress(self, decompress, size, num_retries=None):
136         for segment in self.readall(size, num_retries=num_retries):
137             data = decompress(segment)
138             if data:
139                 yield data
140
141     @_FileLikeObjectBase._before_close
142     @retry_method
143     def readall_decompressed(self, size=2**20, num_retries=None):
144         self.seek(0)
145         if self.name.endswith('.bz2'):
146             dc = bz2.BZ2Decompressor()
147             return self.decompress(dc.decompress, size,
148                                    num_retries=num_retries)
149         elif self.name.endswith('.gz'):
150             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
151             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
152                                    size, num_retries=num_retries)
153         else:
154             return self.readall(size, num_retries=num_retries)
155
156     @_FileLikeObjectBase._before_close
157     @retry_method
158     def readlines(self, sizehint=float('inf'), num_retries=None):
159         data = []
160         data_size = 0
161         for s in self.readall(num_retries=num_retries):
162             data.append(s)
163             data_size += len(s)
164             if data_size >= sizehint:
165                 break
166         return ''.join(data).splitlines(True)
167
168     def size(self):
169         raise NotImplementedError()
170
171     def read(self, size, num_retries=None):
172         raise NotImplementedError()
173
174     def readfrom(self, start, size, num_retries=None):
175         raise NotImplementedError()
176
177
178 class StreamFileReader(ArvadosFileReaderBase):
179     class _NameAttribute(str):
180         # The Python file API provides a plain .name attribute.
181         # Older SDK provided a name() method.
182         # This class provides both, for maximum compatibility.
183         def __call__(self):
184             return self
185
186     def __init__(self, stream, segments, name):
187         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
188         self._stream = stream
189         self.segments = segments
190
191     def stream_name(self):
192         return self._stream.name()
193
194     def size(self):
195         n = self.segments[-1]
196         return n.range_start + n.range_size
197
198     @_FileLikeObjectBase._before_close
199     @retry_method
200     def read(self, size, num_retries=None):
201         """Read up to 'size' bytes from the stream, starting at the current file position"""
202         if size == 0:
203             return ''
204
205         data = ''
206         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
207         if available_chunks:
208             lr = available_chunks[0]
209             data = self._stream.readfrom(lr.locator+lr.segment_offset,
210                                           lr.segment_size,
211                                           num_retries=num_retries)
212
213         self._filepos += len(data)
214         return data
215
216     @_FileLikeObjectBase._before_close
217     @retry_method
218     def readfrom(self, start, size, num_retries=None):
219         """Read up to 'size' bytes from the stream, starting at 'start'"""
220         if size == 0:
221             return ''
222
223         data = []
224         for lr in locators_and_ranges(self.segments, start, size):
225             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
226                                               num_retries=num_retries))
227         return ''.join(data)
228
229     def as_manifest(self):
230         segs = []
231         for r in self.segments:
232             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
233         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
234
235
236 def synchronized(orig_func):
237     @functools.wraps(orig_func)
238     def synchronized_wrapper(self, *args, **kwargs):
239         with self.lock:
240             return orig_func(self, *args, **kwargs)
241     return synchronized_wrapper
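# Note: classes that use @synchronized are expected to provide a `lock`
# attribute (e.g. threading.Lock, or NoopLock below); the wrapper simply holds
# that lock for the duration of the decorated call.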
242
243
244 class StateChangeError(Exception):
245     def __init__(self, message, state, nextstate):
246         super(StateChangeError, self).__init__(message)
247         self.state = state
248         self.nextstate = nextstate
249
250 class _BufferBlock(object):
251     """A stand-in for a Keep block that is in the process of being written.
252
253     Writers can append to it, get the size, and compute the Keep locator.
254     There are three valid states:
255
256     WRITABLE
257       Can append to block.
258
259     PENDING
260       Block is in the process of being uploaded to Keep, append is an error.
261
262     COMMITTED
263       The block has been written to Keep, its internal buffer has been
264       released, fetching the block will fetch it via the Keep client (since we
265       discarded the internal copy), and identifiers referring to the BufferBlock
266       can be replaced with the block locator.
267
268     """
269
270     WRITABLE = 0
271     PENDING = 1
272     COMMITTED = 2
273     ERROR = 3
274
275     def __init__(self, blockid, starting_capacity, owner):
276         """
277         :blockid:
278           the identifier for this block
279
280         :starting_capacity:
281           the initial buffer capacity
282
283         :owner:
284           ArvadosFile that owns this block
285
286         """
287         self.blockid = blockid
288         self.buffer_block = bytearray(starting_capacity)
289         self.buffer_view = memoryview(self.buffer_block)
290         self.write_pointer = 0
291         self._state = _BufferBlock.WRITABLE
292         self._locator = None
293         self.owner = owner
294         self.lock = threading.Lock()
295         self.wait_for_commit = threading.Event()
296         self.error = None
297
298     @synchronized
299     def append(self, data):
300         """Append some data to the buffer.
301
302         Only valid if the block is in WRITABLE state.  Implements an expanding
303         buffer, doubling capacity as needed to accommodate all the data.
304
305         """
306         if self._state == _BufferBlock.WRITABLE:
307             while (self.write_pointer+len(data)) > len(self.buffer_block):
308                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
309                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
310                 self.buffer_block = new_buffer_block
311                 self.buffer_view = memoryview(self.buffer_block)
312             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
313             self.write_pointer += len(data)
314             self._locator = None
315         else:
316             raise AssertionError("Buffer block is not writable")
317
318     STATE_TRANSITIONS = frozenset([
319             (WRITABLE, PENDING),
320             (PENDING, COMMITTED),
321             (PENDING, ERROR),
322             (ERROR, PENDING)])
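    # In other words, the normal lifecycle is WRITABLE -> PENDING -> COMMITTED;
    # a failed upload takes PENDING -> ERROR, and a retry may take ERROR back
    # to PENDING.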
323
324     @synchronized
325     def set_state(self, nextstate, val=None):
326         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
327             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
328         self._state = nextstate
329
330         if self._state == _BufferBlock.PENDING:
331             self.wait_for_commit.clear()
332
333         if self._state == _BufferBlock.COMMITTED:
334             self._locator = val
335             self.buffer_view = None
336             self.buffer_block = None
337             self.wait_for_commit.set()
338
339         if self._state == _BufferBlock.ERROR:
340             self.error = val
341             self.wait_for_commit.set()
342
343     @synchronized
344     def state(self):
345         return self._state
346
347     def size(self):
348         """The amount of data written to the buffer."""
349         return self.write_pointer
350
351     @synchronized
352     def locator(self):
353         """The Keep locator for this buffer's contents."""
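        # A Keep locator is the MD5 hex digest of the data plus "+" and its
        # length in bytes; e.g. an empty buffer would hash to
        # "d41d8cd98f00b204e9800998ecf8427e+0".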
354         if self._locator is None:
355             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
356         return self._locator
357
358     @synchronized
359     def clone(self, new_blockid, owner):
360         if self._state == _BufferBlock.COMMITTED:
361             raise AssertionError("Cannot duplicate committed buffer block")
362         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
363         bufferblock.append(self.buffer_view[0:self.size()])
364         return bufferblock
365
366     @synchronized
367     def clear(self):
368         self.owner = None
369         self.buffer_block = None
370         self.buffer_view = None
371
372
373 class NoopLock(object):
374     def __enter__(self):
375         return self
376
377     def __exit__(self, exc_type, exc_value, traceback):
378         pass
379
380     def acquire(self, blocking=False):
381         pass
382
383     def release(self):
384         pass
385
386
387 def must_be_writable(orig_func):
388     @functools.wraps(orig_func)
389     def must_be_writable_wrapper(self, *args, **kwargs):
390         if not self.writable():
391             raise IOError(errno.EROFS, "Collection is read-only.")
392         return orig_func(self, *args, **kwargs)
393     return must_be_writable_wrapper
394
395
396 class _BlockManager(object):
397     """BlockManager handles buffer blocks.
398
399     Also handles background block uploads, and background block prefetch for a
400     Collection of ArvadosFiles.
401
402     """
403
404     DEFAULT_PUT_THREADS = 2
405     DEFAULT_GET_THREADS = 2
406
407     def __init__(self, keep, copies=None):
408         """keep: KeepClient object to use"""
409         self._keep = keep
410         self._bufferblocks = collections.OrderedDict()
411         self._put_queue = None
412         self._put_threads = None
413         self._prefetch_queue = None
414         self._prefetch_threads = None
415         self.lock = threading.Lock()
416         self.prefetch_enabled = True
417         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
418         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
419         self.copies = copies
420         self._pending_write_size = 0
421
422     @synchronized
423     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
424         """Allocate a new, empty bufferblock in WRITABLE state and return it.
425
426         :blockid:
427           optional block identifier, otherwise one will be automatically assigned
428
429         :starting_capacity:
430           optional capacity, otherwise will use default capacity
431
432         :owner:
433           ArvadosFile that owns this block
434
435         """
436         return self._alloc_bufferblock(blockid, starting_capacity, owner)
437
438     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
439         if blockid is None:
440             blockid = "%s" % uuid.uuid4()
441         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
442         self._bufferblocks[bufferblock.blockid] = bufferblock
443         return bufferblock
444
445     @synchronized
446     def dup_block(self, block, owner):
447         """Create a new bufferblock initialized with the content of an existing bufferblock.
448
449         :block:
450           the buffer block to copy.
451
452         :owner:
453           ArvadosFile that owns the new block
454
455         """
456         new_blockid = "bufferblock%i" % len(self._bufferblocks)
457         bufferblock = block.clone(new_blockid, owner)
458         self._bufferblocks[bufferblock.blockid] = bufferblock
459         return bufferblock
460
461     @synchronized
462     def is_bufferblock(self, locator):
463         return locator in self._bufferblocks
464
465     def _commit_bufferblock_worker(self):
466         """Background uploader thread."""
467
468         while True:
469             try:
470                 bufferblock = self._put_queue.get()
471                 if bufferblock is None:
472                     return
473
474                 if self.copies is None:
475                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
476                 else:
477                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
478                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
479
480             except Exception as e:
481                 bufferblock.set_state(_BufferBlock.ERROR, e)
482             finally:
483                 if self._put_queue is not None:
484                     self._put_queue.task_done()
485
486     @synchronized
487     def start_put_threads(self):
488         if self._put_threads is None:
489             # Start uploader threads.
490
491             # If we don't limit the Queue size, the upload queue can quickly
492             # grow to take up gigabytes of RAM if the writing process is
493             # generating data more quickly than it can be sent to the Keep
494             # servers.
495             #
496             # With two upload threads and a queue size of 2, this means up to 4
497             # blocks pending.  If they are full 64 MiB blocks, that means up to
498             # 256 MiB of internal buffering, which is the same size as the
499             # default download block cache in KeepClient.
500             self._put_queue = Queue.Queue(maxsize=2)
501
502             self._put_threads = []
503             for i in xrange(0, self.num_put_threads):
504                 thread = threading.Thread(target=self._commit_bufferblock_worker)
505                 self._put_threads.append(thread)
506                 thread.daemon = True
507                 thread.start()
508
509     def _block_prefetch_worker(self):
510         """The background downloader thread."""
511         while True:
512             try:
513                 b = self._prefetch_queue.get()
514                 if b is None:
515                     return
516                 self._keep.get(b)
517             except Exception:
518                 pass
519
520     @synchronized
521     def start_get_threads(self):
522         if self._prefetch_threads is None:
523             self._prefetch_queue = Queue.Queue()
524             self._prefetch_threads = []
525             for i in xrange(0, self.num_get_threads):
526                 thread = threading.Thread(target=self._block_prefetch_worker)
527                 self._prefetch_threads.append(thread)
528                 thread.daemon = True
529                 thread.start()
530
531
532     @synchronized
533     def stop_threads(self):
534         """Shut down and wait for background upload and download threads to finish."""
535
536         if self._put_threads is not None:
537             for t in self._put_threads:
538                 self._put_queue.put(None)
539             for t in self._put_threads:
540                 t.join()
541         self._put_threads = None
542         self._put_queue = None
543
544         if self._prefetch_threads is not None:
545             for t in self._prefetch_threads:
546                 self._prefetch_queue.put(None)
547             for t in self._prefetch_threads:
548                 t.join()
549         self._prefetch_threads = None
550         self._prefetch_queue = None
551
552     def __enter__(self):
553         return self
554
555     def __exit__(self, exc_type, exc_value, traceback):
556         self.stop_threads()
557
558     @synchronized
559     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
560         """Packs small blocks together before uploading"""
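        # Illustrative arithmetic (assuming the 64 MiB block size mentioned in
        # the start_put_threads() comment above): sixteen closed 4 MiB files
        # that would otherwise each occupy their own small block can instead be
        # packed into a single 64 MiB block before being committed to Keep.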
561         self._pending_write_size += closed_file_size
562
563         # Check whether there are enough small blocks to fill up one full block
564         if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
565
566             # Search for blocks that are ready to be packed together before being committed to Keep.
567             # A WRITABLE block always has an owner.
568             # A WRITABLE block whose owner is closed() implies that its
569             # size is <= KEEP_BLOCK_SIZE/2.
570             small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
571
572             if len(small_blocks) <= 1:
573                 # Not enough small blocks for repacking
574                 return
575
576             new_bb = self._alloc_bufferblock()
577             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
578                 bb = small_blocks.pop(0)
579                 arvfile = bb.owner
580                 self._pending_write_size -= bb.size()
581                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
582                 arvfile.set_segments([Range(new_bb.blockid,
583                                             0,
584                                             bb.size(),
585                                             new_bb.write_pointer - bb.size())])
586                 self._delete_bufferblock(bb.blockid)
587             self.commit_bufferblock(new_bb, sync=sync)
588
589     def commit_bufferblock(self, block, sync):
590         """Initiate a background upload of a bufferblock.
591
592         :block:
593           The block object to upload
594
595         :sync:
596           If `sync` is True, upload the block synchronously.
597           If `sync` is False, upload the block asynchronously.  This will
598           return immediately unless the upload queue is at capacity, in
599           which case it will wait on an upload queue slot.
600
601         """
602         try:
603             # Mark the block as PENDING to disallow any further appends.
604             block.set_state(_BufferBlock.PENDING)
605         except StateChangeError as e:
606             if e.state == _BufferBlock.PENDING:
607                 if sync:
608                     block.wait_for_commit.wait()
609                 else:
610                     return
611             if block.state() == _BufferBlock.COMMITTED:
612                 return
613             elif block.state() == _BufferBlock.ERROR:
614                 raise block.error
615             else:
616                 raise
617
618         if sync:
619             try:
620                 if self.copies is None:
621                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
622                 else:
623                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
624                 block.set_state(_BufferBlock.COMMITTED, loc)
625             except Exception as e:
626                 block.set_state(_BufferBlock.ERROR, e)
627                 raise
628         else:
629             self.start_put_threads()
630             self._put_queue.put(block)
631
632     @synchronized
633     def get_bufferblock(self, locator):
634         return self._bufferblocks.get(locator)
635
636     @synchronized
637     def delete_bufferblock(self, locator):
638         self._delete_bufferblock(locator)
639
640     def _delete_bufferblock(self, locator):
641         bb = self._bufferblocks[locator]
642         bb.clear()
643         del self._bufferblocks[locator]
644
645     def get_block_contents(self, locator, num_retries, cache_only=False):
646         """Fetch a block.
647
648         First checks whether the locator refers to a BufferBlock and returns its
649         contents if so; otherwise, passes the request through to KeepClient.get().
650
651         """
652         with self.lock:
653             if locator in self._bufferblocks:
654                 bufferblock = self._bufferblocks[locator]
655                 if bufferblock.state() != _BufferBlock.COMMITTED:
656                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
657                 else:
658                     locator = bufferblock._locator
659         if cache_only:
660             return self._keep.get_from_cache(locator)
661         else:
662             return self._keep.get(locator, num_retries=num_retries)
663
664     def commit_all(self):
665         """Commit all outstanding buffer blocks.
666
667         This is a synchronous call, and will not return until all buffer blocks
668         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
669
670         """
671         self.repack_small_blocks(force=True, sync=True)
672
673         with self.lock:
674             items = self._bufferblocks.items()
675
676         for k,v in items:
677             if v.state() != _BufferBlock.COMMITTED and v.owner:
678                 v.owner.flush(sync=False)
679
680         with self.lock:
681             if self._put_queue is not None:
682                 self._put_queue.join()
683
684                 err = []
685                 for k,v in items:
686                     if v.state() == _BufferBlock.ERROR:
687                         err.append((v.locator(), v.error))
688                 if err:
689                     raise KeepWriteError("Error writing some blocks", err, label="block")
690
691         for k,v in items:
692             # flush again with sync=True to remove committed bufferblocks from
693             # the segments.
694             if v.owner:
695                 v.owner.flush(sync=True)
696
697     def block_prefetch(self, locator):
698         """Initiate a background download of a block.
699
700         This assumes that the underlying KeepClient implements a block cache,
701         so repeated requests for the same block will not result in repeated
702         downloads (unless the block is evicted from the cache).  This method
703         does not block.
704
705         """
706
707         if not self.prefetch_enabled:
708             return
709
710         if self._keep.get_from_cache(locator) is not None:
711             return
712
713         with self.lock:
714             if locator in self._bufferblocks:
715                 return
716
717         self.start_get_threads()
718         self._prefetch_queue.put(locator)
719
720
721 class ArvadosFile(object):
722     """Represent a file in a Collection.
723
724     ArvadosFile manages the underlying representation of a file in Keep as a
725     sequence of segments spanning a set of blocks, and implements random
726     read/write access.
727
728     This object may be accessed from multiple threads.
729
730     """
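    # Illustrative sketch (not part of the original source): a 100-byte file
    # whose first 60 bytes come from offset 10 of block A and whose last 40
    # bytes come from offset 0 of block B would be represented by segments
    # roughly like
    #
    #     [Range('A', 0, 60, 10), Range('B', 60, 40, 0)]
    #
    # i.e. Range(locator, range_start, range_size, segment_offset), with
    # locators shortened here for readability.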
731
732     def __init__(self, parent, name, stream=[], segments=[]):
733         """
734         ArvadosFile constructor.
735
736         :stream:
737           a list of Range objects representing a block stream
738
739         :segments:
740           a list of Range objects representing segments
741         """
742         self.parent = parent
743         self.name = name
744         self._writers = set()
745         self._committed = False
746         self._segments = []
747         self.lock = parent.root_collection().lock
748         for s in segments:
749             self._add_segment(stream, s.locator, s.range_size)
750         self._current_bblock = None
751
752     def writable(self):
753         return self.parent.writable()
754
755     @synchronized
756     def segments(self):
757         return copy.copy(self._segments)
758
759     @synchronized
760     def clone(self, new_parent, new_name):
761         """Make a copy of this file."""
762         cp = ArvadosFile(new_parent, new_name)
763         cp.replace_contents(self)
764         return cp
765
766     @must_be_writable
767     @synchronized
768     def replace_contents(self, other):
769         """Replace segments of this file with segments from another `ArvadosFile` object."""
770
771         map_loc = {}
772         self._segments = []
773         for other_segment in other.segments():
774             new_loc = other_segment.locator
775             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
776                 if other_segment.locator not in map_loc:
777                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
778                     if bufferblock.state() != _BufferBlock.WRITABLE:
779                         map_loc[other_segment.locator] = bufferblock.locator()
780                     else:
781                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
782                 new_loc = map_loc[other_segment.locator]
783
784             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
785
786         self._committed = False
787
788     def __eq__(self, other):
789         if other is self:
790             return True
791         if not isinstance(other, ArvadosFile):
792             return False
793
794         othersegs = other.segments()
795         with self.lock:
796             if len(self._segments) != len(othersegs):
797                 return False
798             for i in xrange(0, len(othersegs)):
799                 seg1 = self._segments[i]
800                 seg2 = othersegs[i]
801                 loc1 = seg1.locator
802                 loc2 = seg2.locator
803
804                 if self.parent._my_block_manager().is_bufferblock(loc1):
805                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
806
807                 if other.parent._my_block_manager().is_bufferblock(loc2):
808                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
809
810                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
811                     seg1.range_start != seg2.range_start or
812                     seg1.range_size != seg2.range_size or
813                     seg1.segment_offset != seg2.segment_offset):
814                     return False
815
816         return True
817
818     def __ne__(self, other):
819         return not self.__eq__(other)
820
821     @synchronized
822     def set_segments(self, segs):
823         self._segments = segs
824
825     @synchronized
826     def set_committed(self):
827         """Set committed flag to True"""
828         self._committed = True
829
830     @synchronized
831     def committed(self):
832         """Get whether this is committed or not."""
833         return self._committed
834
835     @synchronized
836     def add_writer(self, writer):
837         """Add an ArvadosFileWriter reference to the list of writers"""
838         if isinstance(writer, ArvadosFileWriter):
839             self._writers.add(writer)
840
841     @synchronized
842     def remove_writer(self, writer, flush):
843         """
844         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
845         and do some block maintenance tasks.
846         """
847         self._writers.remove(writer)
848
849         if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
850             # Flush requested or file not small enough for repacking
851             self.flush()
852         elif self.closed():
853             # All writers closed and size is adequate for repacking
854             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
855
856     def closed(self):
857         """
858         Get whether this file is closed. A file is considered closed when the
859         list of writers is empty.
860         """
861         return len(self._writers) == 0
862
863     @must_be_writable
864     @synchronized
865     def truncate(self, size):
866         """Shrink the size of the file.
867
868         If `size` is less than the size of the file, the file contents after
869         `size` will be discarded.  If `size` is greater than the current size
870         of the file, an IOError will be raised.
871
872         """
873         if size < self.size():
874             new_segs = []
875             for r in self._segments:
876                 range_end = r.range_start+r.range_size
877                 if r.range_start >= size:
878                     # segment is past the truncate size, all done
879                     break
880                 elif size < range_end:
881                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
882                     nr.segment_offset = r.segment_offset
883                     new_segs.append(nr)
884                     break
885                 else:
886                     new_segs.append(r)
887
888             self._segments = new_segs
889             self._committed = False
890         elif size > self.size():
891             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
892
893     def readfrom(self, offset, size, num_retries, exact=False):
894         """Read up to `size` bytes from the file starting at `offset`.
895
896         :exact:
897          If False (default), return less data than requested if the read
898          crosses a block boundary and the next block isn't cached.  If True,
899          only return less data than requested when hitting EOF.
900         """
901
902         with self.lock:
903             if size == 0 or offset >= self.size():
904                 return ''
905             readsegs = locators_and_ranges(self._segments, offset, size)
906             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
907
908         locs = set()
909         data = []
910         for lr in readsegs:
911             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
912             if block:
913                 blockview = memoryview(block)
914                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
915                 locs.add(lr.locator)
916             else:
917                 break
918
919         for lr in prefetch:
920             if lr.locator not in locs:
921                 self.parent._my_block_manager().block_prefetch(lr.locator)
922                 locs.add(lr.locator)
923
924         return ''.join(data)
925
926     def _repack_writes(self, num_retries):
927         """Check whether the buffer block holds more data than the file's segments reference.
928
929         This happens when a buffered write overwrites a file range written by
930         a previous buffered write.  Re-pack the buffer block for efficiency
931         and to avoid leaking information.
932
933         """
934         segs = self._segments
935
936         # Sum up the segments to get the total bytes of the file referencing
937         # into the buffer block.
938         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
939         write_total = sum([s.range_size for s in bufferblock_segs])
940
941         if write_total < self._current_bblock.size():
942             # There is more data in the buffer block than is actually accounted for by segments, so
943             # re-pack into a new buffer by copying over to a new buffer block.
944             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
945             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
946             for t in bufferblock_segs:
947                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
948                 t.segment_offset = new_bb.size() - t.range_size
949
950             self._current_bblock = new_bb
951
952     @must_be_writable
953     @synchronized
954     def writeto(self, offset, data, num_retries):
955         """Write `data` to the file starting at `offset`.
956
957         This will update existing bytes and/or extend the size of the file as
958         necessary.
959
960         """
961         if len(data) == 0:
962             return
963
964         if offset > self.size():
965             raise ArgumentError("Offset is past the end of the file")
966
967         if len(data) > config.KEEP_BLOCK_SIZE:
968             # Chunk it up into smaller writes
969             n = 0
970             dataview = memoryview(data)
971             while n < len(data):
972                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
973                 n += config.KEEP_BLOCK_SIZE
974             return
975
976         self._committed = False
977
978         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
979             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
980
981         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
982             self._repack_writes(num_retries)
983             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
984                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
985                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
986
987         self._current_bblock.append(data)
988
989         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
990
991         self.parent.notify(WRITE, self.parent, self.name, (self, self))
992
993         return len(data)
994
995     @synchronized
996     def flush(self, sync=True, num_retries=0):
997         """Flush the current bufferblock to Keep.
998
999         :sync:
1000           If True, commit block synchronously, wait until buffer block has been written.
1001           If False, commit block asynchronously, return immediately after putting block into
1002           the keep put queue.
1003         """
1004         if self.committed():
1005             return
1006
1007         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1008             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1009                 self._repack_writes(num_retries)
1010             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1011
1012         if sync:
1013             to_delete = set()
1014             for s in self._segments:
1015                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1016                 if bb:
1017                     if bb.state() != _BufferBlock.COMMITTED:
1018                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1019                     to_delete.add(s.locator)
1020                     s.locator = bb.locator()
1021             for s in to_delete:
1022                self.parent._my_block_manager().delete_bufferblock(s)
1023
1024         self.parent.notify(MOD, self.parent, self.name, (self, self))
1025
1026     @must_be_writable
1027     @synchronized
1028     def add_segment(self, blocks, pos, size):
1029         """Add a segment to the end of the file.
1030
1031         `pos` and `size` reference a section of the stream described by
1032         `blocks` (a list of Range objects)
1033
1034         """
1035         self._add_segment(blocks, pos, size)
1036
1037     def _add_segment(self, blocks, pos, size):
1038         """Internal implementation of add_segment."""
1039         self._committed = False
1040         for lr in locators_and_ranges(blocks, pos, size):
1041             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1042             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1043             self._segments.append(r)
1044
1045     @synchronized
1046     def size(self):
1047         """Get the file size."""
1048         if self._segments:
1049             n = self._segments[-1]
1050             return n.range_start + n.range_size
1051         else:
1052             return 0
1053
1054     @synchronized
1055     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
1056         buf = ""
1057         filestream = []
1058         for segment in self._segments:
1059             loc = segment.locator
1060             if self.parent._my_block_manager().is_bufferblock(loc):
1061                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1062             if portable_locators:
1063                 loc = KeepLocator(loc).stripped()
1064             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1065                                  segment.segment_offset, segment.range_size))
1066         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1067         buf += "\n"
1068         return buf
1069
1070     @must_be_writable
1071     @synchronized
1072     def _reparent(self, newparent, newname):
1073         self._committed = False
1074         self.flush(sync=True)
1075         self.parent.remove(self.name)
1076         self.parent = newparent
1077         self.name = newname
1078         self.lock = self.parent.root_collection().lock
1079
1080
1081 class ArvadosFileReader(ArvadosFileReaderBase):
1082     """Wraps ArvadosFile in a file-like object supporting reading only.
1083
1084     Be aware that this class is NOT thread-safe, as there is no locking around
1085     updates to the file pointer.
1086
1087     """
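    # Illustrative usage sketch (assumes the CollectionReader API from
    # arvados.collection, which is not defined in this module):
    #
    #     import arvados.collection
    #     reader = arvados.collection.CollectionReader("zzzzz-4zz18-xxxxxxxxxxxxxxx")
    #     with reader.open("data/input.txt") as f:
    #         first_kb = f.read(1024)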
1088
1089     def __init__(self, arvadosfile, num_retries=None):
1090         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1091         self.arvadosfile = arvadosfile
1092
1093     def size(self):
1094         return self.arvadosfile.size()
1095
1096     def stream_name(self):
1097         return self.arvadosfile.parent.stream_name()
1098
1099     @_FileLikeObjectBase._before_close
1100     @retry_method
1101     def read(self, size=None, num_retries=None):
1102         """Read up to `size` bytes from the file and return the result.
1103
1104         Starts at the current file position.  If `size` is None, read the
1105         entire remainder of the file.
1106         """
1107         if size is None:
1108             data = []
1109             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1110             while rd:
1111                 data.append(rd)
1112                 self._filepos += len(rd)
1113                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1114             return ''.join(data)
1115         else:
1116             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1117             self._filepos += len(data)
1118             return data
1119
1120     @_FileLikeObjectBase._before_close
1121     @retry_method
1122     def readfrom(self, offset, size, num_retries=None):
1123         """Read up to `size` bytes from the stream, starting at the specified file offset.
1124
1125         This method does not change the file position.
1126         """
1127         return self.arvadosfile.readfrom(offset, size, num_retries)
1128
1129     def flush(self):
1130         pass
1131
1132
1133 class ArvadosFileWriter(ArvadosFileReader):
1134     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1135
1136     Be aware that this class is NOT thread-safe, as there is no locking around
1137     updates to the file pointer.
1138
1139     """
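    # Illustrative usage sketch (assumes the writable Collection API from
    # arvados.collection, which is not defined in this module):
    #
    #     import arvados.collection
    #     c = arvados.collection.Collection()
    #     with c.open("output.txt", "w") as f:
    #         f.write("hello world\n")
    #     c.save_new("small files example")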
1140
1141     def __init__(self, arvadosfile, mode, num_retries=None):
1142         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1143         self.mode = mode
1144         self.arvadosfile.add_writer(self)
1145
1146     @_FileLikeObjectBase._before_close
1147     @retry_method
1148     def write(self, data, num_retries=None):
1149         if self.mode[0] == "a":
1150             self.arvadosfile.writeto(self.size(), data, num_retries)
1151         else:
1152             self.arvadosfile.writeto(self._filepos, data, num_retries)
1153             self._filepos += len(data)
1154         return len(data)
1155
1156     @_FileLikeObjectBase._before_close
1157     @retry_method
1158     def writelines(self, seq, num_retries=None):
1159         for s in seq:
1160             self.write(s, num_retries=num_retries)
1161
1162     @_FileLikeObjectBase._before_close
1163     def truncate(self, size=None):
1164         if size is None:
1165             size = self._filepos
1166         self.arvadosfile.truncate(size)
1167         if self._filepos > self.size():
1168             self._filepos = self.size()
1169
1170     @_FileLikeObjectBase._before_close
1171     def flush(self):
1172         self.arvadosfile.flush()
1173
1174     def close(self, flush=True):
1175         if not self.closed:
1176             self.arvadosfile.remove_writer(self, flush)
1177             super(ArvadosFileWriter, self).close()