Merge branch '9954-keep-balance-systemd' closes #9954
[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13
14 from .errors import KeepWriteError, AssertionError, ArgumentError
15 from .keep import KeepLocator
16 from ._normalize_stream import normalize_stream
17 from ._ranges import locators_and_ranges, replace_range, Range
18 from .retry import retry_method
19
20 MOD = "mod"
21 WRITE = "write"
22
23 _logger = logging.getLogger('arvados.arvfile')
24
25 def split(path):
26     """split(path) -> streamname, filename
27
28     Separate the stream name and file name in a /-separated stream path and
29     return a tuple (stream_name, file_name).  If no stream name is available,
30     assume '.'.
31
32     """
33     try:
34         stream_name, file_name = path.rsplit('/', 1)
35     except ValueError:  # No / in string
36         stream_name, file_name = '.', path
37     return stream_name, file_name
38
39 class _FileLikeObjectBase(object):
40     def __init__(self, name, mode):
41         self.name = name
42         self.mode = mode
43         self.closed = False
44
45     @staticmethod
46     def _before_close(orig_func):
47         @functools.wraps(orig_func)
48         def before_close_wrapper(self, *args, **kwargs):
49             if self.closed:
50                 raise ValueError("I/O operation on closed stream file")
51             return orig_func(self, *args, **kwargs)
52         return before_close_wrapper
53
54     def __enter__(self):
55         return self
56
57     def __exit__(self, exc_type, exc_value, traceback):
58         try:
59             self.close()
60         except Exception:
61             if exc_type is None:
62                 raise
63
64     def close(self):
65         self.closed = True
66
67
68 class ArvadosFileReaderBase(_FileLikeObjectBase):
69     def __init__(self, name, mode, num_retries=None):
70         super(ArvadosFileReaderBase, self).__init__(name, mode)
71         self._filepos = 0L
72         self.num_retries = num_retries
73         self._readline_cache = (None, None)
74
75     def __iter__(self):
76         while True:
77             data = self.readline()
78             if not data:
79                 break
80             yield data
81
82     def decompressed_name(self):
83         return re.sub('\.(bz2|gz)$', '', self.name)
84
85     @_FileLikeObjectBase._before_close
86     def seek(self, pos, whence=os.SEEK_SET):
87         if whence == os.SEEK_CUR:
88             pos += self._filepos
89         elif whence == os.SEEK_END:
90             pos += self.size()
91         self._filepos = min(max(pos, 0L), self.size())
92
93     def tell(self):
94         return self._filepos
95
96     @_FileLikeObjectBase._before_close
97     @retry_method
98     def readall(self, size=2**20, num_retries=None):
99         while True:
100             data = self.read(size, num_retries=num_retries)
101             if data == '':
102                 break
103             yield data
104
105     @_FileLikeObjectBase._before_close
106     @retry_method
107     def readline(self, size=float('inf'), num_retries=None):
108         cache_pos, cache_data = self._readline_cache
109         if self.tell() == cache_pos:
110             data = [cache_data]
111             self._filepos += len(cache_data)
112         else:
113             data = ['']
114         data_size = len(data[-1])
115         while (data_size < size) and ('\n' not in data[-1]):
116             next_read = self.read(2 ** 20, num_retries=num_retries)
117             if not next_read:
118                 break
119             data.append(next_read)
120             data_size += len(next_read)
121         data = ''.join(data)
122         try:
123             nextline_index = data.index('\n') + 1
124         except ValueError:
125             nextline_index = len(data)
126         nextline_index = min(nextline_index, size)
127         self._filepos -= len(data) - nextline_index
128         self._readline_cache = (self.tell(), data[nextline_index:])
129         return data[:nextline_index]
130
131     @_FileLikeObjectBase._before_close
132     @retry_method
133     def decompress(self, decompress, size, num_retries=None):
134         for segment in self.readall(size, num_retries=num_retries):
135             data = decompress(segment)
136             if data:
137                 yield data
138
139     @_FileLikeObjectBase._before_close
140     @retry_method
141     def readall_decompressed(self, size=2**20, num_retries=None):
142         self.seek(0)
143         if self.name.endswith('.bz2'):
144             dc = bz2.BZ2Decompressor()
145             return self.decompress(dc.decompress, size,
146                                    num_retries=num_retries)
147         elif self.name.endswith('.gz'):
148             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
149             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
150                                    size, num_retries=num_retries)
151         else:
152             return self.readall(size, num_retries=num_retries)
153
154     @_FileLikeObjectBase._before_close
155     @retry_method
156     def readlines(self, sizehint=float('inf'), num_retries=None):
157         data = []
158         data_size = 0
159         for s in self.readall(num_retries=num_retries):
160             data.append(s)
161             data_size += len(s)
162             if data_size >= sizehint:
163                 break
164         return ''.join(data).splitlines(True)
165
166     def size(self):
167         raise NotImplementedError()
168
169     def read(self, size, num_retries=None):
170         raise NotImplementedError()
171
172     def readfrom(self, start, size, num_retries=None):
173         raise NotImplementedError()
174
175
176 class StreamFileReader(ArvadosFileReaderBase):
177     class _NameAttribute(str):
178         # The Python file API provides a plain .name attribute.
179         # Older SDK provided a name() method.
180         # This class provides both, for maximum compatibility.
181         def __call__(self):
182             return self
183
184     def __init__(self, stream, segments, name):
185         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
186         self._stream = stream
187         self.segments = segments
188
189     def stream_name(self):
190         return self._stream.name()
191
192     def size(self):
193         n = self.segments[-1]
194         return n.range_start + n.range_size
195
196     @_FileLikeObjectBase._before_close
197     @retry_method
198     def read(self, size, num_retries=None):
199         """Read up to 'size' bytes from the stream, starting at the current file position"""
200         if size == 0:
201             return ''
202
203         data = ''
204         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
205         if available_chunks:
206             lr = available_chunks[0]
207             data = self._stream.readfrom(lr.locator+lr.segment_offset,
208                                           lr.segment_size,
209                                           num_retries=num_retries)
210
211         self._filepos += len(data)
212         return data
213
214     @_FileLikeObjectBase._before_close
215     @retry_method
216     def readfrom(self, start, size, num_retries=None):
217         """Read up to 'size' bytes from the stream, starting at 'start'"""
218         if size == 0:
219             return ''
220
221         data = []
222         for lr in locators_and_ranges(self.segments, start, size):
223             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
224                                               num_retries=num_retries))
225         return ''.join(data)
226
227     def as_manifest(self):
228         segs = []
229         for r in self.segments:
230             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
231         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
232
233
234 def synchronized(orig_func):
235     @functools.wraps(orig_func)
236     def synchronized_wrapper(self, *args, **kwargs):
237         with self.lock:
238             return orig_func(self, *args, **kwargs)
239     return synchronized_wrapper
240
241
242 class StateChangeError(Exception):
243     def __init__(self, message, state, nextstate):
244         super(StateChangeError, self).__init__(message)
245         self.state = state
246         self.nextstate = nextstate
247
248 class _BufferBlock(object):
249     """A stand-in for a Keep block that is in the process of being written.
250
251     Writers can append to it, get the size, and compute the Keep locator.
252     There are three valid states:
253
254     WRITABLE
255       Can append to block.
256
257     PENDING
258       Block is in the process of being uploaded to Keep, append is an error.
259
260     COMMITTED
261       The block has been written to Keep, its internal buffer has been
262       released, fetching the block will fetch it via keep client (since we
263       discarded the internal copy), and identifiers referring to the BufferBlock
264       can be replaced with the block locator.
265
266     """
267
268     WRITABLE = 0
269     PENDING = 1
270     COMMITTED = 2
271     ERROR = 3
272
273     def __init__(self, blockid, starting_capacity, owner):
274         """
275         :blockid:
276           the identifier for this block
277
278         :starting_capacity:
279           the initial buffer capacity
280
281         :owner:
282           ArvadosFile that owns this block
283
284         """
285         self.blockid = blockid
286         self.buffer_block = bytearray(starting_capacity)
287         self.buffer_view = memoryview(self.buffer_block)
288         self.write_pointer = 0
289         self._state = _BufferBlock.WRITABLE
290         self._locator = None
291         self.owner = owner
292         self.lock = threading.Lock()
293         self.wait_for_commit = threading.Event()
294         self.error = None
295
296     @synchronized
297     def append(self, data):
298         """Append some data to the buffer.
299
300         Only valid if the block is in WRITABLE state.  Implements an expanding
301         buffer, doubling capacity as needed to accomdate all the data.
302
303         """
304         if self._state == _BufferBlock.WRITABLE:
305             while (self.write_pointer+len(data)) > len(self.buffer_block):
306                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
307                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
308                 self.buffer_block = new_buffer_block
309                 self.buffer_view = memoryview(self.buffer_block)
310             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
311             self.write_pointer += len(data)
312             self._locator = None
313         else:
314             raise AssertionError("Buffer block is not writable")
315
316     STATE_TRANSITIONS = frozenset([
317             (WRITABLE, PENDING),
318             (PENDING, COMMITTED),
319             (PENDING, ERROR),
320             (ERROR, PENDING)])
321
322     @synchronized
323     def set_state(self, nextstate, val=None):
324         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
325             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
326         self._state = nextstate
327
328         if self._state == _BufferBlock.PENDING:
329             self.wait_for_commit.clear()
330
331         if self._state == _BufferBlock.COMMITTED:
332             self._locator = val
333             self.buffer_view = None
334             self.buffer_block = None
335             self.wait_for_commit.set()
336
337         if self._state == _BufferBlock.ERROR:
338             self.error = val
339             self.wait_for_commit.set()
340
341     @synchronized
342     def state(self):
343         return self._state
344
345     def size(self):
346         """The amount of data written to the buffer."""
347         return self.write_pointer
348
349     @synchronized
350     def locator(self):
351         """The Keep locator for this buffer's contents."""
352         if self._locator is None:
353             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
354         return self._locator
355
356     @synchronized
357     def clone(self, new_blockid, owner):
358         if self._state == _BufferBlock.COMMITTED:
359             raise AssertionError("Cannot duplicate committed buffer block")
360         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
361         bufferblock.append(self.buffer_view[0:self.size()])
362         return bufferblock
363
364     @synchronized
365     def clear(self):
366         self.owner = None
367         self.buffer_block = None
368         self.buffer_view = None
369
370
371 class NoopLock(object):
372     def __enter__(self):
373         return self
374
375     def __exit__(self, exc_type, exc_value, traceback):
376         pass
377
378     def acquire(self, blocking=False):
379         pass
380
381     def release(self):
382         pass
383
384
385 def must_be_writable(orig_func):
386     @functools.wraps(orig_func)
387     def must_be_writable_wrapper(self, *args, **kwargs):
388         if not self.writable():
389             raise IOError(errno.EROFS, "Collection is read-only.")
390         return orig_func(self, *args, **kwargs)
391     return must_be_writable_wrapper
392
393
394 class _BlockManager(object):
395     """BlockManager handles buffer blocks.
396
397     Also handles background block uploads, and background block prefetch for a
398     Collection of ArvadosFiles.
399
400     """
401
402     DEFAULT_PUT_THREADS = 2
403     DEFAULT_GET_THREADS = 2
404
405     def __init__(self, keep, copies=None):
406         """keep: KeepClient object to use"""
407         self._keep = keep
408         self._bufferblocks = {}
409         self._put_queue = None
410         self._put_threads = None
411         self._prefetch_queue = None
412         self._prefetch_threads = None
413         self.lock = threading.Lock()
414         self.prefetch_enabled = True
415         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
416         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
417         self.copies = copies
418
419     @synchronized
420     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
421         """Allocate a new, empty bufferblock in WRITABLE state and return it.
422
423         :blockid:
424           optional block identifier, otherwise one will be automatically assigned
425
426         :starting_capacity:
427           optional capacity, otherwise will use default capacity
428
429         :owner:
430           ArvadosFile that owns this block
431
432         """
433         if blockid is None:
434             blockid = "bufferblock%i" % len(self._bufferblocks)
435         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
436         self._bufferblocks[bufferblock.blockid] = bufferblock
437         return bufferblock
438
439     @synchronized
440     def dup_block(self, block, owner):
441         """Create a new bufferblock initialized with the content of an existing bufferblock.
442
443         :block:
444           the buffer block to copy.
445
446         :owner:
447           ArvadosFile that owns the new block
448
449         """
450         new_blockid = "bufferblock%i" % len(self._bufferblocks)
451         bufferblock = block.clone(new_blockid, owner)
452         self._bufferblocks[bufferblock.blockid] = bufferblock
453         return bufferblock
454
455     @synchronized
456     def is_bufferblock(self, locator):
457         return locator in self._bufferblocks
458
459     def _commit_bufferblock_worker(self):
460         """Background uploader thread."""
461
462         while True:
463             try:
464                 bufferblock = self._put_queue.get()
465                 if bufferblock is None:
466                     return
467
468                 if self.copies is None:
469                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
470                 else:
471                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
472                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
473
474             except Exception as e:
475                 bufferblock.set_state(_BufferBlock.ERROR, e)
476             finally:
477                 if self._put_queue is not None:
478                     self._put_queue.task_done()
479
480     @synchronized
481     def start_put_threads(self):
482         if self._put_threads is None:
483             # Start uploader threads.
484
485             # If we don't limit the Queue size, the upload queue can quickly
486             # grow to take up gigabytes of RAM if the writing process is
487             # generating data more quickly than it can be send to the Keep
488             # servers.
489             #
490             # With two upload threads and a queue size of 2, this means up to 4
491             # blocks pending.  If they are full 64 MiB blocks, that means up to
492             # 256 MiB of internal buffering, which is the same size as the
493             # default download block cache in KeepClient.
494             self._put_queue = Queue.Queue(maxsize=2)
495
496             self._put_threads = []
497             for i in xrange(0, self.num_put_threads):
498                 thread = threading.Thread(target=self._commit_bufferblock_worker)
499                 self._put_threads.append(thread)
500                 thread.daemon = True
501                 thread.start()
502
503     def _block_prefetch_worker(self):
504         """The background downloader thread."""
505         while True:
506             try:
507                 b = self._prefetch_queue.get()
508                 if b is None:
509                     return
510                 self._keep.get(b)
511             except Exception:
512                 pass
513
514     @synchronized
515     def start_get_threads(self):
516         if self._prefetch_threads is None:
517             self._prefetch_queue = Queue.Queue()
518             self._prefetch_threads = []
519             for i in xrange(0, self.num_get_threads):
520                 thread = threading.Thread(target=self._block_prefetch_worker)
521                 self._prefetch_threads.append(thread)
522                 thread.daemon = True
523                 thread.start()
524
525
526     @synchronized
527     def stop_threads(self):
528         """Shut down and wait for background upload and download threads to finish."""
529
530         if self._put_threads is not None:
531             for t in self._put_threads:
532                 self._put_queue.put(None)
533             for t in self._put_threads:
534                 t.join()
535         self._put_threads = None
536         self._put_queue = None
537
538         if self._prefetch_threads is not None:
539             for t in self._prefetch_threads:
540                 self._prefetch_queue.put(None)
541             for t in self._prefetch_threads:
542                 t.join()
543         self._prefetch_threads = None
544         self._prefetch_queue = None
545
546     def __enter__(self):
547         return self
548
549     def __exit__(self, exc_type, exc_value, traceback):
550         self.stop_threads()
551
552     def commit_bufferblock(self, block, sync):
553         """Initiate a background upload of a bufferblock.
554
555         :block:
556           The block object to upload
557
558         :sync:
559           If `sync` is True, upload the block synchronously.
560           If `sync` is False, upload the block asynchronously.  This will
561           return immediately unless the upload queue is at capacity, in
562           which case it will wait on an upload queue slot.
563
564         """
565
566         try:
567             # Mark the block as PENDING so to disallow any more appends.
568             block.set_state(_BufferBlock.PENDING)
569         except StateChangeError as e:
570             if e.state == _BufferBlock.PENDING:
571                 if sync:
572                     block.wait_for_commit.wait()
573                 else:
574                     return
575             if block.state() == _BufferBlock.COMMITTED:
576                 return
577             elif block.state() == _BufferBlock.ERROR:
578                 raise block.error
579             else:
580                 raise
581
582         if sync:
583             try:
584                 if self.copies is None:
585                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
586                 else:
587                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
588                 block.set_state(_BufferBlock.COMMITTED, loc)
589             except Exception as e:
590                 block.set_state(_BufferBlock.ERROR, e)
591                 raise
592         else:
593             self.start_put_threads()
594             self._put_queue.put(block)
595
596     @synchronized
597     def get_bufferblock(self, locator):
598         return self._bufferblocks.get(locator)
599
600     @synchronized
601     def delete_bufferblock(self, locator):
602         bb = self._bufferblocks[locator]
603         bb.clear()
604         del self._bufferblocks[locator]
605
606     def get_block_contents(self, locator, num_retries, cache_only=False):
607         """Fetch a block.
608
609         First checks to see if the locator is a BufferBlock and return that, if
610         not, passes the request through to KeepClient.get().
611
612         """
613         with self.lock:
614             if locator in self._bufferblocks:
615                 bufferblock = self._bufferblocks[locator]
616                 if bufferblock.state() != _BufferBlock.COMMITTED:
617                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
618                 else:
619                     locator = bufferblock._locator
620         if cache_only:
621             return self._keep.get_from_cache(locator)
622         else:
623             return self._keep.get(locator, num_retries=num_retries)
624
625     def commit_all(self):
626         """Commit all outstanding buffer blocks.
627
628         This is a synchronous call, and will not return until all buffer blocks
629         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
630
631         """
632         with self.lock:
633             items = self._bufferblocks.items()
634
635         for k,v in items:
636             if v.state() != _BufferBlock.COMMITTED:
637                 v.owner.flush(sync=False)
638
639         with self.lock:
640             if self._put_queue is not None:
641                 self._put_queue.join()
642
643                 err = []
644                 for k,v in items:
645                     if v.state() == _BufferBlock.ERROR:
646                         err.append((v.locator(), v.error))
647                 if err:
648                     raise KeepWriteError("Error writing some blocks", err, label="block")
649
650         for k,v in items:
651             # flush again with sync=True to remove committed bufferblocks from
652             # the segments.
653             if v.owner:
654                 v.owner.flush(sync=True)
655
656     def block_prefetch(self, locator):
657         """Initiate a background download of a block.
658
659         This assumes that the underlying KeepClient implements a block cache,
660         so repeated requests for the same block will not result in repeated
661         downloads (unless the block is evicted from the cache.)  This method
662         does not block.
663
664         """
665
666         if not self.prefetch_enabled:
667             return
668
669         if self._keep.get_from_cache(locator) is not None:
670             return
671
672         with self.lock:
673             if locator in self._bufferblocks:
674                 return
675
676         self.start_get_threads()
677         self._prefetch_queue.put(locator)
678
679
680 class ArvadosFile(object):
681     """Represent a file in a Collection.
682
683     ArvadosFile manages the underlying representation of a file in Keep as a
684     sequence of segments spanning a set of blocks, and implements random
685     read/write access.
686
687     This object may be accessed from multiple threads.
688
689     """
690
691     def __init__(self, parent, name, stream=[], segments=[]):
692         """
693         ArvadosFile constructor.
694
695         :stream:
696           a list of Range objects representing a block stream
697
698         :segments:
699           a list of Range objects representing segments
700         """
701         self.parent = parent
702         self.name = name
703         self._committed = False
704         self._segments = []
705         self.lock = parent.root_collection().lock
706         for s in segments:
707             self._add_segment(stream, s.locator, s.range_size)
708         self._current_bblock = None
709
710     def writable(self):
711         return self.parent.writable()
712
713     @synchronized
714     def segments(self):
715         return copy.copy(self._segments)
716
717     @synchronized
718     def clone(self, new_parent, new_name):
719         """Make a copy of this file."""
720         cp = ArvadosFile(new_parent, new_name)
721         cp.replace_contents(self)
722         return cp
723
724     @must_be_writable
725     @synchronized
726     def replace_contents(self, other):
727         """Replace segments of this file with segments from another `ArvadosFile` object."""
728
729         map_loc = {}
730         self._segments = []
731         for other_segment in other.segments():
732             new_loc = other_segment.locator
733             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
734                 if other_segment.locator not in map_loc:
735                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
736                     if bufferblock.state() != _BufferBlock.WRITABLE:
737                         map_loc[other_segment.locator] = bufferblock.locator()
738                     else:
739                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
740                 new_loc = map_loc[other_segment.locator]
741
742             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
743
744         self._committed = False
745
746     def __eq__(self, other):
747         if other is self:
748             return True
749         if not isinstance(other, ArvadosFile):
750             return False
751
752         othersegs = other.segments()
753         with self.lock:
754             if len(self._segments) != len(othersegs):
755                 return False
756             for i in xrange(0, len(othersegs)):
757                 seg1 = self._segments[i]
758                 seg2 = othersegs[i]
759                 loc1 = seg1.locator
760                 loc2 = seg2.locator
761
762                 if self.parent._my_block_manager().is_bufferblock(loc1):
763                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
764
765                 if other.parent._my_block_manager().is_bufferblock(loc2):
766                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
767
768                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
769                     seg1.range_start != seg2.range_start or
770                     seg1.range_size != seg2.range_size or
771                     seg1.segment_offset != seg2.segment_offset):
772                     return False
773
774         return True
775
776     def __ne__(self, other):
777         return not self.__eq__(other)
778
779     @synchronized
780     def set_committed(self):
781         """Set committed flag to False"""
782         self._committed = True
783
784     @synchronized
785     def committed(self):
786         """Get whether this is committed or not."""
787         return self._committed
788
789     @must_be_writable
790     @synchronized
791     def truncate(self, size):
792         """Shrink the size of the file.
793
794         If `size` is less than the size of the file, the file contents after
795         `size` will be discarded.  If `size` is greater than the current size
796         of the file, an IOError will be raised.
797
798         """
799         if size < self.size():
800             new_segs = []
801             for r in self._segments:
802                 range_end = r.range_start+r.range_size
803                 if r.range_start >= size:
804                     # segment is past the trucate size, all done
805                     break
806                 elif size < range_end:
807                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
808                     nr.segment_offset = r.segment_offset
809                     new_segs.append(nr)
810                     break
811                 else:
812                     new_segs.append(r)
813
814             self._segments = new_segs
815             self._committed = False
816         elif size > self.size():
817             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
818
819     def readfrom(self, offset, size, num_retries, exact=False):
820         """Read up to `size` bytes from the file starting at `offset`.
821
822         :exact:
823          If False (default), return less data than requested if the read
824          crosses a block boundary and the next block isn't cached.  If True,
825          only return less data than requested when hitting EOF.
826         """
827
828         with self.lock:
829             if size == 0 or offset >= self.size():
830                 return ''
831             readsegs = locators_and_ranges(self._segments, offset, size)
832             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
833
834         locs = set()
835         data = []
836         for lr in readsegs:
837             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
838             if block:
839                 blockview = memoryview(block)
840                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
841                 locs.add(lr.locator)
842             else:
843                 break
844
845         for lr in prefetch:
846             if lr.locator not in locs:
847                 self.parent._my_block_manager().block_prefetch(lr.locator)
848                 locs.add(lr.locator)
849
850         return ''.join(data)
851
852     def _repack_writes(self, num_retries):
853         """Test if the buffer block has more data than actual segments.
854
855         This happens when a buffered write over-writes a file range written in
856         a previous buffered write.  Re-pack the buffer block for efficiency
857         and to avoid leaking information.
858
859         """
860         segs = self._segments
861
862         # Sum up the segments to get the total bytes of the file referencing
863         # into the buffer block.
864         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
865         write_total = sum([s.range_size for s in bufferblock_segs])
866
867         if write_total < self._current_bblock.size():
868             # There is more data in the buffer block than is actually accounted for by segments, so
869             # re-pack into a new buffer by copying over to a new buffer block.
870             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
871             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
872             for t in bufferblock_segs:
873                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
874                 t.segment_offset = new_bb.size() - t.range_size
875
876             self._current_bblock = new_bb
877
878     @must_be_writable
879     @synchronized
880     def writeto(self, offset, data, num_retries):
881         """Write `data` to the file starting at `offset`.
882
883         This will update existing bytes and/or extend the size of the file as
884         necessary.
885
886         """
887         if len(data) == 0:
888             return
889
890         if offset > self.size():
891             raise ArgumentError("Offset is past the end of the file")
892
893         if len(data) > config.KEEP_BLOCK_SIZE:
894             # Chunk it up into smaller writes
895             n = 0
896             dataview = memoryview(data)
897             while n < len(data):
898                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
899                 n += config.KEEP_BLOCK_SIZE
900             return
901
902         self._committed = False
903
904         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
905             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
906
907         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
908             self._repack_writes(num_retries)
909             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
910                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
911                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
912
913         self._current_bblock.append(data)
914
915         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
916
917         self.parent.notify(WRITE, self.parent, self.name, (self, self))
918
919         return len(data)
920
921     @synchronized
922     def flush(self, sync=True, num_retries=0):
923         """Flush the current bufferblock to Keep.
924
925         :sync:
926           If True, commit block synchronously, wait until buffer block has been written.
927           If False, commit block asynchronously, return immediately after putting block into
928           the keep put queue.
929         """
930         if self.committed():
931             return
932
933         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
934             if self._current_bblock.state() == _BufferBlock.WRITABLE:
935                 self._repack_writes(num_retries)
936             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
937
938         if sync:
939             to_delete = set()
940             for s in self._segments:
941                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
942                 if bb:
943                     if bb.state() != _BufferBlock.COMMITTED:
944                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
945                     to_delete.add(s.locator)
946                     s.locator = bb.locator()
947             for s in to_delete:
948                self.parent._my_block_manager().delete_bufferblock(s)
949
950         self.parent.notify(MOD, self.parent, self.name, (self, self))
951
952     @must_be_writable
953     @synchronized
954     def add_segment(self, blocks, pos, size):
955         """Add a segment to the end of the file.
956
957         `pos` and `offset` reference a section of the stream described by
958         `blocks` (a list of Range objects)
959
960         """
961         self._add_segment(blocks, pos, size)
962
963     def _add_segment(self, blocks, pos, size):
964         """Internal implementation of add_segment."""
965         self._committed = False
966         for lr in locators_and_ranges(blocks, pos, size):
967             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
968             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
969             self._segments.append(r)
970
971     @synchronized
972     def size(self):
973         """Get the file size."""
974         if self._segments:
975             n = self._segments[-1]
976             return n.range_start + n.range_size
977         else:
978             return 0
979
980     @synchronized
981     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
982         buf = ""
983         filestream = []
984         for segment in self.segments:
985             loc = segment.locator
986             if loc.startswith("bufferblock"):
987                 loc = self._bufferblocks[loc].calculate_locator()
988             if portable_locators:
989                 loc = KeepLocator(loc).stripped()
990             filestream.append(LocatorAndRange(loc, locator_block_size(loc),
991                                  segment.segment_offset, segment.range_size))
992         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
993         buf += "\n"
994         return buf
995
996     @must_be_writable
997     @synchronized
998     def _reparent(self, newparent, newname):
999         self._committed = False
1000         self.flush(sync=True)
1001         self.parent.remove(self.name)
1002         self.parent = newparent
1003         self.name = newname
1004         self.lock = self.parent.root_collection().lock
1005
1006
1007 class ArvadosFileReader(ArvadosFileReaderBase):
1008     """Wraps ArvadosFile in a file-like object supporting reading only.
1009
1010     Be aware that this class is NOT thread safe as there is no locking around
1011     updating file pointer.
1012
1013     """
1014
1015     def __init__(self, arvadosfile, num_retries=None):
1016         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1017         self.arvadosfile = arvadosfile
1018
1019     def size(self):
1020         return self.arvadosfile.size()
1021
1022     def stream_name(self):
1023         return self.arvadosfile.parent.stream_name()
1024
1025     @_FileLikeObjectBase._before_close
1026     @retry_method
1027     def read(self, size=None, num_retries=None):
1028         """Read up to `size` bytes from the file and return the result.
1029
1030         Starts at the current file position.  If `size` is None, read the
1031         entire remainder of the file.
1032         """
1033         if size is None:
1034             data = []
1035             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1036             while rd:
1037                 data.append(rd)
1038                 self._filepos += len(rd)
1039                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1040             return ''.join(data)
1041         else:
1042             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1043             self._filepos += len(data)
1044             return data
1045
1046     @_FileLikeObjectBase._before_close
1047     @retry_method
1048     def readfrom(self, offset, size, num_retries=None):
1049         """Read up to `size` bytes from the stream, starting at the specified file offset.
1050
1051         This method does not change the file position.
1052         """
1053         return self.arvadosfile.readfrom(offset, size, num_retries)
1054
1055     def flush(self):
1056         pass
1057
1058
1059 class ArvadosFileWriter(ArvadosFileReader):
1060     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1061
1062     Be aware that this class is NOT thread safe as there is no locking around
1063     updating file pointer.
1064
1065     """
1066
1067     def __init__(self, arvadosfile, mode, num_retries=None):
1068         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1069         self.mode = mode
1070
1071     @_FileLikeObjectBase._before_close
1072     @retry_method
1073     def write(self, data, num_retries=None):
1074         if self.mode[0] == "a":
1075             self.arvadosfile.writeto(self.size(), data, num_retries)
1076         else:
1077             self.arvadosfile.writeto(self._filepos, data, num_retries)
1078             self._filepos += len(data)
1079         return len(data)
1080
1081     @_FileLikeObjectBase._before_close
1082     @retry_method
1083     def writelines(self, seq, num_retries=None):
1084         for s in seq:
1085             self.write(s, num_retries=num_retries)
1086
1087     @_FileLikeObjectBase._before_close
1088     def truncate(self, size=None):
1089         if size is None:
1090             size = self._filepos
1091         self.arvadosfile.truncate(size)
1092         if self._filepos > self.size():
1093             self._filepos = self.size()
1094
1095     @_FileLikeObjectBase._before_close
1096     def flush(self):
1097         self.arvadosfile.flush()
1098
1099     def close(self):
1100         if not self.closed:
1101             self.flush()
1102             super(ArvadosFileWriter, self).close()