[arvados.git] sdk/python/arvados/arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13
14 from .errors import KeepWriteError, AssertionError, ArgumentError
15 from .keep import KeepLocator
16 from ._normalize_stream import normalize_stream
17 from ._ranges import locators_and_ranges, LocatorAndRange, replace_range, Range
18 from .retry import retry_method
19
20 MOD = "mod"
21 WRITE = "write"
22
23 _logger = logging.getLogger('arvados.arvfile')
24
25 def split(path):
26     """split(path) -> streamname, filename
27
28     Separate the stream name and file name in a /-separated stream path and
29     return a tuple (stream_name, file_name).  If no stream name is available,
30     assume '.'.
31
32     """
33     try:
34         stream_name, file_name = path.rsplit('/', 1)
35     except ValueError:  # No / in string
36         stream_name, file_name = '.', path
37     return stream_name, file_name
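# A minimal illustration of split() (hypothetical paths, shown only as comments):
#
#   split("foo/bar/baz.txt")  # -> ("foo/bar", "baz.txt")
#   split("baz.txt")          # -> (".", "baz.txt")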
38
39 class _FileLikeObjectBase(object):
40     def __init__(self, name, mode):
41         self.name = name
42         self.mode = mode
43         self.closed = False
44
45     @staticmethod
46     def _before_close(orig_func):
47         @functools.wraps(orig_func)
48         def before_close_wrapper(self, *args, **kwargs):
49             if self.closed:
50                 raise ValueError("I/O operation on closed stream file")
51             return orig_func(self, *args, **kwargs)
52         return before_close_wrapper
53
54     def __enter__(self):
55         return self
56
57     def __exit__(self, exc_type, exc_value, traceback):
58         try:
59             self.close()
60         except Exception:
61             if exc_type is None:
62                 raise
63
64     def close(self):
65         self.closed = True
66
67
68 class ArvadosFileReaderBase(_FileLikeObjectBase):
69     def __init__(self, name, mode, num_retries=None):
70         super(ArvadosFileReaderBase, self).__init__(name, mode)
71         self._filepos = 0L
72         self.num_retries = num_retries
73         self._readline_cache = (None, None)
74
75     def __iter__(self):
76         while True:
77             data = self.readline()
78             if not data:
79                 break
80             yield data
81
82     def decompressed_name(self):
83         return re.sub(r'\.(bz2|gz)$', '', self.name)
84
85     @_FileLikeObjectBase._before_close
86     def seek(self, pos, whence=os.SEEK_SET):
87         if whence == os.SEEK_CUR:
88             pos += self._filepos
89         elif whence == os.SEEK_END:
90             pos += self.size()
91         self._filepos = min(max(pos, 0L), self.size())
92
93     def tell(self):
94         return self._filepos
95
96     @_FileLikeObjectBase._before_close
97     @retry_method
98     def readall(self, size=2**20, num_retries=None):
99         while True:
100             data = self.read(size, num_retries=num_retries)
101             if data == '':
102                 break
103             yield data
104
105     @_FileLikeObjectBase._before_close
106     @retry_method
107     def readline(self, size=float('inf'), num_retries=None):
108         cache_pos, cache_data = self._readline_cache
109         if self.tell() == cache_pos:
110             data = [cache_data]
111         else:
112             data = ['']
113         data_size = len(data[-1])
114         while (data_size < size) and ('\n' not in data[-1]):
115             next_read = self.read(2 ** 20, num_retries=num_retries)
116             if not next_read:
117                 break
118             data.append(next_read)
119             data_size += len(next_read)
120         data = ''.join(data)
121         try:
122             nextline_index = data.index('\n') + 1
123         except ValueError:
124             nextline_index = len(data)
125         nextline_index = min(nextline_index, size)
126         self._readline_cache = (self.tell(), data[nextline_index:])
127         return data[:nextline_index]
128
129     @_FileLikeObjectBase._before_close
130     @retry_method
131     def decompress(self, decompress, size, num_retries=None):
132         for segment in self.readall(size, num_retries):
133             data = decompress(segment)
134             if data:
135                 yield data
136
137     @_FileLikeObjectBase._before_close
138     @retry_method
139     def readall_decompressed(self, size=2**20, num_retries=None):
140         self.seek(0)
141         if self.name.endswith('.bz2'):
142             dc = bz2.BZ2Decompressor()
143             return self.decompress(dc.decompress, size,
144                                    num_retries=num_retries)
145         elif self.name.endswith('.gz'):
146             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
147             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
148                                    size, num_retries=num_retries)
149         else:
150             return self.readall(size, num_retries=num_retries)
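    # Sketch of iterating over a compressed stream file: decompression is chosen
    # from the file name suffix (".gz" or ".bz2"); other names fall back to
    # plain readall().  `reader` and `process` are hypothetical names.
    #
    #   for chunk in reader.readall_decompressed(size=2**20):
    #       process(chunk)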
151
152     @_FileLikeObjectBase._before_close
153     @retry_method
154     def readlines(self, sizehint=float('inf'), num_retries=None):
155         data = []
156         data_size = 0
157         for s in self.readall(num_retries=num_retries):
158             data.append(s)
159             data_size += len(s)
160             if data_size >= sizehint:
161                 break
162         return ''.join(data).splitlines(True)
163
164     def size(self):
165         raise NotImplementedError()
166
167     def read(self, size, num_retries=None):
168         raise NotImplementedError()
169
170     def readfrom(self, start, size, num_retries=None):
171         raise NotImplementedError()
172
173
174 class StreamFileReader(ArvadosFileReaderBase):
175     class _NameAttribute(str):
176         # The Python file API provides a plain .name attribute.
177         # Older SDK provided a name() method.
178         # This class provides both, for maximum compatibility.
179         def __call__(self):
180             return self
181
182     def __init__(self, stream, segments, name):
183         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
184         self._stream = stream
185         self.segments = segments
186
187     def stream_name(self):
188         return self._stream.name()
189
190     def size(self):
191         n = self.segments[-1]
192         return n.range_start + n.range_size
193
194     @_FileLikeObjectBase._before_close
195     @retry_method
196     def read(self, size, num_retries=None):
197         """Read up to 'size' bytes from the stream, starting at the current file position"""
198         if size == 0:
199             return ''
200
201         data = ''
202         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
203         if available_chunks:
204             lr = available_chunks[0]
205             data = self._stream.readfrom(lr.locator+lr.segment_offset,
206                                           lr.segment_size,
207                                           num_retries=num_retries)
208
209         self._filepos += len(data)
210         return data
211
212     @_FileLikeObjectBase._before_close
213     @retry_method
214     def readfrom(self, start, size, num_retries=None):
215         """Read up to 'size' bytes from the stream, starting at 'start'"""
216         if size == 0:
217             return ''
218
219         data = []
220         for lr in locators_and_ranges(self.segments, start, size):
221             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
222                                               num_retries=num_retries))
223         return ''.join(data)
224
225     def as_manifest(self):
226         segs = []
227         for r in self.segments:
228             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
229         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
230
231
232 def synchronized(orig_func):
233     @functools.wraps(orig_func)
234     def synchronized_wrapper(self, *args, **kwargs):
235         with self.lock:
236             return orig_func(self, *args, **kwargs)
237     return synchronized_wrapper
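# Sketch of how @synchronized is used: the decorated method's object must
# provide a `lock` attribute (a threading.Lock, or NoopLock for single-threaded
# use).  The Counter class below is hypothetical, not part of this module.
#
#   class Counter(object):
#       def __init__(self):
#           self.lock = threading.Lock()
#           self.value = 0
#
#       @synchronized
#       def increment(self):
#           # body runs with self.lock held
#           self.value += 1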
238
239
240 class StateChangeError(Exception):
241     def __init__(self, message, state, nextstate):
242         super(StateChangeError, self).__init__(message)
243         self.state = state
244         self.nextstate = nextstate
245
246 class _BufferBlock(object):
247     """A stand-in for a Keep block that is in the process of being written.
248
249     Writers can append to it, get the size, and compute the Keep locator.
250     There are three valid states:
251
252     WRITABLE
253       Can append to block.
254
255     PENDING
256       Block is in the process of being uploaded to Keep, append is an error.
257
258     COMMITTED
259       The block has been written to Keep, its internal buffer has been
260       released, fetching the block will fetch it via keep client (since we
261       discarded the internal copy), and identifiers referring to the BufferBlock
262       can be replaced with the block locator.
263
264     """
265
266     WRITABLE = 0
267     PENDING = 1
268     COMMITTED = 2
269     ERROR = 3
270
271     def __init__(self, blockid, starting_capacity, owner):
272         """
273         :blockid:
274           the identifier for this block
275
276         :starting_capacity:
277           the initial buffer capacity
278
279         :owner:
280           ArvadosFile that owns this block
281
282         """
283         self.blockid = blockid
284         self.buffer_block = bytearray(starting_capacity)
285         self.buffer_view = memoryview(self.buffer_block)
286         self.write_pointer = 0
287         self._state = _BufferBlock.WRITABLE
288         self._locator = None
289         self.owner = owner
290         self.lock = threading.Lock()
291         self.wait_for_commit = threading.Event()
292         self.error = None
293
294     @synchronized
295     def append(self, data):
296         """Append some data to the buffer.
297
298         Only valid if the block is in WRITABLE state.  Implements an expanding
299         buffer, doubling capacity as needed to accommodate all the data.
300
301         """
302         if self._state == _BufferBlock.WRITABLE:
303             while (self.write_pointer+len(data)) > len(self.buffer_block):
304                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
305                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
306                 self.buffer_block = new_buffer_block
307                 self.buffer_view = memoryview(self.buffer_block)
308             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
309             self.write_pointer += len(data)
310             self._locator = None
311         else:
312             raise AssertionError("Buffer block is not writable")
313
314     @synchronized
315     def set_state(self, nextstate, val=None):
316         if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
317             (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED) or
318             (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.ERROR) or
319             (self._state == _BufferBlock.ERROR and nextstate == _BufferBlock.PENDING)):
320             self._state = nextstate
321
322             if self._state == _BufferBlock.PENDING:
323                 self.wait_for_commit.clear()
324
325             if self._state == _BufferBlock.COMMITTED:
326                 self._locator = val
327                 self.buffer_view = None
328                 self.buffer_block = None
329                 self.wait_for_commit.set()
330
331             if self._state == _BufferBlock.ERROR:
332                 self.error = val
333                 self.wait_for_commit.set()
334         else:
335             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
336
337     @synchronized
338     def state(self):
339         return self._state
340
341     def size(self):
342         """The amount of data written to the buffer."""
343         return self.write_pointer
344
345     @synchronized
346     def locator(self):
347         """The Keep locator for this buffer's contents."""
348         if self._locator is None:
349             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
350         return self._locator
351
352     @synchronized
353     def clone(self, new_blockid, owner):
354         if self._state == _BufferBlock.COMMITTED:
355             raise AssertionError("Cannot duplicate committed buffer block")
356         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
357         bufferblock.append(self.buffer_view[0:self.size()])
358         return bufferblock
359
360     @synchronized
361     def clear(self):
362         self.owner = None
363         self.buffer_block = None
364         self.buffer_view = None
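# Illustrative sketch of the _BufferBlock state machine described above.  In
# normal use these calls are made by _BlockManager, not by callers directly.
#
#   bb = _BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
#   bb.append("hello")                         # WRITABLE: buffer grows as needed
#   loc = bb.locator()                         # "<md5>+5", computed on demand
#   bb.set_state(_BufferBlock.PENDING)         # appends are now an error
#   bb.set_state(_BufferBlock.COMMITTED, loc)  # buffer released, locator fixed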
365
366
367 class NoopLock(object):
368     def __enter__(self):
369         return self
370
371     def __exit__(self, exc_type, exc_value, traceback):
372         pass
373
374     def acquire(self, blocking=False):
375         pass
376
377     def release(self):
378         pass
379
380
381 def must_be_writable(orig_func):
382     @functools.wraps(orig_func)
383     def must_be_writable_wrapper(self, *args, **kwargs):
384         if not self.writable():
385             raise IOError(errno.EROFS, "Collection is read-only.")
386         return orig_func(self, *args, **kwargs)
387     return must_be_writable_wrapper
388
389
390 class _BlockManager(object):
391     """BlockManager handles buffer blocks.
392
393     Also handles background block uploads, and background block prefetch for a
394     Collection of ArvadosFiles.
395
396     """
397
398     DEFAULT_PUT_THREADS = 2
399     DEFAULT_GET_THREADS = 2
400
401     def __init__(self, keep):
402         """keep: KeepClient object to use"""
403         self._keep = keep
404         self._bufferblocks = {}
405         self._put_queue = None
406         self._put_threads = None
407         self._prefetch_queue = None
408         self._prefetch_threads = None
409         self.lock = threading.Lock()
410         self.prefetch_enabled = True
411         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
412         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
413
414     @synchronized
415     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
416         """Allocate a new, empty bufferblock in WRITABLE state and return it.
417
418         :blockid:
419           optional block identifier, otherwise one will be automatically assigned
420
421         :starting_capacity:
422           optional capacity, otherwise will use default capacity
423
424         :owner:
425           ArvadosFile that owns this block
426
427         """
428         if blockid is None:
429             blockid = "bufferblock%i" % len(self._bufferblocks)
430         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
431         self._bufferblocks[bufferblock.blockid] = bufferblock
432         return bufferblock
433
434     @synchronized
435     def dup_block(self, block, owner):
436         """Create a new bufferblock initialized with the content of an existing bufferblock.
437
438         :block:
439           the buffer block to copy.
440
441         :owner:
442           ArvadosFile that owns the new block
443
444         """
445         new_blockid = "bufferblock%i" % len(self._bufferblocks)
446         bufferblock = block.clone(new_blockid, owner)
447         self._bufferblocks[bufferblock.blockid] = bufferblock
448         return bufferblock
449
450     @synchronized
451     def is_bufferblock(self, locator):
452         return locator in self._bufferblocks
453
454     def _commit_bufferblock_worker(self):
455         """Background uploader thread."""
456
457         while True:
458             try:
459                 bufferblock = self._put_queue.get()
460                 if bufferblock is None:
461                     return
462
463                 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
464                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
465
466             except Exception as e:
467                 bufferblock.set_state(_BufferBlock.ERROR, e)
468             finally:
469                 if self._put_queue is not None:
470                     self._put_queue.task_done()
471
472     @synchronized
473     def start_put_threads(self):
474         if self._put_threads is None:
475             # Start uploader threads.
476
477             # If we don't limit the Queue size, the upload queue can quickly
478             # grow to take up gigabytes of RAM if the writing process is
479             # generating data more quickly than it can be send to the Keep
480             # generating data more quickly than it can be sent to the Keep
481             #
482             # With two upload threads and a queue size of 2, this means up to 4
483             # blocks pending.  If they are full 64 MiB blocks, that means up to
484             # 256 MiB of internal buffering, which is the same size as the
485             # default download block cache in KeepClient.
486             self._put_queue = Queue.Queue(maxsize=2)
487
488             self._put_threads = []
489             for i in xrange(0, self.num_put_threads):
490                 thread = threading.Thread(target=self._commit_bufferblock_worker)
491                 self._put_threads.append(thread)
492                 thread.daemon = False
493                 thread.start()
494
495     def _block_prefetch_worker(self):
496         """The background downloader thread."""
497         while True:
498             try:
499                 b = self._prefetch_queue.get()
500                 if b is None:
501                     return
502                 self._keep.get(b)
503             except Exception:
504                 pass
505
506     @synchronized
507     def start_get_threads(self):
508         if self._prefetch_threads is None:
509             self._prefetch_queue = Queue.Queue()
510             self._prefetch_threads = []
511             for i in xrange(0, self.num_get_threads):
512                 thread = threading.Thread(target=self._block_prefetch_worker)
513                 self._prefetch_threads.append(thread)
514                 thread.daemon = True
515                 thread.start()
516
517
518     @synchronized
519     def stop_threads(self):
520         """Shut down and wait for background upload and download threads to finish."""
521
522         if self._put_threads is not None:
523             for t in self._put_threads:
524                 self._put_queue.put(None)
525             for t in self._put_threads:
526                 t.join()
527         self._put_threads = None
528         self._put_queue = None
529
530         if self._prefetch_threads is not None:
531             for t in self._prefetch_threads:
532                 self._prefetch_queue.put(None)
533             for t in self._prefetch_threads:
534                 t.join()
535         self._prefetch_threads = None
536         self._prefetch_queue = None
537
538     def __enter__(self):
539         return self
540
541     def __exit__(self, exc_type, exc_value, traceback):
542         self.stop_threads()
543
544     def __del__(self):
545         self.stop_threads()
546
547     def commit_bufferblock(self, block, sync):
548         """Initiate a background upload of a bufferblock.
549
550         :block:
551           The block object to upload
552
553         :sync:
554           If `sync` is True, upload the block synchronously.
555           If `sync` is False, upload the block asynchronously.  This will
556           return immediately unless the upload queue is at capacity, in
557           which case it will wait on an upload queue slot.
558
559         """
560
561         try:
562             # Mark the block as PENDING to disallow any further appends.
563             block.set_state(_BufferBlock.PENDING)
564         except StateChangeError as e:
565             if e.state == _BufferBlock.PENDING and sync:
566                 block.wait_for_commit.wait()
567                 if block.state() == _BufferBlock.ERROR:
568                     raise block.error
569             return
570
571         if sync:
572             try:
573                 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
574                 block.set_state(_BufferBlock.COMMITTED, loc)
575             except Exception as e:
576                 block.set_state(_BufferBlock.ERROR, e)
577                 raise
578         else:
579             self.start_put_threads()
580             self._put_queue.put(block)
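    # Sketch of the two commit modes, assuming `bm` is a _BlockManager and `bb`
    # a WRITABLE _BufferBlock:
    #
    #   bm.commit_bufferblock(bb, sync=True)   # upload now; raises on failure
    #   bm.commit_bufferblock(bb, sync=False)  # hand off to a background put
    #                                          # thread; may wait for a queue slot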
581
582     @synchronized
583     def get_bufferblock(self, locator):
584         return self._bufferblocks.get(locator)
585
586     @synchronized
587     def delete_bufferblock(self, locator):
588         bb = self._bufferblocks[locator]
589         bb.clear()
590         del self._bufferblocks[locator]
591
592     def get_block_contents(self, locator, num_retries, cache_only=False):
593         """Fetch a block.
594
595         First checks to see if the locator is a BufferBlock and returns its
596         contents if so; otherwise, passes the request through to KeepClient.get().
597
598         """
599         with self.lock:
600             if locator in self._bufferblocks:
601                 bufferblock = self._bufferblocks[locator]
602                 if bufferblock.state() != _BufferBlock.COMMITTED:
603                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
604                 else:
605                     locator = bufferblock._locator
606         if cache_only:
607             return self._keep.get_from_cache(locator)
608         else:
609             return self._keep.get(locator, num_retries=num_retries)
610
611     def commit_all(self):
612         """Commit all outstanding buffer blocks.
613
614         This is a synchronous call, and will not return until all buffer blocks
615         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
616
617         """
618         with self.lock:
619             items = self._bufferblocks.items()
620
621         for k,v in items:
622             if v.state() != _BufferBlock.COMMITTED:
623                 v.owner.flush(sync=False)
624
625         with self.lock:
626             if self._put_queue is not None:
627                 self._put_queue.join()
628
629                 err = []
630                 for k,v in items:
631                     if v.state() == _BufferBlock.ERROR:
632                         err.append((v.locator(), v.error))
633                 if err:
634                     raise KeepWriteError("Error writing some blocks", err, label="block")
635
636         for k,v in items:
637             # flush again with sync=True to remove committed bufferblocks from
638             # the segments.
639             if v.owner:
640                 v.owner.flush(sync=True)
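    # Typical use of commit_all(), as a sketch: flush everything still buffered
    # before rendering a manifest.  `block_manager` and `handle_failure` are
    # hypothetical names.
    #
    #   try:
    #       block_manager.commit_all()
    #   except KeepWriteError as e:
    #       handle_failure(e)  # e reports the blocks that failed to upload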
641
642
643     def block_prefetch(self, locator):
644         """Initiate a background download of a block.
645
646         This assumes that the underlying KeepClient implements a block cache,
647         so repeated requests for the same block will not result in repeated
648         downloads (unless the block is evicted from the cache.)  This method
649         does not block.
650
651         """
652
653         if not self.prefetch_enabled:
654             return
655
656         with self.lock:
657             if locator in self._bufferblocks:
658                 return
659         self.start_get_threads()
660         self._prefetch_queue.put(locator)
661
662
663 class ArvadosFile(object):
664     """Represent a file in a Collection.
665
666     ArvadosFile manages the underlying representation of a file in Keep as a
667     sequence of segments spanning a set of blocks, and implements random
668     read/write access.
669
670     This object may be accessed from multiple threads.
671
672     """
673
674     def __init__(self, parent, name, stream=[], segments=[]):
675         """
676         ArvadosFile constructor.
677
678         :stream:
679           a list of Range objects representing a block stream
680
681         :segments:
682           a list of Range objects representing segments
683         """
684         self.parent = parent
685         self.name = name
686         self._committed = False
687         self._segments = []
688         self.lock = parent.root_collection().lock
689         for s in segments:
690             self._add_segment(stream, s.locator, s.range_size)
691         self._current_bblock = None
692
693     def writable(self):
694         return self.parent.writable()
695
696     @synchronized
697     def segments(self):
698         return copy.copy(self._segments)
699
700     @synchronized
701     def clone(self, new_parent, new_name):
702         """Make a copy of this file."""
703         cp = ArvadosFile(new_parent, new_name)
704         cp.replace_contents(self)
705         return cp
706
707     @must_be_writable
708     @synchronized
709     def replace_contents(self, other):
710         """Replace segments of this file with segments from another `ArvadosFile` object."""
711
712         map_loc = {}
713         self._segments = []
714         for other_segment in other.segments():
715             new_loc = other_segment.locator
716             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
717                 if other_segment.locator not in map_loc:
718                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
719                     if bufferblock.state() != _BufferBlock.WRITABLE:
720                         map_loc[other_segment.locator] = bufferblock.locator()
721                     else:
722                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
723                 new_loc = map_loc[other_segment.locator]
724
725             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
726
727         self._committed = False
728
729     def __eq__(self, other):
730         if other is self:
731             return True
732         if not isinstance(other, ArvadosFile):
733             return False
734
735         othersegs = other.segments()
736         with self.lock:
737             if len(self._segments) != len(othersegs):
738                 return False
739             for i in xrange(0, len(othersegs)):
740                 seg1 = self._segments[i]
741                 seg2 = othersegs[i]
742                 loc1 = seg1.locator
743                 loc2 = seg2.locator
744
745                 if self.parent._my_block_manager().is_bufferblock(loc1):
746                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
747
748                 if other.parent._my_block_manager().is_bufferblock(loc2):
749                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
750
751                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
752                     seg1.range_start != seg2.range_start or
753                     seg1.range_size != seg2.range_size or
754                     seg1.segment_offset != seg2.segment_offset):
755                     return False
756
757         return True
758
759     def __ne__(self, other):
760         return not self.__eq__(other)
761
762     @synchronized
763     def set_committed(self):
764         """Set committed flag to True"""
765         self._committed = True
766
767     @synchronized
768     def committed(self):
769         """Get whether this is committed or not."""
770         return self._committed
771
772     @must_be_writable
773     @synchronized
774     def truncate(self, size):
775         """Shrink the size of the file.
776
777         If `size` is less than the size of the file, the file contents after
778         `size` will be discarded.  If `size` is greater than the current size
779         of the file, an IOError will be raised.
780
781         """
782         if size < self.size():
783             new_segs = []
784             for r in self._segments:
785                 range_end = r.range_start+r.range_size
786                 if r.range_start >= size:
787                     # segment is past the truncate size, all done
788                     break
789                 elif size < range_end:
790                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
791                     nr.segment_offset = r.segment_offset
792                     new_segs.append(nr)
793                     break
794                 else:
795                     new_segs.append(r)
796
797             self._segments = new_segs
798             self._committed = False
799         elif size > self.size():
800             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
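    # Example of the truncate() contract (illustrative only; `f` is a writable
    # ArvadosFile):
    #
    #   f.truncate(100)           # keep only the first 100 bytes
    #   f.truncate(f.size() + 1)  # IOError(EINVAL): extending is not supported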
801
802     def readfrom(self, offset, size, num_retries, exact=False):
803         """Read up to `size` bytes from the file starting at `offset`.
804
805         :exact:
806          If False (default), return less data than requested if the read
807          crosses a block boundary and the next block isn't cached.  If True,
808          only return less data than requested when hitting EOF.
809         """
810
811         with self.lock:
812             if size == 0 or offset >= self.size():
813                 return ''
814             prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
815             readsegs = locators_and_ranges(self._segments, offset, size)
816
817         for lr in prefetch:
818             self.parent._my_block_manager().block_prefetch(lr.locator)
819
820         data = []
821         for lr in readsegs:
822             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
823             if block:
824                 blockview = memoryview(block)
825                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
826             else:
827                 break
828         return ''.join(data)
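    # Sketch of the `exact` flag: with exact=False a read that crosses into a
    # block that is not yet cached may return fewer bytes than requested, so
    # callers needing exactly `size` bytes (short of EOF) pass exact=True.
    #
    #   chunk = f.readfrom(0, 1024, num_retries=2)              # may be short
    #   chunk = f.readfrom(0, 1024, num_retries=2, exact=True)  # short only at EOF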
829
830     def _repack_writes(self, num_retries):
831         """Test if the buffer block has more data than the file's segments reference.
832
833         This happens when a buffered write overwrites a file range written in
834         a previous buffered write.  Re-pack the buffer block for efficiency
835         and to avoid leaking information.
836
837         """
838         segs = self._segments
839
840         # Sum up the segments to get the total bytes of the file referencing
841         # into the buffer block.
842         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
843         write_total = sum([s.range_size for s in bufferblock_segs])
844
845         if write_total < self._current_bblock.size():
846             # There is more data in the buffer block than is actually accounted for by segments, so
847             # re-pack into a new buffer by copying over to a new buffer block.
848             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
849             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
850             for t in bufferblock_segs:
851                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
852                 t.segment_offset = new_bb.size() - t.range_size
853
854             self._current_bblock = new_bb
855
856     @must_be_writable
857     @synchronized
858     def writeto(self, offset, data, num_retries):
859         """Write `data` to the file starting at `offset`.
860
861         This will update existing bytes and/or extend the size of the file as
862         necessary.
863
864         """
865         if len(data) == 0:
866             return
867
868         if offset > self.size():
869             raise ArgumentError("Offset is past the end of the file")
870
871         if len(data) > config.KEEP_BLOCK_SIZE:
872             # Chunk it up into smaller writes
873             n = 0
874             dataview = memoryview(data)
875             while n < len(data):
876                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
877                 n += config.KEEP_BLOCK_SIZE
878             return
879
880         self._committed = False
881
882         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
883             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
884
885         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
886             self._repack_writes(num_retries)
887             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
888                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
889                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
890
891         self._current_bblock.append(data)
892
893         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
894
895         self.parent.notify(WRITE, self.parent, self.name, (self, self))
896
897         return len(data)
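    # Illustrative write pattern (`f` is a writable ArvadosFile; writing past
    # the current end of file raises ArgumentError):
    #
    #   f.writeto(0, "hello world", num_retries=2)  # initial contents
    #   f.writeto(6, "there", num_retries=2)        # overwrite "world" in place
    #   f.writeto(f.size(), "!", num_retries=2)     # append at EOF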
898
899     @synchronized
900     def flush(self, sync=True, num_retries=0):
901         """Flush the current bufferblock to Keep.
902
903         :sync:
904           If True, commit block synchronously, wait until buffer block has been written.
905           If False, commit block asynchronously, return immediately after putting block into
906           the keep put queue.
907         """
908         if self.committed():
909             return
910
911         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
912             if self._current_bblock.state() == _BufferBlock.WRITABLE:
913                 self._repack_writes(num_retries)
914             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
915
916         if sync:
917             to_delete = set()
918             for s in self._segments:
919                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
920                 if bb:
921                     if bb.state() != _BufferBlock.COMMITTED:
922                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
923                     to_delete.add(s.locator)
924                     s.locator = bb.locator()
925             for s in to_delete:
926                 self.parent._my_block_manager().delete_bufferblock(s)
927
928         self.parent.notify(MOD, self.parent, self.name, (self, self))
929
930     @must_be_writable
931     @synchronized
932     def add_segment(self, blocks, pos, size):
933         """Add a segment to the end of the file.
934
935         `pos` and `size` reference a section of the stream described by
936         `blocks` (a list of Range objects)
937
938         """
939         self._add_segment(blocks, pos, size)
940
941     def _add_segment(self, blocks, pos, size):
942         """Internal implementation of add_segment."""
943         self._committed = False
944         for lr in locators_and_ranges(blocks, pos, size):
945             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
946             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
947             self._segments.append(r)
948
949     @synchronized
950     def size(self):
951         """Get the file size."""
952         if self._segments:
953             n = self._segments[-1]
954             return n.range_start + n.range_size
955         else:
956             return 0
957
958     @synchronized
959     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
960         buf = ""
961         filestream = []
962         for segment in self._segments:
963             loc = segment.locator
964             if loc.startswith("bufferblock"):
965                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
966             if portable_locators:
967                 loc = KeepLocator(loc).stripped()
968             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
969                                  segment.segment_offset, segment.range_size))
970         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
971         buf += "\n"
972         return buf
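    # A single-file manifest line produced by manifest_text() looks roughly
    # like this (the locator and file name shown are hypothetical):
    #
    #   . 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n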
973
974     @must_be_writable
975     @synchronized
976     def _reparent(self, newparent, newname):
977         self._committed = False
978         self.flush(sync=True)
979         self.parent.remove(self.name)
980         self.parent = newparent
981         self.name = newname
982         self.lock = self.parent.root_collection().lock
983
984
985 class ArvadosFileReader(ArvadosFileReaderBase):
986     """Wraps ArvadosFile in a file-like object supporting reading only.
987
988     Be aware that this class is NOT thread safe as there is no locking around
989     updating the file pointer.
990
991     """
992
993     def __init__(self, arvadosfile, num_retries=None):
994         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
995         self.arvadosfile = arvadosfile
996
997     def size(self):
998         return self.arvadosfile.size()
999
1000     def stream_name(self):
1001         return self.arvadosfile.parent.stream_name()
1002
1003     @_FileLikeObjectBase._before_close
1004     @retry_method
1005     def read(self, size=None, num_retries=None):
1006         """Read up to `size` bytes from the file and return the result.
1007
1008         Starts at the current file position.  If `size` is None, read the
1009         entire remainder of the file.
1010         """
1011         if size is None:
1012             data = []
1013             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1014             while rd:
1015                 data.append(rd)
1016                 self._filepos += len(rd)
1017                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1018             return ''.join(data)
1019         else:
1020             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1021             self._filepos += len(data)
1022             return data
1023
1024     @_FileLikeObjectBase._before_close
1025     @retry_method
1026     def readfrom(self, offset, size, num_retries=None):
1027         """Read up to `size` bytes from the stream, starting at the specified file offset.
1028
1029         This method does not change the file position.
1030         """
1031         return self.arvadosfile.readfrom(offset, size, num_retries)
1032
1033     def flush(self):
1034         pass
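# Sketch of read-only access through ArvadosFileReader.  Readers are normally
# obtained by opening a file in a Collection rather than constructed directly;
# `arvados_file` below is a hypothetical ArvadosFile instance.
#
#   reader = ArvadosFileReader(arvados_file, num_retries=2)
#   header = reader.read(1024)                      # advances the file pointer
#   everything = reader.readfrom(0, reader.size())  # pointer is unchanged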
1035
1036
1037 class ArvadosFileWriter(ArvadosFileReader):
1038     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1039
1040     Be aware that this class is NOT thread safe as there is no locking around
1041     updating the file pointer.
1042
1043     """
1044
1045     def __init__(self, arvadosfile, mode, num_retries=None):
1046         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1047         self.mode = mode
1048
1049     @_FileLikeObjectBase._before_close
1050     @retry_method
1051     def write(self, data, num_retries=None):
1052         if self.mode[0] == "a":
1053             self.arvadosfile.writeto(self.size(), data, num_retries)
1054         else:
1055             self.arvadosfile.writeto(self._filepos, data, num_retries)
1056             self._filepos += len(data)
1057         return len(data)
1058
1059     @_FileLikeObjectBase._before_close
1060     @retry_method
1061     def writelines(self, seq, num_retries=None):
1062         for s in seq:
1063             self.write(s, num_retries)
1064
1065     @_FileLikeObjectBase._before_close
1066     def truncate(self, size=None):
1067         if size is None:
1068             size = self._filepos
1069         self.arvadosfile.truncate(size)
1070         if self._filepos > self.size():
1071             self._filepos = self.size()
1072
1073     @_FileLikeObjectBase._before_close
1074     def flush(self):
1075         self.arvadosfile.flush()
1076
1077     def close(self):
1078         if not self.closed:
1079             self.flush()
1080             super(ArvadosFileWriter, self).close()
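# Sketch of typical writer usage.  Writers are normally obtained by opening a
# file in a Collection for writing; `arvados_file` is a hypothetical
# ArvadosFile instance.
#
#   writer = ArvadosFileWriter(arvados_file, "w", num_retries=2)
#   writer.write("hello\n")  # writes at the file pointer ("a" mode always appends)
#   writer.flush()           # push the current bufferblock out to Keep
#   writer.close()           # flushes, then marks the stream closed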