import functools
import os
import zlib
import bz2
import hashlib
import threading
import Queue
import copy
import errno
import re
import logging

from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')

def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).  If no stream name is available,
    assume '.'.

    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name
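
# Illustrative sketch (not part of the library API): how split() behaves on a
# typical stream path versus a bare file name.  The paths below are made up.
#
#   split("./subdir/file.txt")   # -> ('./subdir', 'file.txt')
#   split("file.txt")            # -> ('.', 'file.txt')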

class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub('\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    def tell(self):
        return self._filepos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
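
    # Illustrative sketch (assumptions, not part of the SDK): reading a
    # gzip-compressed collection file through readall_decompressed().  The
    # collection object and file names are hypothetical.
    #
    #   f = collection.open("logs.txt.gz")
    #   with open(f.decompressed_name(), "w") as out:   # writes "logs.txt"
    #       for chunk in f.readall_decompressed():
    #           out.write(chunk)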

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # The older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self
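
    # Illustrative note (sketch, not part of the API surface): _NameAttribute
    # lets callers use either the file-object style attribute or the older
    # method style on the same object.  `reader` below is hypothetical.
    #
    #   reader.name     # -> 'foo.txt'  (plain attribute access)
    #   reader.name()   # -> 'foo.txt'  (legacy call style still works)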

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
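
# Illustrative sketch (assumption: any object with a `.lock` attribute works):
# the @synchronized decorator simply serializes method calls on self.lock.
#
#   class Counter(object):
#       def __init__(self):
#           self.lock = threading.Lock()
#           self.n = 0
#
#       @synchronized
#       def bump(self):
#           self.n += 1   # runs with self.lock held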


class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate

class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are four valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via the keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    ERROR
      The upload failed; the exception is recorded in `self.error` and the
      block may be retried (ERROR -> PENDING).

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3
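
    # Illustrative sketch of the state machine (hypothetical block manager
    # `bm`; this snippet is not executed by the SDK itself):
    #
    #   bb = bm.alloc_bufferblock()          # WRITABLE
    #   bb.append("hello")                   # still WRITABLE, locator is lazy
    #   bb.set_state(_BufferBlock.PENDING)   # no more appends allowed
    #   bb.set_state(_BufferBlock.COMMITTED, "locator+5")  # buffer released
    #   bb.set_state(_BufferBlock.WRITABLE)  # raises StateChangeError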

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")

    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()

    @synchronized
    def state(self):
        return self._state

    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    @synchronized
    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2
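
    # Illustrative sketch of the intended call pattern (hypothetical KeepClient
    # instance `keep_client` and ArvadosFile `some_file`; real callers reach
    # this through a Collection's _my_block_manager() rather than constructing
    # it directly):
    #
    #   with _BlockManager(keep_client) as bm:
    #       bb = bm.alloc_bufferblock(owner=some_file)
    #       bb.append("data to upload")
    #       bm.commit_bufferblock(bb, sync=False)   # queue a background upload
    #       bm.commit_all()                         # wait for every pending block
    #   # leaving the with-block calls stop_threads()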

    def __init__(self, keep):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = {}
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        if blockid is None:
            blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""

        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)

            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    @synchronized
    def start_put_threads(self):
        if self._put_threads is None:
            # Start uploader threads.

            # If we don't limit the Queue size, the upload queue can quickly
            # grow to take up gigabytes of RAM if the writing process is
            # generating data more quickly than it can be sent to the Keep
            # servers.
            #
            # With two upload threads and a queue size of 2, this means up to 4
            # blocks pending.  If they are full 64 MiB blocks, that means up to
            # 256 MiB of internal buffering, which is the same size as the
            # default download block cache in KeepClient.
            self._put_queue = Queue.Queue(maxsize=2)

            self._put_threads = []
            for i in xrange(0, self.num_put_threads):
                thread = threading.Thread(target=self._commit_bufferblock_worker)
                self._put_threads.append(thread)
                thread.daemon = True
                thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                pass

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for i in xrange(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""

        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """

        try:
            # Mark the block as PENDING so as to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator refers to a BufferBlock and returns
        its contents; if not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        with self.lock:
            items = self._bufferblocks.items()

        for k,v in items:
            if v.state() != _BufferBlock.COMMITTED:
                v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k,v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k,v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                v.owner.flush(sync=True)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache).  This method
        does not block.

        """

        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """
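
    # Illustrative sketch of the read/write interface (hypothetical parent
    # collection `coll`; in practice instances are created by the Collection
    # classes rather than constructed directly):
    #
    #   af = ArvadosFile(coll, "example.txt")
    #   af.writeto(0, "hello world", num_retries=2)   # buffered in a _BufferBlock
    #   af.readfrom(0, 5, num_retries=2)              # -> 'hello'
    #   af.flush(sync=True)                           # push the buffer block to Keep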

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        self.parent = parent
        self.name = name
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self._committed = False

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_committed(self):
        """Set the committed flag to True."""
        self._committed = True

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self._committed = False
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
         If False (default), return less data than requested if the read
         crosses a block boundary and the next block isn't cached.  If True,
         only return less data than requested when hitting EOF.
        """

        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return ''.join(data)
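
    # Illustrative note (hypothetical ArvadosFile instance `af`): with
    # exact=False a read that crosses into an uncached block may come back
    # short, so callers that need every byte either pass exact=True or loop.
    #
    #   chunk = af.readfrom(0, 2**26, num_retries=2)              # may be short
    #   whole = af.readfrom(0, 2**26, num_retries=2, exact=True)  # short only at EOF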

    def _repack_writes(self, num_retries):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self._committed = False

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the Keep put queue.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self._committed = False
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if loc.startswith("bufferblock"):
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            # The block size is taken from the locator's size hint.
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                 segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self._committed = False
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """
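
    # Illustrative sketch (hypothetical Collection `c`; in practice readers are
    # normally obtained through Collection.open() rather than built directly):
    #
    #   reader = ArvadosFileReader(c.find("data.csv"), num_retries=3)
    #   header = reader.readline()
    #   rest = reader.read()        # read the remainder of the file
    #   reader.close()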

    def __init__(self, arvadosfile, num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        pass


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """
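
    # Illustrative sketch (hypothetical writable Collection `c`; writers are
    # normally obtained through Collection.open(..., "w") rather than built
    # directly):
    #
    #   writer = ArvadosFileWriter(c.find("notes.txt"), "a", num_retries=3)
    #   writer.write("appended line\n")   # "a" mode always writes at EOF
    #   writer.close()                    # flushes buffered blocks to Keep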

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
        self.mode = mode

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self):
        if not self.closed:
            self.flush()
            super(ArvadosFileWriter, self).close()