Merge branch 'master' into 7167-keep-rsync
[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13
14 from .errors import KeepWriteError, AssertionError, ArgumentError
15 from .keep import KeepLocator
16 from ._normalize_stream import normalize_stream
17 from ._ranges import locators_and_ranges, replace_range, Range
18 from .retry import retry_method
19
20 MOD = "mod"
21 WRITE = "write"
22
23 _logger = logging.getLogger('arvados.arvfile')
24
25 def split(path):
26     """split(path) -> streamname, filename
27
28     Separate the stream name and file name in a /-separated stream path and
29     return a tuple (stream_name, file_name).  If no stream name is available,
30     assume '.'.
31
32     """
33     try:
34         stream_name, file_name = path.rsplit('/', 1)
35     except ValueError:  # No / in string
36         stream_name, file_name = '.', path
37     return stream_name, file_name
38
39 class _FileLikeObjectBase(object):
40     def __init__(self, name, mode):
41         self.name = name
42         self.mode = mode
43         self.closed = False
44
45     @staticmethod
46     def _before_close(orig_func):
47         @functools.wraps(orig_func)
48         def before_close_wrapper(self, *args, **kwargs):
49             if self.closed:
50                 raise ValueError("I/O operation on closed stream file")
51             return orig_func(self, *args, **kwargs)
52         return before_close_wrapper
53
54     def __enter__(self):
55         return self
56
57     def __exit__(self, exc_type, exc_value, traceback):
58         try:
59             self.close()
60         except Exception:
61             if exc_type is None:
62                 raise
63
64     def close(self):
65         self.closed = True
66
67
68 class ArvadosFileReaderBase(_FileLikeObjectBase):
69     def __init__(self, name, mode, num_retries=None):
70         super(ArvadosFileReaderBase, self).__init__(name, mode)
71         self._filepos = 0L
72         self.num_retries = num_retries
73         self._readline_cache = (None, None)
74
75     def __iter__(self):
76         while True:
77             data = self.readline()
78             if not data:
79                 break
80             yield data
81
82     def decompressed_name(self):
83         return re.sub('\.(bz2|gz)$', '', self.name)
84
85     @_FileLikeObjectBase._before_close
86     def seek(self, pos, whence=os.SEEK_SET):
87         if whence == os.SEEK_CUR:
88             pos += self._filepos
89         elif whence == os.SEEK_END:
90             pos += self.size()
91         self._filepos = min(max(pos, 0L), self.size())
92
93     def tell(self):
94         return self._filepos
95
96     @_FileLikeObjectBase._before_close
97     @retry_method
98     def readall(self, size=2**20, num_retries=None):
99         while True:
100             data = self.read(size, num_retries=num_retries)
101             if data == '':
102                 break
103             yield data
104
105     @_FileLikeObjectBase._before_close
106     @retry_method
107     def readline(self, size=float('inf'), num_retries=None):
108         cache_pos, cache_data = self._readline_cache
109         if self.tell() == cache_pos:
110             data = [cache_data]
111         else:
112             data = ['']
113         data_size = len(data[-1])
114         while (data_size < size) and ('\n' not in data[-1]):
115             next_read = self.read(2 ** 20, num_retries=num_retries)
116             if not next_read:
117                 break
118             data.append(next_read)
119             data_size += len(next_read)
120         data = ''.join(data)
121         try:
122             nextline_index = data.index('\n') + 1
123         except ValueError:
124             nextline_index = len(data)
125         nextline_index = min(nextline_index, size)
126         self._readline_cache = (self.tell(), data[nextline_index:])
127         return data[:nextline_index]
128
129     @_FileLikeObjectBase._before_close
130     @retry_method
131     def decompress(self, decompress, size, num_retries=None):
132         for segment in self.readall(size, num_retries=num_retries):
133             data = decompress(segment)
134             if data:
135                 yield data
136
137     @_FileLikeObjectBase._before_close
138     @retry_method
139     def readall_decompressed(self, size=2**20, num_retries=None):
140         self.seek(0)
141         if self.name.endswith('.bz2'):
142             dc = bz2.BZ2Decompressor()
143             return self.decompress(dc.decompress, size,
144                                    num_retries=num_retries)
145         elif self.name.endswith('.gz'):
146             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
147             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
148                                    size, num_retries=num_retries)
149         else:
150             return self.readall(size, num_retries=num_retries)
151
152     @_FileLikeObjectBase._before_close
153     @retry_method
154     def readlines(self, sizehint=float('inf'), num_retries=None):
155         data = []
156         data_size = 0
157         for s in self.readall(num_retries=num_retries):
158             data.append(s)
159             data_size += len(s)
160             if data_size >= sizehint:
161                 break
162         return ''.join(data).splitlines(True)
163
164     def size(self):
165         raise NotImplementedError()
166
167     def read(self, size, num_retries=None):
168         raise NotImplementedError()
169
170     def readfrom(self, start, size, num_retries=None):
171         raise NotImplementedError()
172
173
174 class StreamFileReader(ArvadosFileReaderBase):
175     class _NameAttribute(str):
176         # The Python file API provides a plain .name attribute.
177         # Older SDK provided a name() method.
178         # This class provides both, for maximum compatibility.
179         def __call__(self):
180             return self
181
182     def __init__(self, stream, segments, name):
183         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
184         self._stream = stream
185         self.segments = segments
186
187     def stream_name(self):
188         return self._stream.name()
189
190     def size(self):
191         n = self.segments[-1]
192         return n.range_start + n.range_size
193
194     @_FileLikeObjectBase._before_close
195     @retry_method
196     def read(self, size, num_retries=None):
197         """Read up to 'size' bytes from the stream, starting at the current file position"""
198         if size == 0:
199             return ''
200
201         data = ''
202         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
203         if available_chunks:
204             lr = available_chunks[0]
205             data = self._stream.readfrom(lr.locator+lr.segment_offset,
206                                           lr.segment_size,
207                                           num_retries=num_retries)
208
209         self._filepos += len(data)
210         return data
211
212     @_FileLikeObjectBase._before_close
213     @retry_method
214     def readfrom(self, start, size, num_retries=None):
215         """Read up to 'size' bytes from the stream, starting at 'start'"""
216         if size == 0:
217             return ''
218
219         data = []
220         for lr in locators_and_ranges(self.segments, start, size):
221             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
222                                               num_retries=num_retries))
223         return ''.join(data)
224
225     def as_manifest(self):
226         segs = []
227         for r in self.segments:
228             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
229         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
230
231
232 def synchronized(orig_func):
233     @functools.wraps(orig_func)
234     def synchronized_wrapper(self, *args, **kwargs):
235         with self.lock:
236             return orig_func(self, *args, **kwargs)
237     return synchronized_wrapper
238
239
240 class StateChangeError(Exception):
241     def __init__(self, message, state, nextstate):
242         super(StateChangeError, self).__init__(message)
243         self.state = state
244         self.nextstate = nextstate
245
246 class _BufferBlock(object):
247     """A stand-in for a Keep block that is in the process of being written.
248
249     Writers can append to it, get the size, and compute the Keep locator.
250     There are three valid states:
251
252     WRITABLE
253       Can append to block.
254
255     PENDING
256       Block is in the process of being uploaded to Keep, append is an error.
257
258     COMMITTED
259       The block has been written to Keep, its internal buffer has been
260       released, fetching the block will fetch it via keep client (since we
261       discarded the internal copy), and identifiers referring to the BufferBlock
262       can be replaced with the block locator.
263
264     """
265
266     WRITABLE = 0
267     PENDING = 1
268     COMMITTED = 2
269     ERROR = 3
270
271     def __init__(self, blockid, starting_capacity, owner):
272         """
273         :blockid:
274           the identifier for this block
275
276         :starting_capacity:
277           the initial buffer capacity
278
279         :owner:
280           ArvadosFile that owns this block
281
282         """
283         self.blockid = blockid
284         self.buffer_block = bytearray(starting_capacity)
285         self.buffer_view = memoryview(self.buffer_block)
286         self.write_pointer = 0
287         self._state = _BufferBlock.WRITABLE
288         self._locator = None
289         self.owner = owner
290         self.lock = threading.Lock()
291         self.wait_for_commit = threading.Event()
292         self.error = None
293
294     @synchronized
295     def append(self, data):
296         """Append some data to the buffer.
297
298         Only valid if the block is in WRITABLE state.  Implements an expanding
299         buffer, doubling capacity as needed to accomdate all the data.
300
301         """
302         if self._state == _BufferBlock.WRITABLE:
303             while (self.write_pointer+len(data)) > len(self.buffer_block):
304                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
305                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
306                 self.buffer_block = new_buffer_block
307                 self.buffer_view = memoryview(self.buffer_block)
308             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
309             self.write_pointer += len(data)
310             self._locator = None
311         else:
312             raise AssertionError("Buffer block is not writable")
313
314     STATE_TRANSITIONS = frozenset([
315             (WRITABLE, PENDING),
316             (PENDING, COMMITTED),
317             (PENDING, ERROR),
318             (ERROR, PENDING)])
319
320     @synchronized
321     def set_state(self, nextstate, val=None):
322         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
323             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
324         self._state = nextstate
325
326         if self._state == _BufferBlock.PENDING:
327             self.wait_for_commit.clear()
328
329         if self._state == _BufferBlock.COMMITTED:
330             self._locator = val
331             self.buffer_view = None
332             self.buffer_block = None
333             self.wait_for_commit.set()
334
335         if self._state == _BufferBlock.ERROR:
336             self.error = val
337             self.wait_for_commit.set()
338
339     @synchronized
340     def state(self):
341         return self._state
342
343     def size(self):
344         """The amount of data written to the buffer."""
345         return self.write_pointer
346
347     @synchronized
348     def locator(self):
349         """The Keep locator for this buffer's contents."""
350         if self._locator is None:
351             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
352         return self._locator
353
354     @synchronized
355     def clone(self, new_blockid, owner):
356         if self._state == _BufferBlock.COMMITTED:
357             raise AssertionError("Cannot duplicate committed buffer block")
358         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
359         bufferblock.append(self.buffer_view[0:self.size()])
360         return bufferblock
361
362     @synchronized
363     def clear(self):
364         self.owner = None
365         self.buffer_block = None
366         self.buffer_view = None
367
368
369 class NoopLock(object):
370     def __enter__(self):
371         return self
372
373     def __exit__(self, exc_type, exc_value, traceback):
374         pass
375
376     def acquire(self, blocking=False):
377         pass
378
379     def release(self):
380         pass
381
382
383 def must_be_writable(orig_func):
384     @functools.wraps(orig_func)
385     def must_be_writable_wrapper(self, *args, **kwargs):
386         if not self.writable():
387             raise IOError(errno.EROFS, "Collection is read-only.")
388         return orig_func(self, *args, **kwargs)
389     return must_be_writable_wrapper
390
391
392 class _BlockManager(object):
393     """BlockManager handles buffer blocks.
394
395     Also handles background block uploads, and background block prefetch for a
396     Collection of ArvadosFiles.
397
398     """
399
400     DEFAULT_PUT_THREADS = 2
401     DEFAULT_GET_THREADS = 2
402
403     def __init__(self, keep):
404         """keep: KeepClient object to use"""
405         self._keep = keep
406         self._bufferblocks = {}
407         self._put_queue = None
408         self._put_threads = None
409         self._prefetch_queue = None
410         self._prefetch_threads = None
411         self.lock = threading.Lock()
412         self.prefetch_enabled = True
413         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
414         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
415
416     @synchronized
417     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
418         """Allocate a new, empty bufferblock in WRITABLE state and return it.
419
420         :blockid:
421           optional block identifier, otherwise one will be automatically assigned
422
423         :starting_capacity:
424           optional capacity, otherwise will use default capacity
425
426         :owner:
427           ArvadosFile that owns this block
428
429         """
430         if blockid is None:
431             blockid = "bufferblock%i" % len(self._bufferblocks)
432         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
433         self._bufferblocks[bufferblock.blockid] = bufferblock
434         return bufferblock
435
436     @synchronized
437     def dup_block(self, block, owner):
438         """Create a new bufferblock initialized with the content of an existing bufferblock.
439
440         :block:
441           the buffer block to copy.
442
443         :owner:
444           ArvadosFile that owns the new block
445
446         """
447         new_blockid = "bufferblock%i" % len(self._bufferblocks)
448         bufferblock = block.clone(new_blockid, owner)
449         self._bufferblocks[bufferblock.blockid] = bufferblock
450         return bufferblock
451
452     @synchronized
453     def is_bufferblock(self, locator):
454         return locator in self._bufferblocks
455
456     def _commit_bufferblock_worker(self):
457         """Background uploader thread."""
458
459         while True:
460             try:
461                 bufferblock = self._put_queue.get()
462                 if bufferblock is None:
463                     return
464
465                 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
466                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
467
468             except Exception as e:
469                 bufferblock.set_state(_BufferBlock.ERROR, e)
470             finally:
471                 if self._put_queue is not None:
472                     self._put_queue.task_done()
473
474     @synchronized
475     def start_put_threads(self):
476         if self._put_threads is None:
477             # Start uploader threads.
478
479             # If we don't limit the Queue size, the upload queue can quickly
480             # grow to take up gigabytes of RAM if the writing process is
481             # generating data more quickly than it can be send to the Keep
482             # servers.
483             #
484             # With two upload threads and a queue size of 2, this means up to 4
485             # blocks pending.  If they are full 64 MiB blocks, that means up to
486             # 256 MiB of internal buffering, which is the same size as the
487             # default download block cache in KeepClient.
488             self._put_queue = Queue.Queue(maxsize=2)
489
490             self._put_threads = []
491             for i in xrange(0, self.num_put_threads):
492                 thread = threading.Thread(target=self._commit_bufferblock_worker)
493                 self._put_threads.append(thread)
494                 thread.daemon = True
495                 thread.start()
496
497     def _block_prefetch_worker(self):
498         """The background downloader thread."""
499         while True:
500             try:
501                 b = self._prefetch_queue.get()
502                 if b is None:
503                     return
504                 self._keep.get(b)
505             except Exception:
506                 pass
507
508     @synchronized
509     def start_get_threads(self):
510         if self._prefetch_threads is None:
511             self._prefetch_queue = Queue.Queue()
512             self._prefetch_threads = []
513             for i in xrange(0, self.num_get_threads):
514                 thread = threading.Thread(target=self._block_prefetch_worker)
515                 self._prefetch_threads.append(thread)
516                 thread.daemon = True
517                 thread.start()
518
519
520     @synchronized
521     def stop_threads(self):
522         """Shut down and wait for background upload and download threads to finish."""
523
524         if self._put_threads is not None:
525             for t in self._put_threads:
526                 self._put_queue.put(None)
527             for t in self._put_threads:
528                 t.join()
529         self._put_threads = None
530         self._put_queue = None
531
532         if self._prefetch_threads is not None:
533             for t in self._prefetch_threads:
534                 self._prefetch_queue.put(None)
535             for t in self._prefetch_threads:
536                 t.join()
537         self._prefetch_threads = None
538         self._prefetch_queue = None
539
540     def __enter__(self):
541         return self
542
543     def __exit__(self, exc_type, exc_value, traceback):
544         self.stop_threads()
545
546     def __del__(self):
547         self.stop_threads()
548
549     def commit_bufferblock(self, block, sync):
550         """Initiate a background upload of a bufferblock.
551
552         :block:
553           The block object to upload
554
555         :sync:
556           If `sync` is True, upload the block synchronously.
557           If `sync` is False, upload the block asynchronously.  This will
558           return immediately unless the upload queue is at capacity, in
559           which case it will wait on an upload queue slot.
560
561         """
562
563         try:
564             # Mark the block as PENDING so to disallow any more appends.
565             block.set_state(_BufferBlock.PENDING)
566         except StateChangeError as e:
567             if e.state == _BufferBlock.PENDING:
568                 if sync:
569                     block.wait_for_commit.wait()
570                 else:
571                     return
572             if block.state() == _BufferBlock.COMMITTED:
573                 return
574             elif block.state() == _BufferBlock.ERROR:
575                 raise block.error
576             else:
577                 raise
578
579         if sync:
580             try:
581                 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
582                 block.set_state(_BufferBlock.COMMITTED, loc)
583             except Exception as e:
584                 block.set_state(_BufferBlock.ERROR, e)
585                 raise
586         else:
587             self.start_put_threads()
588             self._put_queue.put(block)
589
590     @synchronized
591     def get_bufferblock(self, locator):
592         return self._bufferblocks.get(locator)
593
594     @synchronized
595     def delete_bufferblock(self, locator):
596         bb = self._bufferblocks[locator]
597         bb.clear()
598         del self._bufferblocks[locator]
599
600     def get_block_contents(self, locator, num_retries, cache_only=False):
601         """Fetch a block.
602
603         First checks to see if the locator is a BufferBlock and return that, if
604         not, passes the request through to KeepClient.get().
605
606         """
607         with self.lock:
608             if locator in self._bufferblocks:
609                 bufferblock = self._bufferblocks[locator]
610                 if bufferblock.state() != _BufferBlock.COMMITTED:
611                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
612                 else:
613                     locator = bufferblock._locator
614         if cache_only:
615             return self._keep.get_from_cache(locator)
616         else:
617             return self._keep.get(locator, num_retries=num_retries)
618
619     def commit_all(self):
620         """Commit all outstanding buffer blocks.
621
622         This is a synchronous call, and will not return until all buffer blocks
623         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
624
625         """
626         with self.lock:
627             items = self._bufferblocks.items()
628
629         for k,v in items:
630             if v.state() != _BufferBlock.COMMITTED:
631                 v.owner.flush(sync=False)
632
633         with self.lock:
634             if self._put_queue is not None:
635                 self._put_queue.join()
636
637                 err = []
638                 for k,v in items:
639                     if v.state() == _BufferBlock.ERROR:
640                         err.append((v.locator(), v.error))
641                 if err:
642                     raise KeepWriteError("Error writing some blocks", err, label="block")
643
644         for k,v in items:
645             # flush again with sync=True to remove committed bufferblocks from
646             # the segments.
647             if v.owner:
648                 v.owner.flush(sync=True)
649
650     def block_prefetch(self, locator):
651         """Initiate a background download of a block.
652
653         This assumes that the underlying KeepClient implements a block cache,
654         so repeated requests for the same block will not result in repeated
655         downloads (unless the block is evicted from the cache.)  This method
656         does not block.
657
658         """
659
660         if not self.prefetch_enabled:
661             return
662
663         if self._keep.get_from_cache(locator) is not None:
664             return
665
666         with self.lock:
667             if locator in self._bufferblocks:
668                 return
669
670         self.start_get_threads()
671         self._prefetch_queue.put(locator)
672
673
674 class ArvadosFile(object):
675     """Represent a file in a Collection.
676
677     ArvadosFile manages the underlying representation of a file in Keep as a
678     sequence of segments spanning a set of blocks, and implements random
679     read/write access.
680
681     This object may be accessed from multiple threads.
682
683     """
684
685     def __init__(self, parent, name, stream=[], segments=[]):
686         """
687         ArvadosFile constructor.
688
689         :stream:
690           a list of Range objects representing a block stream
691
692         :segments:
693           a list of Range objects representing segments
694         """
695         self.parent = parent
696         self.name = name
697         self._committed = False
698         self._segments = []
699         self.lock = parent.root_collection().lock
700         for s in segments:
701             self._add_segment(stream, s.locator, s.range_size)
702         self._current_bblock = None
703
704     def writable(self):
705         return self.parent.writable()
706
707     @synchronized
708     def segments(self):
709         return copy.copy(self._segments)
710
711     @synchronized
712     def clone(self, new_parent, new_name):
713         """Make a copy of this file."""
714         cp = ArvadosFile(new_parent, new_name)
715         cp.replace_contents(self)
716         return cp
717
718     @must_be_writable
719     @synchronized
720     def replace_contents(self, other):
721         """Replace segments of this file with segments from another `ArvadosFile` object."""
722
723         map_loc = {}
724         self._segments = []
725         for other_segment in other.segments():
726             new_loc = other_segment.locator
727             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
728                 if other_segment.locator not in map_loc:
729                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
730                     if bufferblock.state() != _BufferBlock.WRITABLE:
731                         map_loc[other_segment.locator] = bufferblock.locator()
732                     else:
733                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
734                 new_loc = map_loc[other_segment.locator]
735
736             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
737
738         self._committed = False
739
740     def __eq__(self, other):
741         if other is self:
742             return True
743         if not isinstance(other, ArvadosFile):
744             return False
745
746         othersegs = other.segments()
747         with self.lock:
748             if len(self._segments) != len(othersegs):
749                 return False
750             for i in xrange(0, len(othersegs)):
751                 seg1 = self._segments[i]
752                 seg2 = othersegs[i]
753                 loc1 = seg1.locator
754                 loc2 = seg2.locator
755
756                 if self.parent._my_block_manager().is_bufferblock(loc1):
757                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
758
759                 if other.parent._my_block_manager().is_bufferblock(loc2):
760                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
761
762                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
763                     seg1.range_start != seg2.range_start or
764                     seg1.range_size != seg2.range_size or
765                     seg1.segment_offset != seg2.segment_offset):
766                     return False
767
768         return True
769
770     def __ne__(self, other):
771         return not self.__eq__(other)
772
773     @synchronized
774     def set_committed(self):
775         """Set committed flag to False"""
776         self._committed = True
777
778     @synchronized
779     def committed(self):
780         """Get whether this is committed or not."""
781         return self._committed
782
783     @must_be_writable
784     @synchronized
785     def truncate(self, size):
786         """Shrink the size of the file.
787
788         If `size` is less than the size of the file, the file contents after
789         `size` will be discarded.  If `size` is greater than the current size
790         of the file, an IOError will be raised.
791
792         """
793         if size < self.size():
794             new_segs = []
795             for r in self._segments:
796                 range_end = r.range_start+r.range_size
797                 if r.range_start >= size:
798                     # segment is past the trucate size, all done
799                     break
800                 elif size < range_end:
801                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
802                     nr.segment_offset = r.segment_offset
803                     new_segs.append(nr)
804                     break
805                 else:
806                     new_segs.append(r)
807
808             self._segments = new_segs
809             self._committed = False
810         elif size > self.size():
811             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
812
813     def readfrom(self, offset, size, num_retries, exact=False):
814         """Read up to `size` bytes from the file starting at `offset`.
815
816         :exact:
817          If False (default), return less data than requested if the read
818          crosses a block boundary and the next block isn't cached.  If True,
819          only return less data than requested when hitting EOF.
820         """
821
822         with self.lock:
823             if size == 0 or offset >= self.size():
824                 return ''
825             readsegs = locators_and_ranges(self._segments, offset, size)
826             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
827
828         locs = set()
829         data = []
830         for lr in readsegs:
831             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
832             if block:
833                 blockview = memoryview(block)
834                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
835                 locs.add(lr.locator)
836             else:
837                 break
838
839         for lr in prefetch:
840             if lr.locator not in locs:
841                 self.parent._my_block_manager().block_prefetch(lr.locator)
842                 locs.add(lr.locator)
843
844         return ''.join(data)
845
846     def _repack_writes(self, num_retries):
847         """Test if the buffer block has more data than actual segments.
848
849         This happens when a buffered write over-writes a file range written in
850         a previous buffered write.  Re-pack the buffer block for efficiency
851         and to avoid leaking information.
852
853         """
854         segs = self._segments
855
856         # Sum up the segments to get the total bytes of the file referencing
857         # into the buffer block.
858         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
859         write_total = sum([s.range_size for s in bufferblock_segs])
860
861         if write_total < self._current_bblock.size():
862             # There is more data in the buffer block than is actually accounted for by segments, so
863             # re-pack into a new buffer by copying over to a new buffer block.
864             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
865             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
866             for t in bufferblock_segs:
867                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
868                 t.segment_offset = new_bb.size() - t.range_size
869
870             self._current_bblock = new_bb
871
872     @must_be_writable
873     @synchronized
874     def writeto(self, offset, data, num_retries):
875         """Write `data` to the file starting at `offset`.
876
877         This will update existing bytes and/or extend the size of the file as
878         necessary.
879
880         """
881         if len(data) == 0:
882             return
883
884         if offset > self.size():
885             raise ArgumentError("Offset is past the end of the file")
886
887         if len(data) > config.KEEP_BLOCK_SIZE:
888             # Chunk it up into smaller writes
889             n = 0
890             dataview = memoryview(data)
891             while n < len(data):
892                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
893                 n += config.KEEP_BLOCK_SIZE
894             return
895
896         self._committed = False
897
898         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
899             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
900
901         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
902             self._repack_writes(num_retries)
903             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
904                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
905                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
906
907         self._current_bblock.append(data)
908
909         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
910
911         self.parent.notify(WRITE, self.parent, self.name, (self, self))
912
913         return len(data)
914
915     @synchronized
916     def flush(self, sync=True, num_retries=0):
917         """Flush the current bufferblock to Keep.
918
919         :sync:
920           If True, commit block synchronously, wait until buffer block has been written.
921           If False, commit block asynchronously, return immediately after putting block into
922           the keep put queue.
923         """
924         if self.committed():
925             return
926
927         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
928             if self._current_bblock.state() == _BufferBlock.WRITABLE:
929                 self._repack_writes(num_retries)
930             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
931
932         if sync:
933             to_delete = set()
934             for s in self._segments:
935                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
936                 if bb:
937                     if bb.state() != _BufferBlock.COMMITTED:
938                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
939                     to_delete.add(s.locator)
940                     s.locator = bb.locator()
941             for s in to_delete:
942                self.parent._my_block_manager().delete_bufferblock(s)
943
944         self.parent.notify(MOD, self.parent, self.name, (self, self))
945
946     @must_be_writable
947     @synchronized
948     def add_segment(self, blocks, pos, size):
949         """Add a segment to the end of the file.
950
951         `pos` and `offset` reference a section of the stream described by
952         `blocks` (a list of Range objects)
953
954         """
955         self._add_segment(blocks, pos, size)
956
957     def _add_segment(self, blocks, pos, size):
958         """Internal implementation of add_segment."""
959         self._committed = False
960         for lr in locators_and_ranges(blocks, pos, size):
961             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
962             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
963             self._segments.append(r)
964
965     @synchronized
966     def size(self):
967         """Get the file size."""
968         if self._segments:
969             n = self._segments[-1]
970             return n.range_start + n.range_size
971         else:
972             return 0
973
974     @synchronized
975     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
976         buf = ""
977         filestream = []
978         for segment in self.segments:
979             loc = segment.locator
980             if loc.startswith("bufferblock"):
981                 loc = self._bufferblocks[loc].calculate_locator()
982             if portable_locators:
983                 loc = KeepLocator(loc).stripped()
984             filestream.append(LocatorAndRange(loc, locator_block_size(loc),
985                                  segment.segment_offset, segment.range_size))
986         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
987         buf += "\n"
988         return buf
989
990     @must_be_writable
991     @synchronized
992     def _reparent(self, newparent, newname):
993         self._committed = False
994         self.flush(sync=True)
995         self.parent.remove(self.name)
996         self.parent = newparent
997         self.name = newname
998         self.lock = self.parent.root_collection().lock
999
1000
1001 class ArvadosFileReader(ArvadosFileReaderBase):
1002     """Wraps ArvadosFile in a file-like object supporting reading only.
1003
1004     Be aware that this class is NOT thread safe as there is no locking around
1005     updating file pointer.
1006
1007     """
1008
1009     def __init__(self, arvadosfile, num_retries=None):
1010         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1011         self.arvadosfile = arvadosfile
1012
1013     def size(self):
1014         return self.arvadosfile.size()
1015
1016     def stream_name(self):
1017         return self.arvadosfile.parent.stream_name()
1018
1019     @_FileLikeObjectBase._before_close
1020     @retry_method
1021     def read(self, size=None, num_retries=None):
1022         """Read up to `size` bytes from the file and return the result.
1023
1024         Starts at the current file position.  If `size` is None, read the
1025         entire remainder of the file.
1026         """
1027         if size is None:
1028             data = []
1029             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1030             while rd:
1031                 data.append(rd)
1032                 self._filepos += len(rd)
1033                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1034             return ''.join(data)
1035         else:
1036             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1037             self._filepos += len(data)
1038             return data
1039
1040     @_FileLikeObjectBase._before_close
1041     @retry_method
1042     def readfrom(self, offset, size, num_retries=None):
1043         """Read up to `size` bytes from the stream, starting at the specified file offset.
1044
1045         This method does not change the file position.
1046         """
1047         return self.arvadosfile.readfrom(offset, size, num_retries)
1048
1049     def flush(self):
1050         pass
1051
1052
1053 class ArvadosFileWriter(ArvadosFileReader):
1054     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1055
1056     Be aware that this class is NOT thread safe as there is no locking around
1057     updating file pointer.
1058
1059     """
1060
1061     def __init__(self, arvadosfile, mode, num_retries=None):
1062         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1063         self.mode = mode
1064
1065     @_FileLikeObjectBase._before_close
1066     @retry_method
1067     def write(self, data, num_retries=None):
1068         if self.mode[0] == "a":
1069             self.arvadosfile.writeto(self.size(), data, num_retries)
1070         else:
1071             self.arvadosfile.writeto(self._filepos, data, num_retries)
1072             self._filepos += len(data)
1073         return len(data)
1074
1075     @_FileLikeObjectBase._before_close
1076     @retry_method
1077     def writelines(self, seq, num_retries=None):
1078         for s in seq:
1079             self.write(s, num_retries=num_retries)
1080
1081     @_FileLikeObjectBase._before_close
1082     def truncate(self, size=None):
1083         if size is None:
1084             size = self._filepos
1085         self.arvadosfile.truncate(size)
1086         if self._filepos > self.size():
1087             self._filepos = self.size()
1088
1089     @_FileLikeObjectBase._before_close
1090     def flush(self):
1091         self.arvadosfile.flush()
1092
1093     def close(self):
1094         if not self.closed:
1095             self.flush()
1096             super(ArvadosFileWriter, self).close()