Merge branch '9776-cwl-HOME-TMPDIR' closes #9776
[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13
14 from .errors import KeepWriteError, AssertionError, ArgumentError
15 from .keep import KeepLocator
16 from ._normalize_stream import normalize_stream
17 from ._ranges import locators_and_ranges, replace_range, Range
18 from .retry import retry_method
19
20 MOD = "mod"
21 WRITE = "write"
22
23 _logger = logging.getLogger('arvados.arvfile')
24
25 def split(path):
26     """split(path) -> streamname, filename
27
28     Separate the stream name and file name in a /-separated stream path and
29     return a tuple (stream_name, file_name).  If no stream name is available,
30     assume '.'.
31
32     """
33     try:
34         stream_name, file_name = path.rsplit('/', 1)
35     except ValueError:  # No / in string
36         stream_name, file_name = '.', path
37     return stream_name, file_name
38
39 class _FileLikeObjectBase(object):
40     def __init__(self, name, mode):
41         self.name = name
42         self.mode = mode
43         self.closed = False
44
45     @staticmethod
46     def _before_close(orig_func):
47         @functools.wraps(orig_func)
48         def before_close_wrapper(self, *args, **kwargs):
49             if self.closed:
50                 raise ValueError("I/O operation on closed stream file")
51             return orig_func(self, *args, **kwargs)
52         return before_close_wrapper
53
54     def __enter__(self):
55         return self
56
57     def __exit__(self, exc_type, exc_value, traceback):
58         try:
59             self.close()
60         except Exception:
61             if exc_type is None:
62                 raise
63
64     def close(self):
65         self.closed = True
66
67
68 class ArvadosFileReaderBase(_FileLikeObjectBase):
69     def __init__(self, name, mode, num_retries=None):
70         super(ArvadosFileReaderBase, self).__init__(name, mode)
71         self._filepos = 0L
72         self.num_retries = num_retries
73         self._readline_cache = (None, None)
74
75     def __iter__(self):
76         while True:
77             data = self.readline()
78             if not data:
79                 break
80             yield data
81
82     def decompressed_name(self):
83         return re.sub('\.(bz2|gz)$', '', self.name)
84
85     @_FileLikeObjectBase._before_close
86     def seek(self, pos, whence=os.SEEK_SET):
87         if whence == os.SEEK_CUR:
88             pos += self._filepos
89         elif whence == os.SEEK_END:
90             pos += self.size()
91         self._filepos = min(max(pos, 0L), self.size())
92
93     def tell(self):
94         return self._filepos
95
96     @_FileLikeObjectBase._before_close
97     @retry_method
98     def readall(self, size=2**20, num_retries=None):
99         while True:
100             data = self.read(size, num_retries=num_retries)
101             if data == '':
102                 break
103             yield data
104
105     @_FileLikeObjectBase._before_close
106     @retry_method
107     def readline(self, size=float('inf'), num_retries=None):
108         cache_pos, cache_data = self._readline_cache
109         if self.tell() == cache_pos:
110             data = [cache_data]
111             self._filepos += len(cache_data)
112         else:
113             data = ['']
114         data_size = len(data[-1])
115         while (data_size < size) and ('\n' not in data[-1]):
116             next_read = self.read(2 ** 20, num_retries=num_retries)
117             if not next_read:
118                 break
119             data.append(next_read)
120             data_size += len(next_read)
121         data = ''.join(data)
122         try:
123             nextline_index = data.index('\n') + 1
124         except ValueError:
125             nextline_index = len(data)
126         nextline_index = min(nextline_index, size)
127         self._filepos -= len(data) - nextline_index
128         self._readline_cache = (self.tell(), data[nextline_index:])
129         return data[:nextline_index]
130
131     @_FileLikeObjectBase._before_close
132     @retry_method
133     def decompress(self, decompress, size, num_retries=None):
134         for segment in self.readall(size, num_retries=num_retries):
135             data = decompress(segment)
136             if data:
137                 yield data
138
139     @_FileLikeObjectBase._before_close
140     @retry_method
141     def readall_decompressed(self, size=2**20, num_retries=None):
142         self.seek(0)
143         if self.name.endswith('.bz2'):
144             dc = bz2.BZ2Decompressor()
145             return self.decompress(dc.decompress, size,
146                                    num_retries=num_retries)
147         elif self.name.endswith('.gz'):
148             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
149             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
150                                    size, num_retries=num_retries)
151         else:
152             return self.readall(size, num_retries=num_retries)
153
154     @_FileLikeObjectBase._before_close
155     @retry_method
156     def readlines(self, sizehint=float('inf'), num_retries=None):
157         data = []
158         data_size = 0
159         for s in self.readall(num_retries=num_retries):
160             data.append(s)
161             data_size += len(s)
162             if data_size >= sizehint:
163                 break
164         return ''.join(data).splitlines(True)
165
166     def size(self):
167         raise NotImplementedError()
168
169     def read(self, size, num_retries=None):
170         raise NotImplementedError()
171
172     def readfrom(self, start, size, num_retries=None):
173         raise NotImplementedError()
174
175
176 class StreamFileReader(ArvadosFileReaderBase):
177     class _NameAttribute(str):
178         # The Python file API provides a plain .name attribute.
179         # Older SDK provided a name() method.
180         # This class provides both, for maximum compatibility.
181         def __call__(self):
182             return self
183
184     def __init__(self, stream, segments, name):
185         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
186         self._stream = stream
187         self.segments = segments
188
189     def stream_name(self):
190         return self._stream.name()
191
192     def size(self):
193         n = self.segments[-1]
194         return n.range_start + n.range_size
195
196     @_FileLikeObjectBase._before_close
197     @retry_method
198     def read(self, size, num_retries=None):
199         """Read up to 'size' bytes from the stream, starting at the current file position"""
200         if size == 0:
201             return ''
202
203         data = ''
204         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
205         if available_chunks:
206             lr = available_chunks[0]
207             data = self._stream.readfrom(lr.locator+lr.segment_offset,
208                                           lr.segment_size,
209                                           num_retries=num_retries)
210
211         self._filepos += len(data)
212         return data
213
214     @_FileLikeObjectBase._before_close
215     @retry_method
216     def readfrom(self, start, size, num_retries=None):
217         """Read up to 'size' bytes from the stream, starting at 'start'"""
218         if size == 0:
219             return ''
220
221         data = []
222         for lr in locators_and_ranges(self.segments, start, size):
223             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
224                                               num_retries=num_retries))
225         return ''.join(data)
226
227     def as_manifest(self):
228         segs = []
229         for r in self.segments:
230             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
231         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
232
233
234 def synchronized(orig_func):
235     @functools.wraps(orig_func)
236     def synchronized_wrapper(self, *args, **kwargs):
237         with self.lock:
238             return orig_func(self, *args, **kwargs)
239     return synchronized_wrapper
240
241
242 class StateChangeError(Exception):
243     def __init__(self, message, state, nextstate):
244         super(StateChangeError, self).__init__(message)
245         self.state = state
246         self.nextstate = nextstate
247
248 class _BufferBlock(object):
249     """A stand-in for a Keep block that is in the process of being written.
250
251     Writers can append to it, get the size, and compute the Keep locator.
252     There are three valid states:
253
254     WRITABLE
255       Can append to block.
256
257     PENDING
258       Block is in the process of being uploaded to Keep, append is an error.
259
260     COMMITTED
261       The block has been written to Keep, its internal buffer has been
262       released, fetching the block will fetch it via keep client (since we
263       discarded the internal copy), and identifiers referring to the BufferBlock
264       can be replaced with the block locator.
265
266     """
267
268     WRITABLE = 0
269     PENDING = 1
270     COMMITTED = 2
271     ERROR = 3
272
273     def __init__(self, blockid, starting_capacity, owner):
274         """
275         :blockid:
276           the identifier for this block
277
278         :starting_capacity:
279           the initial buffer capacity
280
281         :owner:
282           ArvadosFile that owns this block
283
284         """
285         self.blockid = blockid
286         self.buffer_block = bytearray(starting_capacity)
287         self.buffer_view = memoryview(self.buffer_block)
288         self.write_pointer = 0
289         self._state = _BufferBlock.WRITABLE
290         self._locator = None
291         self.owner = owner
292         self.lock = threading.Lock()
293         self.wait_for_commit = threading.Event()
294         self.error = None
295
296     @synchronized
297     def append(self, data):
298         """Append some data to the buffer.
299
300         Only valid if the block is in WRITABLE state.  Implements an expanding
301         buffer, doubling capacity as needed to accomdate all the data.
302
303         """
304         if self._state == _BufferBlock.WRITABLE:
305             while (self.write_pointer+len(data)) > len(self.buffer_block):
306                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
307                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
308                 self.buffer_block = new_buffer_block
309                 self.buffer_view = memoryview(self.buffer_block)
310             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
311             self.write_pointer += len(data)
312             self._locator = None
313         else:
314             raise AssertionError("Buffer block is not writable")
315
316     STATE_TRANSITIONS = frozenset([
317             (WRITABLE, PENDING),
318             (PENDING, COMMITTED),
319             (PENDING, ERROR),
320             (ERROR, PENDING)])
321
322     @synchronized
323     def set_state(self, nextstate, val=None):
324         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
325             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
326         self._state = nextstate
327
328         if self._state == _BufferBlock.PENDING:
329             self.wait_for_commit.clear()
330
331         if self._state == _BufferBlock.COMMITTED:
332             self._locator = val
333             self.buffer_view = None
334             self.buffer_block = None
335             self.wait_for_commit.set()
336
337         if self._state == _BufferBlock.ERROR:
338             self.error = val
339             self.wait_for_commit.set()
340
341     @synchronized
342     def state(self):
343         return self._state
344
345     def size(self):
346         """The amount of data written to the buffer."""
347         return self.write_pointer
348
349     @synchronized
350     def locator(self):
351         """The Keep locator for this buffer's contents."""
352         if self._locator is None:
353             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
354         return self._locator
355
356     @synchronized
357     def clone(self, new_blockid, owner):
358         if self._state == _BufferBlock.COMMITTED:
359             raise AssertionError("Cannot duplicate committed buffer block")
360         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
361         bufferblock.append(self.buffer_view[0:self.size()])
362         return bufferblock
363
364     @synchronized
365     def clear(self):
366         self.owner = None
367         self.buffer_block = None
368         self.buffer_view = None
369
370
371 class NoopLock(object):
372     def __enter__(self):
373         return self
374
375     def __exit__(self, exc_type, exc_value, traceback):
376         pass
377
378     def acquire(self, blocking=False):
379         pass
380
381     def release(self):
382         pass
383
384
385 def must_be_writable(orig_func):
386     @functools.wraps(orig_func)
387     def must_be_writable_wrapper(self, *args, **kwargs):
388         if not self.writable():
389             raise IOError(errno.EROFS, "Collection is read-only.")
390         return orig_func(self, *args, **kwargs)
391     return must_be_writable_wrapper
392
393
394 class _BlockManager(object):
395     """BlockManager handles buffer blocks.
396
397     Also handles background block uploads, and background block prefetch for a
398     Collection of ArvadosFiles.
399
400     """
401
402     DEFAULT_PUT_THREADS = 2
403     DEFAULT_GET_THREADS = 2
404
405     def __init__(self, keep):
406         """keep: KeepClient object to use"""
407         self._keep = keep
408         self._bufferblocks = {}
409         self._put_queue = None
410         self._put_threads = None
411         self._prefetch_queue = None
412         self._prefetch_threads = None
413         self.lock = threading.Lock()
414         self.prefetch_enabled = True
415         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
416         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
417
418     @synchronized
419     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
420         """Allocate a new, empty bufferblock in WRITABLE state and return it.
421
422         :blockid:
423           optional block identifier, otherwise one will be automatically assigned
424
425         :starting_capacity:
426           optional capacity, otherwise will use default capacity
427
428         :owner:
429           ArvadosFile that owns this block
430
431         """
432         if blockid is None:
433             blockid = "bufferblock%i" % len(self._bufferblocks)
434         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
435         self._bufferblocks[bufferblock.blockid] = bufferblock
436         return bufferblock
437
438     @synchronized
439     def dup_block(self, block, owner):
440         """Create a new bufferblock initialized with the content of an existing bufferblock.
441
442         :block:
443           the buffer block to copy.
444
445         :owner:
446           ArvadosFile that owns the new block
447
448         """
449         new_blockid = "bufferblock%i" % len(self._bufferblocks)
450         bufferblock = block.clone(new_blockid, owner)
451         self._bufferblocks[bufferblock.blockid] = bufferblock
452         return bufferblock
453
454     @synchronized
455     def is_bufferblock(self, locator):
456         return locator in self._bufferblocks
457
458     def _commit_bufferblock_worker(self):
459         """Background uploader thread."""
460
461         while True:
462             try:
463                 bufferblock = self._put_queue.get()
464                 if bufferblock is None:
465                     return
466
467                 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
468                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
469
470             except Exception as e:
471                 bufferblock.set_state(_BufferBlock.ERROR, e)
472             finally:
473                 if self._put_queue is not None:
474                     self._put_queue.task_done()
475
476     @synchronized
477     def start_put_threads(self):
478         if self._put_threads is None:
479             # Start uploader threads.
480
481             # If we don't limit the Queue size, the upload queue can quickly
482             # grow to take up gigabytes of RAM if the writing process is
483             # generating data more quickly than it can be send to the Keep
484             # servers.
485             #
486             # With two upload threads and a queue size of 2, this means up to 4
487             # blocks pending.  If they are full 64 MiB blocks, that means up to
488             # 256 MiB of internal buffering, which is the same size as the
489             # default download block cache in KeepClient.
490             self._put_queue = Queue.Queue(maxsize=2)
491
492             self._put_threads = []
493             for i in xrange(0, self.num_put_threads):
494                 thread = threading.Thread(target=self._commit_bufferblock_worker)
495                 self._put_threads.append(thread)
496                 thread.daemon = True
497                 thread.start()
498
499     def _block_prefetch_worker(self):
500         """The background downloader thread."""
501         while True:
502             try:
503                 b = self._prefetch_queue.get()
504                 if b is None:
505                     return
506                 self._keep.get(b)
507             except Exception:
508                 pass
509
510     @synchronized
511     def start_get_threads(self):
512         if self._prefetch_threads is None:
513             self._prefetch_queue = Queue.Queue()
514             self._prefetch_threads = []
515             for i in xrange(0, self.num_get_threads):
516                 thread = threading.Thread(target=self._block_prefetch_worker)
517                 self._prefetch_threads.append(thread)
518                 thread.daemon = True
519                 thread.start()
520
521
522     @synchronized
523     def stop_threads(self):
524         """Shut down and wait for background upload and download threads to finish."""
525
526         if self._put_threads is not None:
527             for t in self._put_threads:
528                 self._put_queue.put(None)
529             for t in self._put_threads:
530                 t.join()
531         self._put_threads = None
532         self._put_queue = None
533
534         if self._prefetch_threads is not None:
535             for t in self._prefetch_threads:
536                 self._prefetch_queue.put(None)
537             for t in self._prefetch_threads:
538                 t.join()
539         self._prefetch_threads = None
540         self._prefetch_queue = None
541
542     def __enter__(self):
543         return self
544
545     def __exit__(self, exc_type, exc_value, traceback):
546         self.stop_threads()
547
548     def commit_bufferblock(self, block, sync):
549         """Initiate a background upload of a bufferblock.
550
551         :block:
552           The block object to upload
553
554         :sync:
555           If `sync` is True, upload the block synchronously.
556           If `sync` is False, upload the block asynchronously.  This will
557           return immediately unless the upload queue is at capacity, in
558           which case it will wait on an upload queue slot.
559
560         """
561
562         try:
563             # Mark the block as PENDING so to disallow any more appends.
564             block.set_state(_BufferBlock.PENDING)
565         except StateChangeError as e:
566             if e.state == _BufferBlock.PENDING:
567                 if sync:
568                     block.wait_for_commit.wait()
569                 else:
570                     return
571             if block.state() == _BufferBlock.COMMITTED:
572                 return
573             elif block.state() == _BufferBlock.ERROR:
574                 raise block.error
575             else:
576                 raise
577
578         if sync:
579             try:
580                 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
581                 block.set_state(_BufferBlock.COMMITTED, loc)
582             except Exception as e:
583                 block.set_state(_BufferBlock.ERROR, e)
584                 raise
585         else:
586             self.start_put_threads()
587             self._put_queue.put(block)
588
589     @synchronized
590     def get_bufferblock(self, locator):
591         return self._bufferblocks.get(locator)
592
593     @synchronized
594     def delete_bufferblock(self, locator):
595         bb = self._bufferblocks[locator]
596         bb.clear()
597         del self._bufferblocks[locator]
598
599     def get_block_contents(self, locator, num_retries, cache_only=False):
600         """Fetch a block.
601
602         First checks to see if the locator is a BufferBlock and return that, if
603         not, passes the request through to KeepClient.get().
604
605         """
606         with self.lock:
607             if locator in self._bufferblocks:
608                 bufferblock = self._bufferblocks[locator]
609                 if bufferblock.state() != _BufferBlock.COMMITTED:
610                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
611                 else:
612                     locator = bufferblock._locator
613         if cache_only:
614             return self._keep.get_from_cache(locator)
615         else:
616             return self._keep.get(locator, num_retries=num_retries)
617
618     def commit_all(self):
619         """Commit all outstanding buffer blocks.
620
621         This is a synchronous call, and will not return until all buffer blocks
622         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
623
624         """
625         with self.lock:
626             items = self._bufferblocks.items()
627
628         for k,v in items:
629             if v.state() != _BufferBlock.COMMITTED:
630                 v.owner.flush(sync=False)
631
632         with self.lock:
633             if self._put_queue is not None:
634                 self._put_queue.join()
635
636                 err = []
637                 for k,v in items:
638                     if v.state() == _BufferBlock.ERROR:
639                         err.append((v.locator(), v.error))
640                 if err:
641                     raise KeepWriteError("Error writing some blocks", err, label="block")
642
643         for k,v in items:
644             # flush again with sync=True to remove committed bufferblocks from
645             # the segments.
646             if v.owner:
647                 v.owner.flush(sync=True)
648
649     def block_prefetch(self, locator):
650         """Initiate a background download of a block.
651
652         This assumes that the underlying KeepClient implements a block cache,
653         so repeated requests for the same block will not result in repeated
654         downloads (unless the block is evicted from the cache.)  This method
655         does not block.
656
657         """
658
659         if not self.prefetch_enabled:
660             return
661
662         if self._keep.get_from_cache(locator) is not None:
663             return
664
665         with self.lock:
666             if locator in self._bufferblocks:
667                 return
668
669         self.start_get_threads()
670         self._prefetch_queue.put(locator)
671
672
673 class ArvadosFile(object):
674     """Represent a file in a Collection.
675
676     ArvadosFile manages the underlying representation of a file in Keep as a
677     sequence of segments spanning a set of blocks, and implements random
678     read/write access.
679
680     This object may be accessed from multiple threads.
681
682     """
683
684     def __init__(self, parent, name, stream=[], segments=[]):
685         """
686         ArvadosFile constructor.
687
688         :stream:
689           a list of Range objects representing a block stream
690
691         :segments:
692           a list of Range objects representing segments
693         """
694         self.parent = parent
695         self.name = name
696         self._committed = False
697         self._segments = []
698         self.lock = parent.root_collection().lock
699         for s in segments:
700             self._add_segment(stream, s.locator, s.range_size)
701         self._current_bblock = None
702
703     def writable(self):
704         return self.parent.writable()
705
706     @synchronized
707     def segments(self):
708         return copy.copy(self._segments)
709
710     @synchronized
711     def clone(self, new_parent, new_name):
712         """Make a copy of this file."""
713         cp = ArvadosFile(new_parent, new_name)
714         cp.replace_contents(self)
715         return cp
716
717     @must_be_writable
718     @synchronized
719     def replace_contents(self, other):
720         """Replace segments of this file with segments from another `ArvadosFile` object."""
721
722         map_loc = {}
723         self._segments = []
724         for other_segment in other.segments():
725             new_loc = other_segment.locator
726             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
727                 if other_segment.locator not in map_loc:
728                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
729                     if bufferblock.state() != _BufferBlock.WRITABLE:
730                         map_loc[other_segment.locator] = bufferblock.locator()
731                     else:
732                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
733                 new_loc = map_loc[other_segment.locator]
734
735             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
736
737         self._committed = False
738
739     def __eq__(self, other):
740         if other is self:
741             return True
742         if not isinstance(other, ArvadosFile):
743             return False
744
745         othersegs = other.segments()
746         with self.lock:
747             if len(self._segments) != len(othersegs):
748                 return False
749             for i in xrange(0, len(othersegs)):
750                 seg1 = self._segments[i]
751                 seg2 = othersegs[i]
752                 loc1 = seg1.locator
753                 loc2 = seg2.locator
754
755                 if self.parent._my_block_manager().is_bufferblock(loc1):
756                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
757
758                 if other.parent._my_block_manager().is_bufferblock(loc2):
759                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
760
761                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
762                     seg1.range_start != seg2.range_start or
763                     seg1.range_size != seg2.range_size or
764                     seg1.segment_offset != seg2.segment_offset):
765                     return False
766
767         return True
768
769     def __ne__(self, other):
770         return not self.__eq__(other)
771
772     @synchronized
773     def set_committed(self):
774         """Set committed flag to False"""
775         self._committed = True
776
777     @synchronized
778     def committed(self):
779         """Get whether this is committed or not."""
780         return self._committed
781
782     @must_be_writable
783     @synchronized
784     def truncate(self, size):
785         """Shrink the size of the file.
786
787         If `size` is less than the size of the file, the file contents after
788         `size` will be discarded.  If `size` is greater than the current size
789         of the file, an IOError will be raised.
790
791         """
792         if size < self.size():
793             new_segs = []
794             for r in self._segments:
795                 range_end = r.range_start+r.range_size
796                 if r.range_start >= size:
797                     # segment is past the trucate size, all done
798                     break
799                 elif size < range_end:
800                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
801                     nr.segment_offset = r.segment_offset
802                     new_segs.append(nr)
803                     break
804                 else:
805                     new_segs.append(r)
806
807             self._segments = new_segs
808             self._committed = False
809         elif size > self.size():
810             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
811
812     def readfrom(self, offset, size, num_retries, exact=False):
813         """Read up to `size` bytes from the file starting at `offset`.
814
815         :exact:
816          If False (default), return less data than requested if the read
817          crosses a block boundary and the next block isn't cached.  If True,
818          only return less data than requested when hitting EOF.
819         """
820
821         with self.lock:
822             if size == 0 or offset >= self.size():
823                 return ''
824             readsegs = locators_and_ranges(self._segments, offset, size)
825             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
826
827         locs = set()
828         data = []
829         for lr in readsegs:
830             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
831             if block:
832                 blockview = memoryview(block)
833                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
834                 locs.add(lr.locator)
835             else:
836                 break
837
838         for lr in prefetch:
839             if lr.locator not in locs:
840                 self.parent._my_block_manager().block_prefetch(lr.locator)
841                 locs.add(lr.locator)
842
843         return ''.join(data)
844
845     def _repack_writes(self, num_retries):
846         """Test if the buffer block has more data than actual segments.
847
848         This happens when a buffered write over-writes a file range written in
849         a previous buffered write.  Re-pack the buffer block for efficiency
850         and to avoid leaking information.
851
852         """
853         segs = self._segments
854
855         # Sum up the segments to get the total bytes of the file referencing
856         # into the buffer block.
857         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
858         write_total = sum([s.range_size for s in bufferblock_segs])
859
860         if write_total < self._current_bblock.size():
861             # There is more data in the buffer block than is actually accounted for by segments, so
862             # re-pack into a new buffer by copying over to a new buffer block.
863             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
864             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
865             for t in bufferblock_segs:
866                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
867                 t.segment_offset = new_bb.size() - t.range_size
868
869             self._current_bblock = new_bb
870
871     @must_be_writable
872     @synchronized
873     def writeto(self, offset, data, num_retries):
874         """Write `data` to the file starting at `offset`.
875
876         This will update existing bytes and/or extend the size of the file as
877         necessary.
878
879         """
880         if len(data) == 0:
881             return
882
883         if offset > self.size():
884             raise ArgumentError("Offset is past the end of the file")
885
886         if len(data) > config.KEEP_BLOCK_SIZE:
887             # Chunk it up into smaller writes
888             n = 0
889             dataview = memoryview(data)
890             while n < len(data):
891                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
892                 n += config.KEEP_BLOCK_SIZE
893             return
894
895         self._committed = False
896
897         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
898             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
899
900         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
901             self._repack_writes(num_retries)
902             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
903                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
904                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
905
906         self._current_bblock.append(data)
907
908         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
909
910         self.parent.notify(WRITE, self.parent, self.name, (self, self))
911
912         return len(data)
913
914     @synchronized
915     def flush(self, sync=True, num_retries=0):
916         """Flush the current bufferblock to Keep.
917
918         :sync:
919           If True, commit block synchronously, wait until buffer block has been written.
920           If False, commit block asynchronously, return immediately after putting block into
921           the keep put queue.
922         """
923         if self.committed():
924             return
925
926         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
927             if self._current_bblock.state() == _BufferBlock.WRITABLE:
928                 self._repack_writes(num_retries)
929             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
930
931         if sync:
932             to_delete = set()
933             for s in self._segments:
934                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
935                 if bb:
936                     if bb.state() != _BufferBlock.COMMITTED:
937                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
938                     to_delete.add(s.locator)
939                     s.locator = bb.locator()
940             for s in to_delete:
941                self.parent._my_block_manager().delete_bufferblock(s)
942
943         self.parent.notify(MOD, self.parent, self.name, (self, self))
944
945     @must_be_writable
946     @synchronized
947     def add_segment(self, blocks, pos, size):
948         """Add a segment to the end of the file.
949
950         `pos` and `offset` reference a section of the stream described by
951         `blocks` (a list of Range objects)
952
953         """
954         self._add_segment(blocks, pos, size)
955
956     def _add_segment(self, blocks, pos, size):
957         """Internal implementation of add_segment."""
958         self._committed = False
959         for lr in locators_and_ranges(blocks, pos, size):
960             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
961             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
962             self._segments.append(r)
963
964     @synchronized
965     def size(self):
966         """Get the file size."""
967         if self._segments:
968             n = self._segments[-1]
969             return n.range_start + n.range_size
970         else:
971             return 0
972
973     @synchronized
974     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
975         buf = ""
976         filestream = []
977         for segment in self.segments:
978             loc = segment.locator
979             if loc.startswith("bufferblock"):
980                 loc = self._bufferblocks[loc].calculate_locator()
981             if portable_locators:
982                 loc = KeepLocator(loc).stripped()
983             filestream.append(LocatorAndRange(loc, locator_block_size(loc),
984                                  segment.segment_offset, segment.range_size))
985         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
986         buf += "\n"
987         return buf
988
989     @must_be_writable
990     @synchronized
991     def _reparent(self, newparent, newname):
992         self._committed = False
993         self.flush(sync=True)
994         self.parent.remove(self.name)
995         self.parent = newparent
996         self.name = newname
997         self.lock = self.parent.root_collection().lock
998
999
1000 class ArvadosFileReader(ArvadosFileReaderBase):
1001     """Wraps ArvadosFile in a file-like object supporting reading only.
1002
1003     Be aware that this class is NOT thread safe as there is no locking around
1004     updating file pointer.
1005
1006     """
1007
1008     def __init__(self, arvadosfile, num_retries=None):
1009         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1010         self.arvadosfile = arvadosfile
1011
1012     def size(self):
1013         return self.arvadosfile.size()
1014
1015     def stream_name(self):
1016         return self.arvadosfile.parent.stream_name()
1017
1018     @_FileLikeObjectBase._before_close
1019     @retry_method
1020     def read(self, size=None, num_retries=None):
1021         """Read up to `size` bytes from the file and return the result.
1022
1023         Starts at the current file position.  If `size` is None, read the
1024         entire remainder of the file.
1025         """
1026         if size is None:
1027             data = []
1028             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1029             while rd:
1030                 data.append(rd)
1031                 self._filepos += len(rd)
1032                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1033             return ''.join(data)
1034         else:
1035             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1036             self._filepos += len(data)
1037             return data
1038
1039     @_FileLikeObjectBase._before_close
1040     @retry_method
1041     def readfrom(self, offset, size, num_retries=None):
1042         """Read up to `size` bytes from the stream, starting at the specified file offset.
1043
1044         This method does not change the file position.
1045         """
1046         return self.arvadosfile.readfrom(offset, size, num_retries)
1047
1048     def flush(self):
1049         pass
1050
1051
1052 class ArvadosFileWriter(ArvadosFileReader):
1053     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1054
1055     Be aware that this class is NOT thread safe as there is no locking around
1056     updating file pointer.
1057
1058     """
1059
1060     def __init__(self, arvadosfile, mode, num_retries=None):
1061         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1062         self.mode = mode
1063
1064     @_FileLikeObjectBase._before_close
1065     @retry_method
1066     def write(self, data, num_retries=None):
1067         if self.mode[0] == "a":
1068             self.arvadosfile.writeto(self.size(), data, num_retries)
1069         else:
1070             self.arvadosfile.writeto(self._filepos, data, num_retries)
1071             self._filepos += len(data)
1072         return len(data)
1073
1074     @_FileLikeObjectBase._before_close
1075     @retry_method
1076     def writelines(self, seq, num_retries=None):
1077         for s in seq:
1078             self.write(s, num_retries=num_retries)
1079
1080     @_FileLikeObjectBase._before_close
1081     def truncate(self, size=None):
1082         if size is None:
1083             size = self._filepos
1084         self.arvadosfile.truncate(size)
1085         if self._filepos > self.size():
1086             self._filepos = self.size()
1087
1088     @_FileLikeObjectBase._before_close
1089     def flush(self):
1090         self.arvadosfile.flush()
1091
1092     def close(self):
1093         if not self.closed:
1094             self.flush()
1095             super(ArvadosFileWriter, self).close()