6473: include last_log_id when start_time argument is provided.
[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13
14 from .errors import KeepWriteError, AssertionError, ArgumentError
15 from .keep import KeepLocator
16 from ._normalize_stream import normalize_stream
17 from ._ranges import locators_and_ranges, replace_range, Range
18 from .retry import retry_method
19
20 MOD = "mod"
21 WRITE = "write"
22
23 _logger = logging.getLogger('arvados.arvfile')
24
25 def split(path):
26     """split(path) -> streamname, filename
27
28     Separate the stream name and file name in a /-separated stream path and
29     return a tuple (stream_name, file_name).  If no stream name is available,
30     assume '.'.
31
32     """
33     try:
34         stream_name, file_name = path.rsplit('/', 1)
35     except ValueError:  # No / in string
36         stream_name, file_name = '.', path
37     return stream_name, file_name
38
39 class _FileLikeObjectBase(object):
40     def __init__(self, name, mode):
41         self.name = name
42         self.mode = mode
43         self.closed = False
44
45     @staticmethod
46     def _before_close(orig_func):
47         @functools.wraps(orig_func)
48         def before_close_wrapper(self, *args, **kwargs):
49             if self.closed:
50                 raise ValueError("I/O operation on closed stream file")
51             return orig_func(self, *args, **kwargs)
52         return before_close_wrapper
53
54     def __enter__(self):
55         return self
56
57     def __exit__(self, exc_type, exc_value, traceback):
58         try:
59             self.close()
60         except Exception:
61             if exc_type is None:
62                 raise
63
64     def close(self):
65         self.closed = True
66
67
68 class ArvadosFileReaderBase(_FileLikeObjectBase):
69     def __init__(self, name, mode, num_retries=None):
70         super(ArvadosFileReaderBase, self).__init__(name, mode)
71         self._filepos = 0L
72         self.num_retries = num_retries
73         self._readline_cache = (None, None)
74
75     def __iter__(self):
76         while True:
77             data = self.readline()
78             if not data:
79                 break
80             yield data
81
82     def decompressed_name(self):
83         return re.sub('\.(bz2|gz)$', '', self.name)
84
85     @_FileLikeObjectBase._before_close
86     def seek(self, pos, whence=os.SEEK_SET):
87         if whence == os.SEEK_CUR:
88             pos += self._filepos
89         elif whence == os.SEEK_END:
90             pos += self.size()
91         self._filepos = min(max(pos, 0L), self.size())
92
93     def tell(self):
94         return self._filepos
95
96     @_FileLikeObjectBase._before_close
97     @retry_method
98     def readall(self, size=2**20, num_retries=None):
99         while True:
100             data = self.read(size, num_retries=num_retries)
101             if data == '':
102                 break
103             yield data
104
105     @_FileLikeObjectBase._before_close
106     @retry_method
107     def readline(self, size=float('inf'), num_retries=None):
108         cache_pos, cache_data = self._readline_cache
109         if self.tell() == cache_pos:
110             data = [cache_data]
111         else:
112             data = ['']
113         data_size = len(data[-1])
114         while (data_size < size) and ('\n' not in data[-1]):
115             next_read = self.read(2 ** 20, num_retries=num_retries)
116             if not next_read:
117                 break
118             data.append(next_read)
119             data_size += len(next_read)
120         data = ''.join(data)
121         try:
122             nextline_index = data.index('\n') + 1
123         except ValueError:
124             nextline_index = len(data)
125         nextline_index = min(nextline_index, size)
126         self._readline_cache = (self.tell(), data[nextline_index:])
127         return data[:nextline_index]
128
129     @_FileLikeObjectBase._before_close
130     @retry_method
131     def decompress(self, decompress, size, num_retries=None):
132         for segment in self.readall(size, num_retries):
133             data = decompress(segment)
134             if data:
135                 yield data
136
137     @_FileLikeObjectBase._before_close
138     @retry_method
139     def readall_decompressed(self, size=2**20, num_retries=None):
140         self.seek(0)
141         if self.name.endswith('.bz2'):
142             dc = bz2.BZ2Decompressor()
143             return self.decompress(dc.decompress, size,
144                                    num_retries=num_retries)
145         elif self.name.endswith('.gz'):
146             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
147             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
148                                    size, num_retries=num_retries)
149         else:
150             return self.readall(size, num_retries=num_retries)
151
152     @_FileLikeObjectBase._before_close
153     @retry_method
154     def readlines(self, sizehint=float('inf'), num_retries=None):
155         data = []
156         data_size = 0
157         for s in self.readall(num_retries=num_retries):
158             data.append(s)
159             data_size += len(s)
160             if data_size >= sizehint:
161                 break
162         return ''.join(data).splitlines(True)
163
164     def size(self):
165         raise NotImplementedError()
166
167     def read(self, size, num_retries=None):
168         raise NotImplementedError()
169
170     def readfrom(self, start, size, num_retries=None):
171         raise NotImplementedError()
172
173
174 class StreamFileReader(ArvadosFileReaderBase):
175     class _NameAttribute(str):
176         # The Python file API provides a plain .name attribute.
177         # Older SDK provided a name() method.
178         # This class provides both, for maximum compatibility.
179         def __call__(self):
180             return self
181
182     def __init__(self, stream, segments, name):
183         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
184         self._stream = stream
185         self.segments = segments
186
187     def stream_name(self):
188         return self._stream.name()
189
190     def size(self):
191         n = self.segments[-1]
192         return n.range_start + n.range_size
193
194     @_FileLikeObjectBase._before_close
195     @retry_method
196     def read(self, size, num_retries=None):
197         """Read up to 'size' bytes from the stream, starting at the current file position"""
198         if size == 0:
199             return ''
200
201         data = ''
202         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
203         if available_chunks:
204             lr = available_chunks[0]
205             data = self._stream.readfrom(lr.locator+lr.segment_offset,
206                                           lr.segment_size,
207                                           num_retries=num_retries)
208
209         self._filepos += len(data)
210         return data
211
212     @_FileLikeObjectBase._before_close
213     @retry_method
214     def readfrom(self, start, size, num_retries=None):
215         """Read up to 'size' bytes from the stream, starting at 'start'"""
216         if size == 0:
217             return ''
218
219         data = []
220         for lr in locators_and_ranges(self.segments, start, size):
221             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
222                                               num_retries=num_retries))
223         return ''.join(data)
224
225     def as_manifest(self):
226         segs = []
227         for r in self.segments:
228             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
229         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
230
231
232 def synchronized(orig_func):
233     @functools.wraps(orig_func)
234     def synchronized_wrapper(self, *args, **kwargs):
235         with self.lock:
236             return orig_func(self, *args, **kwargs)
237     return synchronized_wrapper
238
239
240 class StateChangeError(Exception):
241     def __init__(self, message, state, nextstate):
242         super(StateChangeError, self).__init__(message)
243         self.state = state
244         self.nextstate = nextstate
245
246 class _BufferBlock(object):
247     """A stand-in for a Keep block that is in the process of being written.
248
249     Writers can append to it, get the size, and compute the Keep locator.
250     There are three valid states:
251
252     WRITABLE
253       Can append to block.
254
255     PENDING
256       Block is in the process of being uploaded to Keep, append is an error.
257
258     COMMITTED
259       The block has been written to Keep, its internal buffer has been
260       released, fetching the block will fetch it via keep client (since we
261       discarded the internal copy), and identifiers referring to the BufferBlock
262       can be replaced with the block locator.
263
264     """
265
266     WRITABLE = 0
267     PENDING = 1
268     COMMITTED = 2
269     ERROR = 3
270
271     def __init__(self, blockid, starting_capacity, owner):
272         """
273         :blockid:
274           the identifier for this block
275
276         :starting_capacity:
277           the initial buffer capacity
278
279         :owner:
280           ArvadosFile that owns this block
281
282         """
283         self.blockid = blockid
284         self.buffer_block = bytearray(starting_capacity)
285         self.buffer_view = memoryview(self.buffer_block)
286         self.write_pointer = 0
287         self._state = _BufferBlock.WRITABLE
288         self._locator = None
289         self.owner = owner
290         self.lock = threading.Lock()
291         self.wait_for_commit = threading.Event()
292         self.error = None
293
294     @synchronized
295     def append(self, data):
296         """Append some data to the buffer.
297
298         Only valid if the block is in WRITABLE state.  Implements an expanding
299         buffer, doubling capacity as needed to accomdate all the data.
300
301         """
302         if self._state == _BufferBlock.WRITABLE:
303             while (self.write_pointer+len(data)) > len(self.buffer_block):
304                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
305                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
306                 self.buffer_block = new_buffer_block
307                 self.buffer_view = memoryview(self.buffer_block)
308             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
309             self.write_pointer += len(data)
310             self._locator = None
311         else:
312             raise AssertionError("Buffer block is not writable")
313
314     STATE_TRANSITIONS = frozenset([
315             (WRITABLE, PENDING),
316             (PENDING, COMMITTED),
317             (PENDING, ERROR),
318             (ERROR, PENDING)])
319
320     @synchronized
321     def set_state(self, nextstate, val=None):
322         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
323             raise StateChangeError("Invalid state change from %s to %s" % (self.state, nextstate), self.state, nextstate)
324         self._state = nextstate
325
326         if self._state == _BufferBlock.PENDING:
327             self.wait_for_commit.clear()
328
329         if self._state == _BufferBlock.COMMITTED:
330             self._locator = val
331             self.buffer_view = None
332             self.buffer_block = None
333             self.wait_for_commit.set()
334
335         if self._state == _BufferBlock.ERROR:
336             self.error = val
337             self.wait_for_commit.set()
338
339     @synchronized
340     def state(self):
341         return self._state
342
343     def size(self):
344         """The amount of data written to the buffer."""
345         return self.write_pointer
346
347     @synchronized
348     def locator(self):
349         """The Keep locator for this buffer's contents."""
350         if self._locator is None:
351             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
352         return self._locator
353
354     @synchronized
355     def clone(self, new_blockid, owner):
356         if self._state == _BufferBlock.COMMITTED:
357             raise AssertionError("Cannot duplicate committed buffer block")
358         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
359         bufferblock.append(self.buffer_view[0:self.size()])
360         return bufferblock
361
362     @synchronized
363     def clear(self):
364         self.owner = None
365         self.buffer_block = None
366         self.buffer_view = None
367
368
369 class NoopLock(object):
370     def __enter__(self):
371         return self
372
373     def __exit__(self, exc_type, exc_value, traceback):
374         pass
375
376     def acquire(self, blocking=False):
377         pass
378
379     def release(self):
380         pass
381
382
383 def must_be_writable(orig_func):
384     @functools.wraps(orig_func)
385     def must_be_writable_wrapper(self, *args, **kwargs):
386         if not self.writable():
387             raise IOError(errno.EROFS, "Collection is read-only.")
388         return orig_func(self, *args, **kwargs)
389     return must_be_writable_wrapper
390
391
392 class _BlockManager(object):
393     """BlockManager handles buffer blocks.
394
395     Also handles background block uploads, and background block prefetch for a
396     Collection of ArvadosFiles.
397
398     """
399
400     DEFAULT_PUT_THREADS = 2
401     DEFAULT_GET_THREADS = 2
402
403     def __init__(self, keep):
404         """keep: KeepClient object to use"""
405         self._keep = keep
406         self._bufferblocks = {}
407         self._put_queue = None
408         self._put_threads = None
409         self._prefetch_queue = None
410         self._prefetch_threads = None
411         self.lock = threading.Lock()
412         self.prefetch_enabled = True
413         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
414         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
415
416     @synchronized
417     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
418         """Allocate a new, empty bufferblock in WRITABLE state and return it.
419
420         :blockid:
421           optional block identifier, otherwise one will be automatically assigned
422
423         :starting_capacity:
424           optional capacity, otherwise will use default capacity
425
426         :owner:
427           ArvadosFile that owns this block
428
429         """
430         if blockid is None:
431             blockid = "bufferblock%i" % len(self._bufferblocks)
432         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
433         self._bufferblocks[bufferblock.blockid] = bufferblock
434         return bufferblock
435
436     @synchronized
437     def dup_block(self, block, owner):
438         """Create a new bufferblock initialized with the content of an existing bufferblock.
439
440         :block:
441           the buffer block to copy.
442
443         :owner:
444           ArvadosFile that owns the new block
445
446         """
447         new_blockid = "bufferblock%i" % len(self._bufferblocks)
448         bufferblock = block.clone(new_blockid, owner)
449         self._bufferblocks[bufferblock.blockid] = bufferblock
450         return bufferblock
451
452     @synchronized
453     def is_bufferblock(self, locator):
454         return locator in self._bufferblocks
455
456     def _commit_bufferblock_worker(self):
457         """Background uploader thread."""
458
459         while True:
460             try:
461                 bufferblock = self._put_queue.get()
462                 if bufferblock is None:
463                     return
464
465                 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
466                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
467
468             except Exception as e:
469                 bufferblock.set_state(_BufferBlock.ERROR, e)
470             finally:
471                 if self._put_queue is not None:
472                     self._put_queue.task_done()
473
474     @synchronized
475     def start_put_threads(self):
476         if self._put_threads is None:
477             # Start uploader threads.
478
479             # If we don't limit the Queue size, the upload queue can quickly
480             # grow to take up gigabytes of RAM if the writing process is
481             # generating data more quickly than it can be send to the Keep
482             # servers.
483             #
484             # With two upload threads and a queue size of 2, this means up to 4
485             # blocks pending.  If they are full 64 MiB blocks, that means up to
486             # 256 MiB of internal buffering, which is the same size as the
487             # default download block cache in KeepClient.
488             self._put_queue = Queue.Queue(maxsize=2)
489
490             self._put_threads = []
491             for i in xrange(0, self.num_put_threads):
492                 thread = threading.Thread(target=self._commit_bufferblock_worker)
493                 self._put_threads.append(thread)
494                 thread.daemon = False
495                 thread.start()
496
497     def _block_prefetch_worker(self):
498         """The background downloader thread."""
499         while True:
500             try:
501                 b = self._prefetch_queue.get()
502                 if b is None:
503                     return
504                 self._keep.get(b)
505             except Exception:
506                 pass
507
508     @synchronized
509     def start_get_threads(self):
510         if self._prefetch_threads is None:
511             self._prefetch_queue = Queue.Queue()
512             self._prefetch_threads = []
513             for i in xrange(0, self.num_get_threads):
514                 thread = threading.Thread(target=self._block_prefetch_worker)
515                 self._prefetch_threads.append(thread)
516                 thread.daemon = True
517                 thread.start()
518
519
520     @synchronized
521     def stop_threads(self):
522         """Shut down and wait for background upload and download threads to finish."""
523
524         if self._put_threads is not None:
525             for t in self._put_threads:
526                 self._put_queue.put(None)
527             for t in self._put_threads:
528                 t.join()
529         self._put_threads = None
530         self._put_queue = None
531
532         if self._prefetch_threads is not None:
533             for t in self._prefetch_threads:
534                 self._prefetch_queue.put(None)
535             for t in self._prefetch_threads:
536                 t.join()
537         self._prefetch_threads = None
538         self._prefetch_queue = None
539
540     def __enter__(self):
541         return self
542
543     def __exit__(self, exc_type, exc_value, traceback):
544         self.stop_threads()
545
546     def __del__(self):
547         self.stop_threads()
548
549     def commit_bufferblock(self, block, sync):
550         """Initiate a background upload of a bufferblock.
551
552         :block:
553           The block object to upload
554
555         :sync:
556           If `sync` is True, upload the block synchronously.
557           If `sync` is False, upload the block asynchronously.  This will
558           return immediately unless the upload queue is at capacity, in
559           which case it will wait on an upload queue slot.
560
561         """
562
563         try:
564             # Mark the block as PENDING so to disallow any more appends.
565             block.set_state(_BufferBlock.PENDING)
566         except StateChangeError as e:
567             if e.state == _BufferBlock.PENDING and sync:
568                 block.wait_for_commit.wait()
569                 if block.state() == _BufferBlock.ERROR:
570                     raise block.error
571             return
572
573         if sync:
574             try:
575                 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
576                 block.set_state(_BufferBlock.COMMITTED, loc)
577             except Exception as e:
578                 block.set_state(_BufferBlock.ERROR, e)
579                 raise
580         else:
581             self.start_put_threads()
582             self._put_queue.put(block)
583
584     @synchronized
585     def get_bufferblock(self, locator):
586         return self._bufferblocks.get(locator)
587
588     @synchronized
589     def delete_bufferblock(self, locator):
590         bb = self._bufferblocks[locator]
591         bb.clear()
592         del self._bufferblocks[locator]
593
594     def get_block_contents(self, locator, num_retries, cache_only=False):
595         """Fetch a block.
596
597         First checks to see if the locator is a BufferBlock and return that, if
598         not, passes the request through to KeepClient.get().
599
600         """
601         with self.lock:
602             if locator in self._bufferblocks:
603                 bufferblock = self._bufferblocks[locator]
604                 if bufferblock.state() != _BufferBlock.COMMITTED:
605                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
606                 else:
607                     locator = bufferblock._locator
608         if cache_only:
609             return self._keep.get_from_cache(locator)
610         else:
611             return self._keep.get(locator, num_retries=num_retries)
612
613     def commit_all(self):
614         """Commit all outstanding buffer blocks.
615
616         This is a synchronous call, and will not return until all buffer blocks
617         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
618
619         """
620         with self.lock:
621             items = self._bufferblocks.items()
622
623         for k,v in items:
624             if v.state() != _BufferBlock.COMMITTED:
625                 v.owner.flush(sync=False)
626
627         with self.lock:
628             if self._put_queue is not None:
629                 self._put_queue.join()
630
631                 err = []
632                 for k,v in items:
633                     if v.state() == _BufferBlock.ERROR:
634                         err.append((v.locator(), v.error))
635                 if err:
636                     raise KeepWriteError("Error writing some blocks", err, label="block")
637
638         for k,v in items:
639             # flush again with sync=True to remove committed bufferblocks from
640             # the segments.
641             if v.owner:
642                 v.owner.flush(sync=True)
643
644     def block_prefetch(self, locator):
645         """Initiate a background download of a block.
646
647         This assumes that the underlying KeepClient implements a block cache,
648         so repeated requests for the same block will not result in repeated
649         downloads (unless the block is evicted from the cache.)  This method
650         does not block.
651
652         """
653
654         if not self.prefetch_enabled:
655             return
656
657         if self._keep.get_from_cache(locator) is not None:
658             return
659
660         with self.lock:
661             if locator in self._bufferblocks:
662                 return
663
664         self.start_get_threads()
665         self._prefetch_queue.put(locator)
666
667
668 class ArvadosFile(object):
669     """Represent a file in a Collection.
670
671     ArvadosFile manages the underlying representation of a file in Keep as a
672     sequence of segments spanning a set of blocks, and implements random
673     read/write access.
674
675     This object may be accessed from multiple threads.
676
677     """
678
679     def __init__(self, parent, name, stream=[], segments=[]):
680         """
681         ArvadosFile constructor.
682
683         :stream:
684           a list of Range objects representing a block stream
685
686         :segments:
687           a list of Range objects representing segments
688         """
689         self.parent = parent
690         self.name = name
691         self._committed = False
692         self._segments = []
693         self.lock = parent.root_collection().lock
694         for s in segments:
695             self._add_segment(stream, s.locator, s.range_size)
696         self._current_bblock = None
697
698     def writable(self):
699         return self.parent.writable()
700
701     @synchronized
702     def segments(self):
703         return copy.copy(self._segments)
704
705     @synchronized
706     def clone(self, new_parent, new_name):
707         """Make a copy of this file."""
708         cp = ArvadosFile(new_parent, new_name)
709         cp.replace_contents(self)
710         return cp
711
712     @must_be_writable
713     @synchronized
714     def replace_contents(self, other):
715         """Replace segments of this file with segments from another `ArvadosFile` object."""
716
717         map_loc = {}
718         self._segments = []
719         for other_segment in other.segments():
720             new_loc = other_segment.locator
721             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
722                 if other_segment.locator not in map_loc:
723                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
724                     if bufferblock.state() != _BufferBlock.WRITABLE:
725                         map_loc[other_segment.locator] = bufferblock.locator()
726                     else:
727                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
728                 new_loc = map_loc[other_segment.locator]
729
730             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
731
732         self._committed = False
733
734     def __eq__(self, other):
735         if other is self:
736             return True
737         if not isinstance(other, ArvadosFile):
738             return False
739
740         othersegs = other.segments()
741         with self.lock:
742             if len(self._segments) != len(othersegs):
743                 return False
744             for i in xrange(0, len(othersegs)):
745                 seg1 = self._segments[i]
746                 seg2 = othersegs[i]
747                 loc1 = seg1.locator
748                 loc2 = seg2.locator
749
750                 if self.parent._my_block_manager().is_bufferblock(loc1):
751                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
752
753                 if other.parent._my_block_manager().is_bufferblock(loc2):
754                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
755
756                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
757                     seg1.range_start != seg2.range_start or
758                     seg1.range_size != seg2.range_size or
759                     seg1.segment_offset != seg2.segment_offset):
760                     return False
761
762         return True
763
764     def __ne__(self, other):
765         return not self.__eq__(other)
766
767     @synchronized
768     def set_committed(self):
769         """Set committed flag to False"""
770         self._committed = True
771
772     @synchronized
773     def committed(self):
774         """Get whether this is committed or not."""
775         return self._committed
776
777     @must_be_writable
778     @synchronized
779     def truncate(self, size):
780         """Shrink the size of the file.
781
782         If `size` is less than the size of the file, the file contents after
783         `size` will be discarded.  If `size` is greater than the current size
784         of the file, an IOError will be raised.
785
786         """
787         if size < self.size():
788             new_segs = []
789             for r in self._segments:
790                 range_end = r.range_start+r.range_size
791                 if r.range_start >= size:
792                     # segment is past the trucate size, all done
793                     break
794                 elif size < range_end:
795                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
796                     nr.segment_offset = r.segment_offset
797                     new_segs.append(nr)
798                     break
799                 else:
800                     new_segs.append(r)
801
802             self._segments = new_segs
803             self._committed = False
804         elif size > self.size():
805             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
806
807     def readfrom(self, offset, size, num_retries, exact=False):
808         """Read up to `size` bytes from the file starting at `offset`.
809
810         :exact:
811          If False (default), return less data than requested if the read
812          crosses a block boundary and the next block isn't cached.  If True,
813          only return less data than requested when hitting EOF.
814         """
815
816         with self.lock:
817             if size == 0 or offset >= self.size():
818                 return ''
819             readsegs = locators_and_ranges(self._segments, offset, size)
820             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
821
822         locs = set()
823         data = []
824         for lr in readsegs:
825             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
826             if block:
827                 blockview = memoryview(block)
828                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
829                 locs.add(lr.locator)
830             else:
831                 break
832
833         for lr in prefetch:
834             if lr.locator not in locs:
835                 self.parent._my_block_manager().block_prefetch(lr.locator)
836                 locs.add(lr.locator)
837
838         return ''.join(data)
839
840     def _repack_writes(self, num_retries):
841         """Test if the buffer block has more data than actual segments.
842
843         This happens when a buffered write over-writes a file range written in
844         a previous buffered write.  Re-pack the buffer block for efficiency
845         and to avoid leaking information.
846
847         """
848         segs = self._segments
849
850         # Sum up the segments to get the total bytes of the file referencing
851         # into the buffer block.
852         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
853         write_total = sum([s.range_size for s in bufferblock_segs])
854
855         if write_total < self._current_bblock.size():
856             # There is more data in the buffer block than is actually accounted for by segments, so
857             # re-pack into a new buffer by copying over to a new buffer block.
858             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
859             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
860             for t in bufferblock_segs:
861                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
862                 t.segment_offset = new_bb.size() - t.range_size
863
864             self._current_bblock = new_bb
865
866     @must_be_writable
867     @synchronized
868     def writeto(self, offset, data, num_retries):
869         """Write `data` to the file starting at `offset`.
870
871         This will update existing bytes and/or extend the size of the file as
872         necessary.
873
874         """
875         if len(data) == 0:
876             return
877
878         if offset > self.size():
879             raise ArgumentError("Offset is past the end of the file")
880
881         if len(data) > config.KEEP_BLOCK_SIZE:
882             # Chunk it up into smaller writes
883             n = 0
884             dataview = memoryview(data)
885             while n < len(data):
886                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
887                 n += config.KEEP_BLOCK_SIZE
888             return
889
890         self._committed = False
891
892         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
893             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
894
895         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
896             self._repack_writes(num_retries)
897             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
898                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
899                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
900
901         self._current_bblock.append(data)
902
903         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
904
905         self.parent.notify(WRITE, self.parent, self.name, (self, self))
906
907         return len(data)
908
909     @synchronized
910     def flush(self, sync=True, num_retries=0):
911         """Flush the current bufferblock to Keep.
912
913         :sync:
914           If True, commit block synchronously, wait until buffer block has been written.
915           If False, commit block asynchronously, return immediately after putting block into
916           the keep put queue.
917         """
918         if self.committed():
919             return
920
921         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
922             if self._current_bblock.state() == _BufferBlock.WRITABLE:
923                 self._repack_writes(num_retries)
924             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
925
926         if sync:
927             to_delete = set()
928             for s in self._segments:
929                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
930                 if bb:
931                     if bb.state() != _BufferBlock.COMMITTED:
932                         self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=True)
933                     to_delete.add(s.locator)
934                     s.locator = bb.locator()
935             for s in to_delete:
936                self.parent._my_block_manager().delete_bufferblock(s)
937
938         self.parent.notify(MOD, self.parent, self.name, (self, self))
939
940     @must_be_writable
941     @synchronized
942     def add_segment(self, blocks, pos, size):
943         """Add a segment to the end of the file.
944
945         `pos` and `offset` reference a section of the stream described by
946         `blocks` (a list of Range objects)
947
948         """
949         self._add_segment(blocks, pos, size)
950
951     def _add_segment(self, blocks, pos, size):
952         """Internal implementation of add_segment."""
953         self._committed = False
954         for lr in locators_and_ranges(blocks, pos, size):
955             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
956             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
957             self._segments.append(r)
958
959     @synchronized
960     def size(self):
961         """Get the file size."""
962         if self._segments:
963             n = self._segments[-1]
964             return n.range_start + n.range_size
965         else:
966             return 0
967
968     @synchronized
969     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
970         buf = ""
971         filestream = []
972         for segment in self.segments:
973             loc = segment.locator
974             if loc.startswith("bufferblock"):
975                 loc = self._bufferblocks[loc].calculate_locator()
976             if portable_locators:
977                 loc = KeepLocator(loc).stripped()
978             filestream.append(LocatorAndRange(loc, locator_block_size(loc),
979                                  segment.segment_offset, segment.range_size))
980         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
981         buf += "\n"
982         return buf
983
984     @must_be_writable
985     @synchronized
986     def _reparent(self, newparent, newname):
987         self._committed = False
988         self.flush(sync=True)
989         self.parent.remove(self.name)
990         self.parent = newparent
991         self.name = newname
992         self.lock = self.parent.root_collection().lock
993
994
995 class ArvadosFileReader(ArvadosFileReaderBase):
996     """Wraps ArvadosFile in a file-like object supporting reading only.
997
998     Be aware that this class is NOT thread safe as there is no locking around
999     updating file pointer.
1000
1001     """
1002
1003     def __init__(self, arvadosfile, num_retries=None):
1004         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1005         self.arvadosfile = arvadosfile
1006
1007     def size(self):
1008         return self.arvadosfile.size()
1009
1010     def stream_name(self):
1011         return self.arvadosfile.parent.stream_name()
1012
1013     @_FileLikeObjectBase._before_close
1014     @retry_method
1015     def read(self, size=None, num_retries=None):
1016         """Read up to `size` bytes from the file and return the result.
1017
1018         Starts at the current file position.  If `size` is None, read the
1019         entire remainder of the file.
1020         """
1021         if size is None:
1022             data = []
1023             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1024             while rd:
1025                 data.append(rd)
1026                 self._filepos += len(rd)
1027                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1028             return ''.join(data)
1029         else:
1030             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1031             self._filepos += len(data)
1032             return data
1033
1034     @_FileLikeObjectBase._before_close
1035     @retry_method
1036     def readfrom(self, offset, size, num_retries=None):
1037         """Read up to `size` bytes from the stream, starting at the specified file offset.
1038
1039         This method does not change the file position.
1040         """
1041         return self.arvadosfile.readfrom(offset, size, num_retries)
1042
1043     def flush(self):
1044         pass
1045
1046
1047 class ArvadosFileWriter(ArvadosFileReader):
1048     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1049
1050     Be aware that this class is NOT thread safe as there is no locking around
1051     updating file pointer.
1052
1053     """
1054
1055     def __init__(self, arvadosfile, mode, num_retries=None):
1056         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1057         self.mode = mode
1058
1059     @_FileLikeObjectBase._before_close
1060     @retry_method
1061     def write(self, data, num_retries=None):
1062         if self.mode[0] == "a":
1063             self.arvadosfile.writeto(self.size(), data, num_retries)
1064         else:
1065             self.arvadosfile.writeto(self._filepos, data, num_retries)
1066             self._filepos += len(data)
1067         return len(data)
1068
1069     @_FileLikeObjectBase._before_close
1070     @retry_method
1071     def writelines(self, seq, num_retries=None):
1072         for s in seq:
1073             self.write(s, num_retries)
1074
1075     @_FileLikeObjectBase._before_close
1076     def truncate(self, size=None):
1077         if size is None:
1078             size = self._filepos
1079         self.arvadosfile.truncate(size)
1080         if self._filepos > self.size():
1081             self._filepos = self.size()
1082
1083     @_FileLikeObjectBase._before_close
1084     def flush(self):
1085         self.arvadosfile.flush()
1086
1087     def close(self):
1088         if not self.closed:
1089             self.flush()
1090             super(ArvadosFileWriter, self).close()