1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13 import collections
14 import uuid
15
16 from .errors import KeepWriteError, AssertionError, ArgumentError
17 from .keep import KeepLocator
18 from ._normalize_stream import normalize_stream
19 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
20 from .retry import retry_method
21
22 MOD = "mod"
23 WRITE = "write"
24
25 _logger = logging.getLogger('arvados.arvfile')
26
27 def split(path):
28     """split(path) -> streamname, filename
29
30     Separate the stream name and file name in a /-separated stream path and
31     return a tuple (stream_name, file_name).  If no stream name is available,
32     assume '.'.
33
34     """
35     try:
36         stream_name, file_name = path.rsplit('/', 1)
37     except ValueError:  # No / in string
38         stream_name, file_name = '.', path
39     return stream_name, file_name
40
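# Illustrative sketch (not part of the original module): expected behaviour of
# split() for a path with and without a stream component.
def _example_split_usage():
    assert split("a_stream/subdir/file.txt") == ("a_stream/subdir", "file.txt")
    assert split("file.txt") == (".", "file.txt")
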
41 class _FileLikeObjectBase(object):
42     def __init__(self, name, mode):
43         self.name = name
44         self.mode = mode
45         self.closed = False
46
47     @staticmethod
48     def _before_close(orig_func):
49         @functools.wraps(orig_func)
50         def before_close_wrapper(self, *args, **kwargs):
51             if self.closed:
52                 raise ValueError("I/O operation on closed stream file")
53             return orig_func(self, *args, **kwargs)
54         return before_close_wrapper
55
56     def __enter__(self):
57         return self
58
59     def __exit__(self, exc_type, exc_value, traceback):
60         try:
61             self.close()
62         except Exception:
63             if exc_type is None:
64                 raise
65
66     def close(self):
67         self.closed = True
68
69
70 class ArvadosFileReaderBase(_FileLikeObjectBase):
71     def __init__(self, name, mode, num_retries=None):
72         super(ArvadosFileReaderBase, self).__init__(name, mode)
73         self._filepos = 0L
74         self.num_retries = num_retries
75         self._readline_cache = (None, None)
76
77     def __iter__(self):
78         while True:
79             data = self.readline()
80             if not data:
81                 break
82             yield data
83
84     def decompressed_name(self):
85         return re.sub(r'\.(bz2|gz)$', '', self.name)
86
87     @_FileLikeObjectBase._before_close
88     def seek(self, pos, whence=os.SEEK_SET):
89         if whence == os.SEEK_CUR:
90             pos += self._filepos
91         elif whence == os.SEEK_END:
92             pos += self.size()
93         self._filepos = min(max(pos, 0L), self.size())
94
95     def tell(self):
96         return self._filepos
97
98     @_FileLikeObjectBase._before_close
99     @retry_method
100     def readall(self, size=2**20, num_retries=None):
101         while True:
102             data = self.read(size, num_retries=num_retries)
103             if data == '':
104                 break
105             yield data
106
107     @_FileLikeObjectBase._before_close
108     @retry_method
109     def readline(self, size=float('inf'), num_retries=None):
110         cache_pos, cache_data = self._readline_cache
111         if self.tell() == cache_pos:
112             data = [cache_data]
113             self._filepos += len(cache_data)
114         else:
115             data = ['']
116         data_size = len(data[-1])
117         while (data_size < size) and ('\n' not in data[-1]):
118             next_read = self.read(2 ** 20, num_retries=num_retries)
119             if not next_read:
120                 break
121             data.append(next_read)
122             data_size += len(next_read)
123         data = ''.join(data)
124         try:
125             nextline_index = data.index('\n') + 1
126         except ValueError:
127             nextline_index = len(data)
128         nextline_index = min(nextline_index, size)
129         self._filepos -= len(data) - nextline_index
130         self._readline_cache = (self.tell(), data[nextline_index:])
131         return data[:nextline_index]
132
133     @_FileLikeObjectBase._before_close
134     @retry_method
135     def decompress(self, decompress, size, num_retries=None):
136         for segment in self.readall(size, num_retries=num_retries):
137             data = decompress(segment)
138             if data:
139                 yield data
140
141     @_FileLikeObjectBase._before_close
142     @retry_method
143     def readall_decompressed(self, size=2**20, num_retries=None):
144         self.seek(0)
145         if self.name.endswith('.bz2'):
146             dc = bz2.BZ2Decompressor()
147             return self.decompress(dc.decompress, size,
148                                    num_retries=num_retries)
149         elif self.name.endswith('.gz'):
150             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
151             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
152                                    size, num_retries=num_retries)
153         else:
154             return self.readall(size, num_retries=num_retries)
155
156     @_FileLikeObjectBase._before_close
157     @retry_method
158     def readlines(self, sizehint=float('inf'), num_retries=None):
159         data = []
160         data_size = 0
161         for s in self.readall(num_retries=num_retries):
162             data.append(s)
163             data_size += len(s)
164             if data_size >= sizehint:
165                 break
166         return ''.join(data).splitlines(True)
167
168     def size(self):
169         raise NotImplementedError()
170
171     def read(self, size, num_retries=None):
172         raise NotImplementedError()
173
174     def readfrom(self, start, size, num_retries=None):
175         raise NotImplementedError()
176
177
178 class StreamFileReader(ArvadosFileReaderBase):
179     class _NameAttribute(str):
180         # The Python file API provides a plain .name attribute.
181         # The older SDK provided a name() method.
182         # This class provides both, for maximum compatibility.
183         def __call__(self):
184             return self
185
186     def __init__(self, stream, segments, name):
187         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
188         self._stream = stream
189         self.segments = segments
190
191     def stream_name(self):
192         return self._stream.name()
193
194     def size(self):
195         n = self.segments[-1]
196         return n.range_start + n.range_size
197
198     @_FileLikeObjectBase._before_close
199     @retry_method
200     def read(self, size, num_retries=None):
201         """Read up to 'size' bytes from the stream, starting at the current file position"""
202         if size == 0:
203             return ''
204
205         data = ''
206         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
207         if available_chunks:
208             lr = available_chunks[0]
209             data = self._stream.readfrom(lr.locator+lr.segment_offset,
210                                           lr.segment_size,
211                                           num_retries=num_retries)
212
213         self._filepos += len(data)
214         return data
215
216     @_FileLikeObjectBase._before_close
217     @retry_method
218     def readfrom(self, start, size, num_retries=None):
219         """Read up to 'size' bytes from the stream, starting at 'start'"""
220         if size == 0:
221             return ''
222
223         data = []
224         for lr in locators_and_ranges(self.segments, start, size):
225             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
226                                               num_retries=num_retries))
227         return ''.join(data)
228
229     def as_manifest(self):
230         segs = []
231         for r in self.segments:
232             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
233         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
234
235
236 def synchronized(orig_func):
237     @functools.wraps(orig_func)
238     def synchronized_wrapper(self, *args, **kwargs):
239         with self.lock:
240             return orig_func(self, *args, **kwargs)
241     return synchronized_wrapper
242
243
244 class StateChangeError(Exception):
245     def __init__(self, message, state, nextstate):
246         super(StateChangeError, self).__init__(message)
247         self.state = state
248         self.nextstate = nextstate
249
250 class _BufferBlock(object):
251     """A stand-in for a Keep block that is in the process of being written.
252
253     Writers can append to it, get the size, and compute the Keep locator.
254     There are three valid states:
255
256     WRITABLE
257       Can append to block.
258
259     PENDING
260       Block is in the process of being uploaded to Keep; appending is an error.
261
262     COMMITTED
263       The block has been written to Keep and its internal buffer has been
264       released.  Fetching the block will fetch it via the Keep client (since
265       the internal copy was discarded), and identifiers referring to the
266       BufferBlock can be replaced with the block locator.
267
268     """
269
270     WRITABLE = 0
271     PENDING = 1
272     COMMITTED = 2
273     ERROR = 3
274
275     def __init__(self, blockid, starting_capacity, owner):
276         """
277         :blockid:
278           the identifier for this block
279
280         :starting_capacity:
281           the initial buffer capacity
282
283         :owner:
284           ArvadosFile that owns this block
285
286         """
287         self.blockid = blockid
288         self.buffer_block = bytearray(starting_capacity)
289         self.buffer_view = memoryview(self.buffer_block)
290         self.write_pointer = 0
291         self._state = _BufferBlock.WRITABLE
292         self._locator = None
293         self.owner = owner
294         self.lock = threading.Lock()
295         self.wait_for_commit = threading.Event()
296         self.error = None
297
298     @synchronized
299     def append(self, data):
300         """Append some data to the buffer.
301
302         Only valid if the block is in WRITABLE state.  Implements an expanding
303         buffer, doubling capacity as needed to accomdate all the data.
304         buffer, doubling capacity as needed to accommodate all the data.
305         """
306         if self._state == _BufferBlock.WRITABLE:
307             while (self.write_pointer+len(data)) > len(self.buffer_block):
308                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
309                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
310                 self.buffer_block = new_buffer_block
311                 self.buffer_view = memoryview(self.buffer_block)
312             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
313             self.write_pointer += len(data)
314             self._locator = None
315         else:
316             raise AssertionError("Buffer block is not writable")
317
318     STATE_TRANSITIONS = frozenset([
319             (WRITABLE, PENDING),
320             (PENDING, COMMITTED),
321             (PENDING, ERROR),
322             (ERROR, PENDING)])
323
324     @synchronized
325     def set_state(self, nextstate, val=None):
326         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
327             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
328         self._state = nextstate
329
330         if self._state == _BufferBlock.PENDING:
331             self.wait_for_commit.clear()
332
333         if self._state == _BufferBlock.COMMITTED:
334             self._locator = val
335             self.buffer_view = None
336             self.buffer_block = None
337             self.wait_for_commit.set()
338
339         if self._state == _BufferBlock.ERROR:
340             self.error = val
341             self.wait_for_commit.set()
342
343     @synchronized
344     def state(self):
345         return self._state
346
347     def size(self):
348         """The amount of data written to the buffer."""
349         return self.write_pointer
350
351     @synchronized
352     def locator(self):
353         """The Keep locator for this buffer's contents."""
354         if self._locator is None:
355             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
356         return self._locator
357
358     @synchronized
359     def clone(self, new_blockid, owner):
360         if self._state == _BufferBlock.COMMITTED:
361             raise AssertionError("Cannot duplicate committed buffer block")
362         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
363         bufferblock.append(self.buffer_view[0:self.size()])
364         return bufferblock
365
366     @synchronized
367     def clear(self):
368         self.owner = None
369         self.buffer_block = None
370         self.buffer_view = None
371
372
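# Illustrative sketch (not part of the original module): the _BufferBlock state
# machine described above.  A block starts WRITABLE and accepts appends, is
# marked PENDING while an upload is in flight, and becomes COMMITTED once a
# Keep locator is known.  The committed locator below is a made-up placeholder;
# real uploads are driven by _BlockManager.
def _example_bufferblock_lifecycle():
    bb = _BufferBlock("bufferblock-example", starting_capacity=2**14, owner=None)
    bb.append("hello ")
    bb.append("world")
    assert bb.size() == 11
    assert bb.state() == _BufferBlock.WRITABLE
    # locator() of a WRITABLE block is computed from the buffered bytes.
    assert bb.locator() == "%s+11" % hashlib.md5("hello world").hexdigest()
    bb.set_state(_BufferBlock.PENDING)
    bb.set_state(_BufferBlock.COMMITTED, "aaaa0000aaaa0000aaaa0000aaaa0000+11")
    # After COMMITTED the internal buffer is released and locator() returns
    # the value recorded by set_state().
    assert bb.state() == _BufferBlock.COMMITTED
    assert bb.locator() == "aaaa0000aaaa0000aaaa0000aaaa0000+11"

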
373 class NoopLock(object):
374     def __enter__(self):
375         return self
376
377     def __exit__(self, exc_type, exc_value, traceback):
378         pass
379
380     def acquire(self, blocking=False):
381         pass
382
383     def release(self):
384         pass
385
386
387 def must_be_writable(orig_func):
388     @functools.wraps(orig_func)
389     def must_be_writable_wrapper(self, *args, **kwargs):
390         if not self.writable():
391             raise IOError(errno.EROFS, "Collection is read-only.")
392         return orig_func(self, *args, **kwargs)
393     return must_be_writable_wrapper
394
395
396 class _BlockManager(object):
397     """BlockManager handles buffer blocks.
398
399     Also handles background block uploads, and background block prefetch for a
400     Collection of ArvadosFiles.
401
402     """
403
404     DEFAULT_PUT_THREADS = 2
405     DEFAULT_GET_THREADS = 2
406
407     def __init__(self, keep, copies=None):
408         """keep: KeepClient object to use"""
409         self._keep = keep
410         self._bufferblocks = collections.OrderedDict()
411         self._put_queue = None
412         self._put_threads = None
413         self._prefetch_queue = None
414         self._prefetch_threads = None
415         self.lock = threading.Lock()
416         self.prefetch_enabled = True
417         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
418         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
419         self.copies = copies
420         self._pending_write_size = 0
421         self.threads_lock = threading.Lock()
422
423     @synchronized
424     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
425         """Allocate a new, empty bufferblock in WRITABLE state and return it.
426
427         :blockid:
428           optional block identifier, otherwise one will be automatically assigned
429
430         :starting_capacity:
431           optional capacity, otherwise will use default capacity
432
433         :owner:
434           ArvadosFile that owns this block
435
436         """
437         return self._alloc_bufferblock(blockid, starting_capacity, owner)
438
439     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
440         if blockid is None:
441             blockid = "%s" % uuid.uuid4()
442         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
443         self._bufferblocks[bufferblock.blockid] = bufferblock
444         return bufferblock
445
446     @synchronized
447     def dup_block(self, block, owner):
448         """Create a new bufferblock initialized with the content of an existing bufferblock.
449
450         :block:
451           the buffer block to copy.
452
453         :owner:
454           ArvadosFile that owns the new block
455
456         """
457         new_blockid = "bufferblock%i" % len(self._bufferblocks)
458         bufferblock = block.clone(new_blockid, owner)
459         self._bufferblocks[bufferblock.blockid] = bufferblock
460         return bufferblock
461
462     @synchronized
463     def is_bufferblock(self, locator):
464         return locator in self._bufferblocks
465
466     def _commit_bufferblock_worker(self):
467         """Background uploader thread."""
468
469         while True:
470             try:
471                 bufferblock = self._put_queue.get()
472                 if bufferblock is None:
473                     return
474
475                 if self.copies is None:
476                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
477                 else:
478                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
479                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
480
481             except Exception as e:
482                 bufferblock.set_state(_BufferBlock.ERROR, e)
483             finally:
484                 if self._put_queue is not None:
485                     self._put_queue.task_done()
486
487     def start_put_threads(self):
488         with self.threads_lock:
489             if self._put_threads is None:
490                 # Start uploader threads.
491
492                 # If we don't limit the Queue size, the upload queue can quickly
493                 # grow to take up gigabytes of RAM if the writing process is
494                 # generating data more quickly than it can be sent to the Keep
495                 # servers.
496                 #
497                 # With two upload threads and a queue size of 2, this means up to 4
498                 # blocks pending.  If they are full 64 MiB blocks, that means up to
499                 # 256 MiB of internal buffering, which is the same size as the
500                 # default download block cache in KeepClient.
501                 self._put_queue = Queue.Queue(maxsize=2)
502
503                 self._put_threads = []
504                 for i in xrange(0, self.num_put_threads):
505                     thread = threading.Thread(target=self._commit_bufferblock_worker)
506                     self._put_threads.append(thread)
507                     thread.daemon = True
508                     thread.start()
509
510     def _block_prefetch_worker(self):
511         """The background downloader thread."""
512         while True:
513             try:
514                 b = self._prefetch_queue.get()
515                 if b is None:
516                     return
517                 self._keep.get(b)
518             except Exception:
519                 _logger.exception("Exception doing block prefetch")
520
521     @synchronized
522     def start_get_threads(self):
523         if self._prefetch_threads is None:
524             self._prefetch_queue = Queue.Queue()
525             self._prefetch_threads = []
526             for i in xrange(0, self.num_get_threads):
527                 thread = threading.Thread(target=self._block_prefetch_worker)
528                 self._prefetch_threads.append(thread)
529                 thread.daemon = True
530                 thread.start()
531
532
533     @synchronized
534     def stop_threads(self):
535         """Shut down and wait for background upload and download threads to finish."""
536
537         if self._put_threads is not None:
538             for t in self._put_threads:
539                 self._put_queue.put(None)
540             for t in self._put_threads:
541                 t.join()
542         self._put_threads = None
543         self._put_queue = None
544
545         if self._prefetch_threads is not None:
546             for t in self._prefetch_threads:
547                 self._prefetch_queue.put(None)
548             for t in self._prefetch_threads:
549                 t.join()
550         self._prefetch_threads = None
551         self._prefetch_queue = None
552
553     def __enter__(self):
554         return self
555
556     def __exit__(self, exc_type, exc_value, traceback):
557         self.stop_threads()
558
559     @synchronized
560     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
561         """Packs small blocks together before uploading"""
562         self._pending_write_size += closed_file_size
563
564         # Check if there are enough small blocks to fill up one full block
565         if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
566
567             # Search for blocks ready to be packed together before being committed to Keep.
568             # A WRITABLE block always has an owner.
569             # A WRITABLE block whose owner is closed() implies that its
570             # size is <= KEEP_BLOCK_SIZE/2.
571             small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
572
573             if len(small_blocks) <= 1:
574                 # Not enough small blocks for repacking
575                 return
576
577             # Update the pending write size count with its true value, just in case
578             # some small file was opened, written and closed several times.
579             self._pending_write_size = sum([b.size() for b in small_blocks])
580             if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
581                 return
582
583             new_bb = self._alloc_bufferblock()
584             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
585                 bb = small_blocks.pop(0)
586                 arvfile = bb.owner
587                 self._pending_write_size -= bb.size()
588                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
589                 arvfile.set_segments([Range(new_bb.blockid,
590                                             0,
591                                             bb.size(),
592                                             new_bb.write_pointer - bb.size())])
593                 self._delete_bufferblock(bb.blockid)
594             self.commit_bufferblock(new_bb, sync=sync)
595
596     def commit_bufferblock(self, block, sync):
597         """Initiate a background upload of a bufferblock.
598
599         :block:
600           The block object to upload
601
602         :sync:
603           If `sync` is True, upload the block synchronously.
604           If `sync` is False, upload the block asynchronously.  This will
605           return immediately unless the upload queue is at capacity, in
606           which case it will wait on an upload queue slot.
607
608         """
609         try:
610             # Mark the block as PENDING to disallow any more appends.
611             block.set_state(_BufferBlock.PENDING)
612         except StateChangeError as e:
613             if e.state == _BufferBlock.PENDING:
614                 if sync:
615                     block.wait_for_commit.wait()
616                 else:
617                     return
618             if block.state() == _BufferBlock.COMMITTED:
619                 return
620             elif block.state() == _BufferBlock.ERROR:
621                 raise block.error
622             else:
623                 raise
624
625         if sync:
626             try:
627                 if self.copies is None:
628                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
629                 else:
630                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
631                 block.set_state(_BufferBlock.COMMITTED, loc)
632             except Exception as e:
633                 block.set_state(_BufferBlock.ERROR, e)
634                 raise
635         else:
636             self.start_put_threads()
637             self._put_queue.put(block)
638
639     @synchronized
640     def get_bufferblock(self, locator):
641         return self._bufferblocks.get(locator)
642
643     @synchronized
644     def delete_bufferblock(self, locator):
645         self._delete_bufferblock(locator)
646
647     def _delete_bufferblock(self, locator):
648         bb = self._bufferblocks[locator]
649         bb.clear()
650         del self._bufferblocks[locator]
651
652     def get_block_contents(self, locator, num_retries, cache_only=False):
653         """Fetch a block.
654
655         First checks to see if the locator refers to a BufferBlock and, if so,
656         returns its contents; otherwise, passes the request through to KeepClient.get().
657
658         """
659         with self.lock:
660             if locator in self._bufferblocks:
661                 bufferblock = self._bufferblocks[locator]
662                 if bufferblock.state() != _BufferBlock.COMMITTED:
663                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
664                 else:
665                     locator = bufferblock._locator
666         if cache_only:
667             return self._keep.get_from_cache(locator)
668         else:
669             return self._keep.get(locator, num_retries=num_retries)
670
671     def commit_all(self):
672         """Commit all outstanding buffer blocks.
673
674         This is a synchronous call, and will not return until all buffer blocks
675         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
676
677         """
678         self.repack_small_blocks(force=True, sync=True)
679
680         with self.lock:
681             items = self._bufferblocks.items()
682
683         for k,v in items:
684             if v.state() != _BufferBlock.COMMITTED and v.owner:
685                 v.owner.flush(sync=False)
686
687         with self.lock:
688             if self._put_queue is not None:
689                 self._put_queue.join()
690
691                 err = []
692                 for k,v in items:
693                     if v.state() == _BufferBlock.ERROR:
694                         err.append((v.locator(), v.error))
695                 if err:
696                     raise KeepWriteError("Error writing some blocks", err, label="block")
697
698         for k,v in items:
699             # flush again with sync=True to remove committed bufferblocks from
700             # the segments.
701             if v.owner:
702                 v.owner.flush(sync=True)
703
704     def block_prefetch(self, locator):
705         """Initiate a background download of a block.
706
707         This assumes that the underlying KeepClient implements a block cache,
708         so repeated requests for the same block will not result in repeated
709         downloads (unless the block is evicted from the cache).  This method
710         does not block.
711
712         """
713
714         if not self.prefetch_enabled:
715             return
716
717         if self._keep.get_from_cache(locator) is not None:
718             return
719
720         with self.lock:
721             if locator in self._bufferblocks:
722                 return
723
724         self.start_get_threads()
725         self._prefetch_queue.put(locator)
726
727
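# Illustrative sketch (not part of the original module): driving _BlockManager
# directly with an in-memory stand-in for KeepClient.  The stub classes and
# helper functions below are assumptions for illustration only; they mimic
# just the KeepClient calls used in this file (put/get/get_from_cache) and are
# never invoked by the SDK itself.
class _ExampleKeepStub(object):
    def __init__(self):
        self._blocks = {}

    def put(self, data, copies=None):
        locator = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data))
        self._blocks[locator] = data
        return locator

    def get(self, locator, num_retries=None):
        return self._blocks[locator]

    def get_from_cache(self, locator):
        return self._blocks.get(locator)


def _example_blockmanager_usage():
    # Using the block manager as a context manager ensures stop_threads() is
    # called on exit, shutting down any background upload/prefetch threads.
    with _BlockManager(_ExampleKeepStub()) as bm:
        bb = bm.alloc_bufferblock()
        bb.append("some file data")
        # sync=True uploads in the calling thread and returns only after the
        # block has reached the COMMITTED state.
        bm.commit_bufferblock(bb, sync=True)
        return bb.locator()


class _ExampleClosedOwnerStub(object):
    # Minimal stand-in for an ArvadosFile owner whose writers have all been
    # closed; repack_small_blocks() only needs closed() and set_segments().
    def __init__(self):
        self.segments = None

    def closed(self):
        return True

    def set_segments(self, segs):
        self.segments = segs


def _example_repack_small_blocks(block_manager):
    # block_manager is assumed to be a _BlockManager, e.g. one built on the
    # _ExampleKeepStub above.  Two small closed files are packed into a single
    # buffer block, each owner's segments are rewritten to point into the
    # packed block, and the packed block is committed.
    a = block_manager.alloc_bufferblock(owner=_ExampleClosedOwnerStub())
    a.append("small file A")
    b = block_manager.alloc_bufferblock(owner=_ExampleClosedOwnerStub())
    b.append("small file B")
    block_manager.repack_small_blocks(force=True, sync=True)

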
728 class ArvadosFile(object):
729     """Represent a file in a Collection.
730
731     ArvadosFile manages the underlying representation of a file in Keep as a
732     sequence of segments spanning a set of blocks, and implements random
733     read/write access.
734
735     This object may be accessed from multiple threads.
736
737     """
738
739     def __init__(self, parent, name, stream=[], segments=[]):
740         """
741         ArvadosFile constructor.
742
743         :stream:
744           a list of Range objects representing a block stream
745
746         :segments:
747           a list of Range objects representing segments
748         """
749         self.parent = parent
750         self.name = name
751         self._writers = set()
752         self._committed = False
753         self._segments = []
754         self.lock = parent.root_collection().lock
755         for s in segments:
756             self._add_segment(stream, s.locator, s.range_size)
757         self._current_bblock = None
758
759     def writable(self):
760         return self.parent.writable()
761
762     @synchronized
763     def permission_expired(self, as_of_dt=None):
764         """Returns True if any of the segments' locators has expired"""
765         for r in self._segments:
766             if KeepLocator(r.locator).permission_expired(as_of_dt):
767                 return True
768         return False
769
770     @synchronized
771     def segments(self):
772         return copy.copy(self._segments)
773
774     @synchronized
775     def clone(self, new_parent, new_name):
776         """Make a copy of this file."""
777         cp = ArvadosFile(new_parent, new_name)
778         cp.replace_contents(self)
779         return cp
780
781     @must_be_writable
782     @synchronized
783     def replace_contents(self, other):
784         """Replace segments of this file with segments from another `ArvadosFile` object."""
785
786         map_loc = {}
787         self._segments = []
788         for other_segment in other.segments():
789             new_loc = other_segment.locator
790             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
791                 if other_segment.locator not in map_loc:
792                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
793                     if bufferblock.state() != _BufferBlock.WRITABLE:
794                         map_loc[other_segment.locator] = bufferblock.locator()
795                     else:
796                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
797                 new_loc = map_loc[other_segment.locator]
798
799             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
800
801         self._committed = False
802
803     def __eq__(self, other):
804         if other is self:
805             return True
806         if not isinstance(other, ArvadosFile):
807             return False
808
809         othersegs = other.segments()
810         with self.lock:
811             if len(self._segments) != len(othersegs):
812                 return False
813             for i in xrange(0, len(othersegs)):
814                 seg1 = self._segments[i]
815                 seg2 = othersegs[i]
816                 loc1 = seg1.locator
817                 loc2 = seg2.locator
818
819                 if self.parent._my_block_manager().is_bufferblock(loc1):
820                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
821
822                 if other.parent._my_block_manager().is_bufferblock(loc2):
823                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
824
825                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
826                     seg1.range_start != seg2.range_start or
827                     seg1.range_size != seg2.range_size or
828                     seg1.segment_offset != seg2.segment_offset):
829                     return False
830
831         return True
832
833     def __ne__(self, other):
834         return not self.__eq__(other)
835
836     @synchronized
837     def set_segments(self, segs):
838         self._segments = segs
839
840     @synchronized
841     def set_committed(self):
842         """Set committed flag to True"""
843         self._committed = True
844
845     @synchronized
846     def committed(self):
847         """Get whether this is committed or not."""
848         return self._committed
849
850     @synchronized
851     def add_writer(self, writer):
852         """Add an ArvadosFileWriter reference to the list of writers"""
853         if isinstance(writer, ArvadosFileWriter):
854             self._writers.add(writer)
855
856     @synchronized
857     def remove_writer(self, writer, flush):
858         """
859         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
860         and do some block maintenance tasks.
861         """
862         self._writers.remove(writer)
863
864         if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
865             # File writer closed, not small enough for repacking
866             self.flush()
867         elif self.closed():
868             # All writers closed and size is adequate for repacking
869             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
870
871     def closed(self):
872         """
873         Get whether this file is closed or not. When the writers list is empty, the
874         file is considered closed.
875         """
876         return len(self._writers) == 0
877
878     @must_be_writable
879     @synchronized
880     def truncate(self, size):
881         """Shrink the size of the file.
882
883         If `size` is less than the size of the file, the file contents after
884         `size` will be discarded.  If `size` is greater than the current size
885         of the file, an IOError will be raised.
886
887         """
888         if size < self.size():
889             new_segs = []
890             for r in self._segments:
891                 range_end = r.range_start+r.range_size
892                 if r.range_start >= size:
893                     # segment is past the truncate size, all done
894                     break
895                 elif size < range_end:
896                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
897                     nr.segment_offset = r.segment_offset
898                     new_segs.append(nr)
899                     break
900                 else:
901                     new_segs.append(r)
902
903             self._segments = new_segs
904             self._committed = False
905         elif size > self.size():
906             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
907
908     def readfrom(self, offset, size, num_retries, exact=False):
909         """Read up to `size` bytes from the file starting at `offset`.
910
911         :exact:
912          If False (default), return less data than requested if the read
913          crosses a block boundary and the next block isn't cached.  If True,
914          only return less data than requested when hitting EOF.
915         """
916
917         with self.lock:
918             if size == 0 or offset >= self.size():
919                 return ''
920             readsegs = locators_and_ranges(self._segments, offset, size)
921             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
922
923         locs = set()
924         data = []
925         for lr in readsegs:
926             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
927             if block:
928                 blockview = memoryview(block)
929                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
930                 locs.add(lr.locator)
931             else:
932                 break
933
934         for lr in prefetch:
935             if lr.locator not in locs:
936                 self.parent._my_block_manager().block_prefetch(lr.locator)
937                 locs.add(lr.locator)
938
939         return ''.join(data)
940
941     def _repack_writes(self, num_retries):
942         """Test if the buffer block has more data than actual segments.
943
944         This happens when a buffered write over-writes a file range written in
945         a previous buffered write.  Re-pack the buffer block for efficiency
946         and to avoid leaking information.
947
948         """
949         segs = self._segments
950
951         # Sum up the segments to get the total bytes of the file referencing
952         # into the buffer block.
953         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
954         write_total = sum([s.range_size for s in bufferblock_segs])
955
956         if write_total < self._current_bblock.size():
957             # There is more data in the buffer block than is actually accounted for by segments, so
958             # re-pack it by copying only the referenced ranges into a new buffer block.
959             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
960             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
961             for t in bufferblock_segs:
962                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
963                 t.segment_offset = new_bb.size() - t.range_size
964
965             self._current_bblock = new_bb
966
967     @must_be_writable
968     @synchronized
969     def writeto(self, offset, data, num_retries):
970         """Write `data` to the file starting at `offset`.
971
972         This will update existing bytes and/or extend the size of the file as
973         necessary.
974
975         """
976         if len(data) == 0:
977             return
978
979         if offset > self.size():
980             raise ArgumentError("Offset is past the end of the file")
981
982         if len(data) > config.KEEP_BLOCK_SIZE:
983             # Chunk it up into smaller writes
984             n = 0
985             dataview = memoryview(data)
986             while n < len(data):
987                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
988                 n += config.KEEP_BLOCK_SIZE
989             return
990
991         self._committed = False
992
993         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
994             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
995
996         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
997             self._repack_writes(num_retries)
998             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
999                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1000                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1001
1002         self._current_bblock.append(data)
1003
1004         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1005
1006         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1007
1008         return len(data)
1009
1010     @synchronized
1011     def flush(self, sync=True, num_retries=0):
1012         """Flush the current bufferblock to Keep.
1013
1014         :sync:
1015           If True, commit block synchronously, wait until buffer block has been written.
1016           If False, commit block asynchronously, return immediately after putting block into
1017           the keep put queue.
1018         """
1019         if self.committed():
1020             return
1021
1022         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1023             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1024                 self._repack_writes(num_retries)
1025             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1026
1027         if sync:
1028             to_delete = set()
1029             for s in self._segments:
1030                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1031                 if bb:
1032                     if bb.state() != _BufferBlock.COMMITTED:
1033                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1034                     to_delete.add(s.locator)
1035                     s.locator = bb.locator()
1036             for s in to_delete:
1037                 self.parent._my_block_manager().delete_bufferblock(s)
1038
1039         self.parent.notify(MOD, self.parent, self.name, (self, self))
1040
1041     @must_be_writable
1042     @synchronized
1043     def add_segment(self, blocks, pos, size):
1044         """Add a segment to the end of the file.
1045
1046         `pos` and `size` reference a section of the stream described by
1047         `blocks` (a list of Range objects)
1048
1049         """
1050         self._add_segment(blocks, pos, size)
1051
1052     def _add_segment(self, blocks, pos, size):
1053         """Internal implementation of add_segment."""
1054         self._committed = False
1055         for lr in locators_and_ranges(blocks, pos, size):
1056             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1057             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1058             self._segments.append(r)
1059
1060     @synchronized
1061     def size(self):
1062         """Get the file size."""
1063         if self._segments:
1064             n = self._segments[-1]
1065             return n.range_start + n.range_size
1066         else:
1067             return 0
1068
1069     @synchronized
1070     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
1071         buf = ""
1072         filestream = []
1073         for segment in self._segments:
1074             loc = segment.locator
1075             if self.parent._my_block_manager().is_bufferblock(loc):
1076                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1077             if portable_locators:
1078                 loc = KeepLocator(loc).stripped()
1079             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1080                                  segment.segment_offset, segment.range_size))
1081         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1082         buf += "\n"
1083         return buf
1084
1085     @must_be_writable
1086     @synchronized
1087     def _reparent(self, newparent, newname):
1088         self._committed = False
1089         self.flush(sync=True)
1090         self.parent.remove(self.name)
1091         self.parent = newparent
1092         self.name = newname
1093         self.lock = self.parent.root_collection().lock
1094
1095
1096 class ArvadosFileReader(ArvadosFileReaderBase):
1097     """Wraps ArvadosFile in a file-like object supporting reading only.
1098
1099     Be aware that this class is NOT thread safe as there is no locking around
1100     updating the file pointer.
1101
1102     """
1103
1104     def __init__(self, arvadosfile, num_retries=None):
1105         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1106         self.arvadosfile = arvadosfile
1107
1108     def size(self):
1109         return self.arvadosfile.size()
1110
1111     def stream_name(self):
1112         return self.arvadosfile.parent.stream_name()
1113
1114     @_FileLikeObjectBase._before_close
1115     @retry_method
1116     def read(self, size=None, num_retries=None):
1117         """Read up to `size` bytes from the file and return the result.
1118
1119         Starts at the current file position.  If `size` is None, read the
1120         entire remainder of the file.
1121         """
1122         if size is None:
1123             data = []
1124             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1125             while rd:
1126                 data.append(rd)
1127                 self._filepos += len(rd)
1128                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1129             return ''.join(data)
1130         else:
1131             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1132             self._filepos += len(data)
1133             return data
1134
1135     @_FileLikeObjectBase._before_close
1136     @retry_method
1137     def readfrom(self, offset, size, num_retries=None):
1138         """Read up to `size` bytes from the stream, starting at the specified file offset.
1139
1140         This method does not change the file position.
1141         """
1142         return self.arvadosfile.readfrom(offset, size, num_retries)
1143
1144     def flush(self):
1145         pass
1146
1147
1148 class ArvadosFileWriter(ArvadosFileReader):
1149     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1150
1151     Be aware that this class is NOT thread safe as there is no locking around
1152     updating the file pointer.
1153
1154     """
1155
1156     def __init__(self, arvadosfile, mode, num_retries=None):
1157         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1158         self.mode = mode
1159         self.arvadosfile.add_writer(self)
1160
1161     @_FileLikeObjectBase._before_close
1162     @retry_method
1163     def write(self, data, num_retries=None):
1164         if self.mode[0] == "a":
1165             self.arvadosfile.writeto(self.size(), data, num_retries)
1166         else:
1167             self.arvadosfile.writeto(self._filepos, data, num_retries)
1168             self._filepos += len(data)
1169         return len(data)
1170
1171     @_FileLikeObjectBase._before_close
1172     @retry_method
1173     def writelines(self, seq, num_retries=None):
1174         for s in seq:
1175             self.write(s, num_retries=num_retries)
1176
1177     @_FileLikeObjectBase._before_close
1178     def truncate(self, size=None):
1179         if size is None:
1180             size = self._filepos
1181         self.arvadosfile.truncate(size)
1182         if self._filepos > self.size():
1183             self._filepos = self.size()
1184
1185     @_FileLikeObjectBase._before_close
1186     def flush(self):
1187         self.arvadosfile.flush()
1188
1189     def close(self, flush=True):
1190         if not self.closed:
1191             self.arvadosfile.remove_writer(self, flush)
1192             super(ArvadosFileWriter, self).close()
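

# Illustrative sketch (not part of the original module): an end-to-end write
# and read-back through ArvadosFile, reusing the _ExampleKeepStub defined
# earlier in this file.  _ExampleParentStub is an assumption for illustration:
# it supplies only the small interface ArvadosFile relies on (a reentrant
# lock, writability, a block manager and change notification).  In real use
# this role is played by Collection objects from arvados.collection.
class _ExampleParentStub(object):
    def __init__(self, keep):
        self.lock = threading.RLock()
        self._block_manager = _BlockManager(keep)

    def root_collection(self):
        return self

    def writable(self):
        return True

    def _my_block_manager(self):
        return self._block_manager

    def notify(self, event, collection, name, item):
        pass


def _example_arvadosfile_roundtrip():
    parent = _ExampleParentStub(_ExampleKeepStub())
    f = ArvadosFile(parent, "example.txt")
    writer = ArvadosFileWriter(f, "w")
    writer.write("hello world")
    writer.close()        # flushes the buffer block to the (stubbed) Keep
    reader = ArvadosFileReader(f)
    return reader.read()  # -> "hello world"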