18941: Separate get() behavior for prefetch
[arvados.git] sdk/python/arvados/arvfile.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 from future.utils import listitems, listvalues
9 standard_library.install_aliases()
10 from builtins import range
11 from builtins import object
12 import bz2
13 import collections
14 import copy
15 import errno
16 import functools
17 import hashlib
18 import logging
19 import os
20 import queue
21 import re
22 import sys
23 import threading
24 import uuid
25 import zlib
26
27 from . import config
28 from .errors import KeepWriteError, AssertionError, ArgumentError
29 from .keep import KeepLocator
30 from ._normalize_stream import normalize_stream
31 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
32 from .retry import retry_method
33
34 MOD = "mod"
35 WRITE = "write"
36
37 _logger = logging.getLogger('arvados.arvfile')
38
39 def split(path):
40     """split(path) -> streamname, filename
41
42     Separate the stream name and file name in a /-separated stream path and
43     return a tuple (stream_name, file_name).  If no stream name is available,
44     assume '.'.
45
46     """
47     try:
48         stream_name, file_name = path.rsplit('/', 1)
49     except ValueError:  # No / in string
50         stream_name, file_name = '.', path
51     return stream_name, file_name
52
53
54 class UnownedBlockError(Exception):
55     """Raised when there's an writable block without an owner on the BlockManager."""
56     pass
57
58
59 class _FileLikeObjectBase(object):
60     def __init__(self, name, mode):
61         self.name = name
62         self.mode = mode
63         self.closed = False
64
65     @staticmethod
66     def _before_close(orig_func):
67         @functools.wraps(orig_func)
68         def before_close_wrapper(self, *args, **kwargs):
69             if self.closed:
70                 raise ValueError("I/O operation on closed stream file")
71             return orig_func(self, *args, **kwargs)
72         return before_close_wrapper
73
74     def __enter__(self):
75         return self
76
77     def __exit__(self, exc_type, exc_value, traceback):
78         try:
79             self.close()
80         except Exception:
81             if exc_type is None:
82                 raise
83
84     def close(self):
85         self.closed = True
86
87
88 class ArvadosFileReaderBase(_FileLikeObjectBase):
89     def __init__(self, name, mode, num_retries=None):
90         super(ArvadosFileReaderBase, self).__init__(name, mode)
91         self._filepos = 0
92         self.num_retries = num_retries
93         self._readline_cache = (None, None)
94
95     def __iter__(self):
96         while True:
97             data = self.readline()
98             if not data:
99                 break
100             yield data
101
102     def decompressed_name(self):
103         return re.sub(r'\.(bz2|gz)$', '', self.name)
104
105     @_FileLikeObjectBase._before_close
106     def seek(self, pos, whence=os.SEEK_SET):
107         if whence == os.SEEK_CUR:
108             pos += self._filepos
109         elif whence == os.SEEK_END:
110             pos += self.size()
111         if pos < 0:
112             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
113         self._filepos = pos
114         return self._filepos
115
116     def tell(self):
117         return self._filepos
118
119     def readable(self):
120         return True
121
122     def writable(self):
123         return False
124
125     def seekable(self):
126         return True
127
128     @_FileLikeObjectBase._before_close
129     @retry_method
130     def readall(self, size=2**20, num_retries=None):
131         while True:
132             data = self.read(size, num_retries=num_retries)
133             if len(data) == 0:
134                 break
135             yield data
136
137     @_FileLikeObjectBase._before_close
138     @retry_method
139     def readline(self, size=float('inf'), num_retries=None):
140         cache_pos, cache_data = self._readline_cache
141         if self.tell() == cache_pos:
142             data = [cache_data]
143             self._filepos += len(cache_data)
144         else:
145             data = [b'']
146         data_size = len(data[-1])
147         while (data_size < size) and (b'\n' not in data[-1]):
148             next_read = self.read(2 ** 20, num_retries=num_retries)
149             if not next_read:
150                 break
151             data.append(next_read)
152             data_size += len(next_read)
153         data = b''.join(data)
154         try:
155             nextline_index = data.index(b'\n') + 1
156         except ValueError:
157             nextline_index = len(data)
158         nextline_index = min(nextline_index, size)
159         self._filepos -= len(data) - nextline_index
160         self._readline_cache = (self.tell(), data[nextline_index:])
161         return data[:nextline_index].decode()
162
163     @_FileLikeObjectBase._before_close
164     @retry_method
165     def decompress(self, decompress, size, num_retries=None):
166         for segment in self.readall(size, num_retries=num_retries):
167             data = decompress(segment)
168             if data:
169                 yield data
170
171     @_FileLikeObjectBase._before_close
172     @retry_method
173     def readall_decompressed(self, size=2**20, num_retries=None):
174         self.seek(0)
175         if self.name.endswith('.bz2'):
176             dc = bz2.BZ2Decompressor()
177             return self.decompress(dc.decompress, size,
178                                    num_retries=num_retries)
179         elif self.name.endswith('.gz'):
180             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
181             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
182                                    size, num_retries=num_retries)
183         else:
184             return self.readall(size, num_retries=num_retries)
185
186     @_FileLikeObjectBase._before_close
187     @retry_method
188     def readlines(self, sizehint=float('inf'), num_retries=None):
189         data = []
190         data_size = 0
191         for s in self.readall(num_retries=num_retries):
192             data.append(s)
193             data_size += len(s)
194             if data_size >= sizehint:
195                 break
196         return b''.join(data).decode().splitlines(True)
197
198     def size(self):
199         raise IOError(errno.ENOSYS, "Not implemented")
200
201     def read(self, size, num_retries=None):
202         raise IOError(errno.ENOSYS, "Not implemented")
203
204     def readfrom(self, start, size, num_retries=None):
205         raise IOError(errno.ENOSYS, "Not implemented")
206
207
208 class StreamFileReader(ArvadosFileReaderBase):
209     class _NameAttribute(str):
210         # The Python file API provides a plain .name attribute.
211         # Older SDK provided a name() method.
212         # This class provides both, for maximum compatibility.
213         def __call__(self):
214             return self
215
216     def __init__(self, stream, segments, name):
217         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
218         self._stream = stream
219         self.segments = segments
220
221     def stream_name(self):
222         return self._stream.name()
223
224     def size(self):
225         n = self.segments[-1]
226         return n.range_start + n.range_size
227
228     @_FileLikeObjectBase._before_close
229     @retry_method
230     def read(self, size, num_retries=None):
231         """Read up to 'size' bytes from the stream, starting at the current file position"""
232         if size == 0:
233             return b''
234
235         data = b''
236         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
237         if available_chunks:
238             lr = available_chunks[0]
239             data = self._stream.readfrom(lr.locator+lr.segment_offset,
240                                          lr.segment_size,
241                                          num_retries=num_retries)
242
243         self._filepos += len(data)
244         return data
245
246     @_FileLikeObjectBase._before_close
247     @retry_method
248     def readfrom(self, start, size, num_retries=None):
249         """Read up to 'size' bytes from the stream, starting at 'start'"""
250         if size == 0:
251             return b''
252
253         data = []
254         for lr in locators_and_ranges(self.segments, start, size):
255             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
256                                               num_retries=num_retries))
257         return b''.join(data)
258
259     def as_manifest(self):
260         segs = []
261         for r in self.segments:
262             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
263         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
264
265
266 def synchronized(orig_func):
267     @functools.wraps(orig_func)
268     def synchronized_wrapper(self, *args, **kwargs):
269         with self.lock:
270             return orig_func(self, *args, **kwargs)
271     return synchronized_wrapper
272
273
274 class StateChangeError(Exception):
275     def __init__(self, message, state, nextstate):
276         super(StateChangeError, self).__init__(message)
277         self.state = state
278         self.nextstate = nextstate
279
280 class _BufferBlock(object):
281     """A stand-in for a Keep block that is in the process of being written.
282
283     Writers can append to it, get the size, and compute the Keep locator.
284     There are three valid states:
285
286     WRITABLE
287       Can append to block.
288
289     PENDING
290       Block is in the process of being uploaded to Keep, append is an error.
291
292     COMMITTED
293       The block has been written to Keep, its internal buffer has been
294       released, fetching the block will fetch it via keep client (since we
295       discarded the internal copy), and identifiers referring to the BufferBlock
296       can be replaced with the block locator.
297
298     """
299
300     WRITABLE = 0
301     PENDING = 1
302     COMMITTED = 2
303     ERROR = 3
304     DELETED = 4
305
306     def __init__(self, blockid, starting_capacity, owner):
307         """
308         :blockid:
309           the identifier for this block
310
311         :starting_capacity:
312           the initial buffer capacity
313
314         :owner:
315           ArvadosFile that owns this block
316
317         """
318         self.blockid = blockid
319         self.buffer_block = bytearray(starting_capacity)
320         self.buffer_view = memoryview(self.buffer_block)
321         self.write_pointer = 0
322         self._state = _BufferBlock.WRITABLE
323         self._locator = None
324         self.owner = owner
325         self.lock = threading.Lock()
326         self.wait_for_commit = threading.Event()
327         self.error = None
328
329     @synchronized
330     def append(self, data):
331         """Append some data to the buffer.
332
333         Only valid if the block is in WRITABLE state.  Implements an expanding
334         buffer, doubling capacity as needed to accommodate all the data.
335
336         """
337         if self._state == _BufferBlock.WRITABLE:
338             if not isinstance(data, bytes) and not isinstance(data, memoryview):
339                 data = data.encode()
340             while (self.write_pointer+len(data)) > len(self.buffer_block):
341                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
342                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
343                 self.buffer_block = new_buffer_block
344                 self.buffer_view = memoryview(self.buffer_block)
345             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
346             self.write_pointer += len(data)
347             self._locator = None
348         else:
349             raise AssertionError("Buffer block is not writable")
350
351     STATE_TRANSITIONS = frozenset([
352             (WRITABLE, PENDING),
353             (PENDING, COMMITTED),
354             (PENDING, ERROR),
355             (ERROR, PENDING)])
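    # Illustrative lifecycle: a block starts WRITABLE while data is appended,
    # moves to PENDING when handed to the uploader, then COMMITTED on success or
    # ERROR on failure (ERROR -> PENDING allows a retry).  DELETED is set
    # directly by clear(), e.g. after small blocks have been repacked.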
356
357     @synchronized
358     def set_state(self, nextstate, val=None):
359         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
360             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
361         self._state = nextstate
362
363         if self._state == _BufferBlock.PENDING:
364             self.wait_for_commit.clear()
365
366         if self._state == _BufferBlock.COMMITTED:
367             self._locator = val
368             self.buffer_view = None
369             self.buffer_block = None
370             self.wait_for_commit.set()
371
372         if self._state == _BufferBlock.ERROR:
373             self.error = val
374             self.wait_for_commit.set()
375
376     @synchronized
377     def state(self):
378         return self._state
379
380     def size(self):
381         """The amount of data written to the buffer."""
382         return self.write_pointer
383
384     @synchronized
385     def locator(self):
386         """The Keep locator for this buffer's contents."""
387         if self._locator is None:
388             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
389         return self._locator
390
391     @synchronized
392     def clone(self, new_blockid, owner):
393         if self._state == _BufferBlock.COMMITTED:
394             raise AssertionError("Cannot duplicate committed buffer block")
395         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
396         bufferblock.append(self.buffer_view[0:self.size()])
397         return bufferblock
398
399     @synchronized
400     def clear(self):
401         self._state = _BufferBlock.DELETED
402         self.owner = None
403         self.buffer_block = None
404         self.buffer_view = None
405
406     @synchronized
407     def repack_writes(self):
408         """Optimize buffer block by repacking segments in file sequence.
409
410         When the client makes random writes, they appear in the buffer block in
411         the sequence they were written rather than the sequence they appear in
412         the file.  This makes for inefficient, fragmented manifests.  Attempt
413         to optimize by repacking writes in file sequence.
414
415         """
416         if self._state != _BufferBlock.WRITABLE:
417             raise AssertionError("Cannot repack non-writable block")
418
419         segs = self.owner.segments()
420
421         # Collect the segments that reference the buffer block.
422         bufferblock_segs = [s for s in segs if s.locator == self.blockid]
423
424         # Collect total data referenced by segments (could be smaller than
425         # bufferblock size if a portion of the file was written and
426         # then overwritten).
427         write_total = sum([s.range_size for s in bufferblock_segs])
428
429         if write_total < self.size() or len(bufferblock_segs) > 1:
430             # If there's more than one segment referencing this block, it is
431             # due to out-of-order writes and will produce a fragmented
432             # manifest, so try to optimize by re-packing into a new buffer.
433             contents = self.buffer_view[0:self.write_pointer].tobytes()
434             new_bb = _BufferBlock(None, write_total, None)
435             for t in bufferblock_segs:
436                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
437                 t.segment_offset = new_bb.size() - t.range_size
438
439             self.buffer_block = new_bb.buffer_block
440             self.buffer_view = new_bb.buffer_view
441             self.write_pointer = new_bb.write_pointer
442             self._locator = None
443             new_bb.clear()
444             self.owner.set_segments(segs)
445
446     def __repr__(self):
447         return "<BufferBlock %s>" % (self.blockid)
448
449
450 class NoopLock(object):
451     def __enter__(self):
452         return self
453
454     def __exit__(self, exc_type, exc_value, traceback):
455         pass
456
457     def acquire(self, blocking=False):
458         pass
459
460     def release(self):
461         pass
462
463
464 def must_be_writable(orig_func):
465     @functools.wraps(orig_func)
466     def must_be_writable_wrapper(self, *args, **kwargs):
467         if not self.writable():
468             raise IOError(errno.EROFS, "Collection is read-only.")
469         return orig_func(self, *args, **kwargs)
470     return must_be_writable_wrapper
471
472
473 class _BlockManager(object):
474     """BlockManager handles buffer blocks.
475
476     Also handles background block uploads, and background block prefetch for a
477     Collection of ArvadosFiles.
478
479     """
480
481     DEFAULT_PUT_THREADS = 2
482     DEFAULT_GET_THREADS = 2
483
484     def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None):
485         """keep: KeepClient object to use"""
486         self._keep = keep
487         self._bufferblocks = collections.OrderedDict()
488         self._put_queue = None
489         self._put_threads = None
490         self._prefetch_queue = None
491         self._prefetch_threads = None
492         self.lock = threading.Lock()
493         self.prefetch_enabled = True
494         self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
495         self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS
496         self.copies = copies
497         self.storage_classes = storage_classes_func or (lambda: [])
498         self._pending_write_size = 0
499         self.threads_lock = threading.Lock()
500         self.padding_block = None
501         self.num_retries = num_retries
502
503     @synchronized
504     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
505         """Allocate a new, empty bufferblock in WRITABLE state and return it.
506
507         :blockid:
508           optional block identifier, otherwise one will be automatically assigned
509
510         :starting_capacity:
511           optional capacity, otherwise will use default capacity
512
513         :owner:
514           ArvadosFile that owns this block
515
516         """
517         return self._alloc_bufferblock(blockid, starting_capacity, owner)
518
519     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
520         if blockid is None:
521             blockid = str(uuid.uuid4())
522         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
523         self._bufferblocks[bufferblock.blockid] = bufferblock
524         return bufferblock
525
526     @synchronized
527     def dup_block(self, block, owner):
528         """Create a new bufferblock initialized with the content of an existing bufferblock.
529
530         :block:
531           the buffer block to copy.
532
533         :owner:
534           ArvadosFile that owns the new block
535
536         """
537         new_blockid = str(uuid.uuid4())
538         bufferblock = block.clone(new_blockid, owner)
539         self._bufferblocks[bufferblock.blockid] = bufferblock
540         return bufferblock
541
542     @synchronized
543     def is_bufferblock(self, locator):
544         return locator in self._bufferblocks
545
546     def _commit_bufferblock_worker(self):
547         """Background uploader thread."""
548
549         while True:
550             try:
551                 bufferblock = self._put_queue.get()
552                 if bufferblock is None:
553                     return
554
555                 if self.copies is None:
556                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
557                 else:
558                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
559                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
560             except Exception as e:
561                 bufferblock.set_state(_BufferBlock.ERROR, e)
562             finally:
563                 if self._put_queue is not None:
564                     self._put_queue.task_done()
565
566     def start_put_threads(self):
567         with self.threads_lock:
568             if self._put_threads is None:
569                 # Start uploader threads.
570
571                 # If we don't limit the Queue size, the upload queue can quickly
572                 # grow to take up gigabytes of RAM if the writing process is
573                 # generating data more quickly than it can be sent to the Keep
574                 # servers.
575                 #
576                 # With two upload threads and a queue size of 2, this means up to 4
577                 # blocks pending.  If they are full 64 MiB blocks, that means up to
578                 # 256 MiB of internal buffering, which is the same size as the
579                 # default download block cache in KeepClient.
580                 self._put_queue = queue.Queue(maxsize=2)
581
582                 self._put_threads = []
583                 for i in range(0, self.num_put_threads):
584                     thread = threading.Thread(target=self._commit_bufferblock_worker)
585                     self._put_threads.append(thread)
586                     thread.daemon = True
587                     thread.start()
588
589     def _block_prefetch_worker(self):
590         """The background downloader thread."""
591         while True:
592             try:
593                 b = self._prefetch_queue.get()
594                 if b is None:
595                     return
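                # cache_slot_get=False requests the prefetch-specific get()
                # behavior ("Separate get() behavior for prefetch"); the exact
                # semantics are defined by KeepClient.get().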
596                 self._keep.get(b, cache_slot_get=False)
597             except Exception:
598                 _logger.exception("Exception doing block prefetch")
599
600     @synchronized
601     def start_get_threads(self):
602         if self._prefetch_threads is None:
603             self._prefetch_queue = queue.Queue()
604             self._prefetch_threads = []
605             for i in range(0, self.num_get_threads):
606                 thread = threading.Thread(target=self._block_prefetch_worker)
607                 self._prefetch_threads.append(thread)
608                 thread.daemon = True
609                 thread.start()
610
611
612     @synchronized
613     def stop_threads(self):
614         """Shut down and wait for background upload and download threads to finish."""
615
616         if self._put_threads is not None:
617             for t in self._put_threads:
618                 self._put_queue.put(None)
619             for t in self._put_threads:
620                 t.join()
621         self._put_threads = None
622         self._put_queue = None
623
624         if self._prefetch_threads is not None:
625             for t in self._prefetch_threads:
626                 self._prefetch_queue.put(None)
627             for t in self._prefetch_threads:
628                 t.join()
629         self._prefetch_threads = None
630         self._prefetch_queue = None
631
632     def __enter__(self):
633         return self
634
635     def __exit__(self, exc_type, exc_value, traceback):
636         self.stop_threads()
637
638     @synchronized
639     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
640         """Packs small blocks together before uploading"""
641
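        # Illustrative scenario: several files smaller than KEEP_BLOCK_SIZE/2 are
        # written and closed; once their combined pending size reaches
        # KEEP_BLOCK_SIZE (or force=True), their buffers are copied into a single
        # new bufferblock which is then committed to Keep as one block.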
642         self._pending_write_size += closed_file_size
643
644         # Check whether there are enough small blocks to fill up a full block
645         if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
646             return
647
648         # Find blocks that are ready to be packed together before being
649         # committed to Keep.
650         # A WRITABLE block always has an owner.
651         # A WRITABLE block with its owner.closed() implies that its
652         # size is <= KEEP_BLOCK_SIZE/2.
653         try:
654             small_blocks = [b for b in listvalues(self._bufferblocks)
655                             if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
656         except AttributeError:
657             # Writable blocks without an owner shouldn't exist.
658             raise UnownedBlockError()
659
660         if len(small_blocks) <= 1:
661             # Not enough small blocks for repacking
662             return
663
664         for bb in small_blocks:
665             bb.repack_writes()
666
667         # Update the pending write size count with its true value, just in case
668         # some small file was opened, written and closed several times.
669         self._pending_write_size = sum([b.size() for b in small_blocks])
670
671         if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
672             return
673
674         new_bb = self._alloc_bufferblock()
675         new_bb.owner = []
676         files = []
677         while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
678             bb = small_blocks.pop(0)
679             new_bb.owner.append(bb.owner)
680             self._pending_write_size -= bb.size()
681             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
682             files.append((bb, new_bb.write_pointer - bb.size()))
683
684         self.commit_bufferblock(new_bb, sync=sync)
685
686         for bb, new_bb_segment_offset in files:
687             newsegs = bb.owner.segments()
688             for s in newsegs:
689                 if s.locator == bb.blockid:
690                     s.locator = new_bb.blockid
691                     s.segment_offset = new_bb_segment_offset+s.segment_offset
692             bb.owner.set_segments(newsegs)
693             self._delete_bufferblock(bb.blockid)
694
695     def commit_bufferblock(self, block, sync):
696         """Initiate a background upload of a bufferblock.
697
698         :block:
699           The block object to upload
700
701         :sync:
702           If `sync` is True, upload the block synchronously.
703           If `sync` is False, upload the block asynchronously.  This will
704           return immediately unless the upload queue is at capacity, in
705           which case it will wait on an upload queue slot.
706
707         """
708         try:
709             # Mark the block as PENDING to disallow any further appends.
710             block.set_state(_BufferBlock.PENDING)
711         except StateChangeError as e:
712             if e.state == _BufferBlock.PENDING:
713                 if sync:
714                     block.wait_for_commit.wait()
715                 else:
716                     return
717             if block.state() == _BufferBlock.COMMITTED:
718                 return
719             elif block.state() == _BufferBlock.ERROR:
720                 raise block.error
721             else:
722                 raise
723
724         if sync:
725             try:
726                 if self.copies is None:
727                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
728                 else:
729                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
730                 block.set_state(_BufferBlock.COMMITTED, loc)
731             except Exception as e:
732                 block.set_state(_BufferBlock.ERROR, e)
733                 raise
734         else:
735             self.start_put_threads()
736             self._put_queue.put(block)
737
738     @synchronized
739     def get_bufferblock(self, locator):
740         return self._bufferblocks.get(locator)
741
742     @synchronized
743     def get_padding_block(self):
744         """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
745         when using truncate() to extend the size of a file.
746
747         For reference (and possible future optimization), the md5sum of the
748         padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
749
750         """
751
752         if self.padding_block is None:
753             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
754             self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
755             self.commit_bufferblock(self.padding_block, False)
756         return self.padding_block
757
758     @synchronized
759     def delete_bufferblock(self, locator):
760         self._delete_bufferblock(locator)
761
762     def _delete_bufferblock(self, locator):
763         if locator in self._bufferblocks:
764             bb = self._bufferblocks[locator]
765             bb.clear()
766             del self._bufferblocks[locator]
767
768     def get_block_contents(self, locator, num_retries, cache_only=False):
769         """Fetch a block.
770
771         First checks whether the locator refers to a BufferBlock and returns its
772         contents if so; otherwise passes the request through to KeepClient.get().
773
774         """
775         with self.lock:
776             if locator in self._bufferblocks:
777                 bufferblock = self._bufferblocks[locator]
778                 if bufferblock.state() != _BufferBlock.COMMITTED:
779                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
780                 else:
781                     locator = bufferblock._locator
782         if cache_only:
783             return self._keep.get_from_cache(locator)
784         else:
785             return self._keep.get(locator, num_retries=num_retries)
786
787     def commit_all(self):
788         """Commit all outstanding buffer blocks.
789
790         This is a synchronous call, and will not return until all buffer blocks
791         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
792
793         """
794         self.repack_small_blocks(force=True, sync=True)
795
796         with self.lock:
797             items = listitems(self._bufferblocks)
798
799         for k,v in items:
800             if v.state() != _BufferBlock.COMMITTED and v.owner:
801                 # Skip blocks whose owner is a list of files: those come from
802                 # repack_small_blocks(), which commits them asynchronously itself.
803                 if isinstance(v.owner, ArvadosFile):
804                     v.owner.flush(sync=False)
805
806         with self.lock:
807             if self._put_queue is not None:
808                 self._put_queue.join()
809
810                 err = []
811                 for k,v in items:
812                     if v.state() == _BufferBlock.ERROR:
813                         err.append((v.locator(), v.error))
814                 if err:
815                     raise KeepWriteError("Error writing some blocks", err, label="block")
816
817         for k,v in items:
818             # flush again with sync=True to remove committed bufferblocks from
819             # the segments.
820             if v.owner:
821                 if isinstance(v.owner, ArvadosFile):
822                     v.owner.flush(sync=True)
823                 elif isinstance(v.owner, list) and len(v.owner) > 0:
824                     # This bufferblock is referenced by many files as a result
825                     # of repacking small blocks, so don't delete it when flushing
826                     # its owners, just do it after flushing them all.
827                     for owner in v.owner:
828                         owner.flush(sync=True)
829                     self.delete_bufferblock(k)
830
831     def block_prefetch(self, locator):
832         """Initiate a background download of a block.
833
834         This assumes that the underlying KeepClient implements a block cache,
835         so repeated requests for the same block will not result in repeated
836         downloads (unless the block is evicted from the cache).  This method
837         does not block.
838
839         """
840
841         if not self.prefetch_enabled:
842             return
843
844         with self.lock:
845             if locator in self._bufferblocks:
846                 return
847
848         self.start_get_threads()
849         # _logger.debug("pushing %s to prefetch", locator)
850         self._prefetch_queue.put(locator)
851
852
853 class ArvadosFile(object):
854     """Represent a file in a Collection.
855
856     ArvadosFile manages the underlying representation of a file in Keep as a
857     sequence of segments spanning a set of blocks, and implements random
858     read/write access.
859
860     This object may be accessed from multiple threads.
861
862     """
863
864     __slots__ = ('parent', 'name', '_writers', '_committed',
865                  '_segments', 'lock', '_current_bblock', 'fuse_entry')
866
867     def __init__(self, parent, name, stream=[], segments=[]):
868         """
869         ArvadosFile constructor.
870
871         :stream:
872           a list of Range objects representing a block stream
873
874         :segments:
875           a list of Range objects representing segments
876         """
877         self.parent = parent
878         self.name = name
879         self._writers = set()
880         self._committed = False
881         self._segments = []
882         self.lock = parent.root_collection().lock
883         for s in segments:
884             self._add_segment(stream, s.locator, s.range_size)
885         self._current_bblock = None
886
887     def writable(self):
888         return self.parent.writable()
889
890     @synchronized
891     def permission_expired(self, as_of_dt=None):
892         """Returns True if any of the segment's locators is expired"""
893         for r in self._segments:
894             if KeepLocator(r.locator).permission_expired(as_of_dt):
895                 return True
896         return False
897
898     @synchronized
899     def has_remote_blocks(self):
900         """Returns True if any of the segment's locators has a +R signature"""
901
902         for s in self._segments:
903             if '+R' in s.locator:
904                 return True
905         return False
906
907     @synchronized
908     def _copy_remote_blocks(self, remote_blocks={}):
909         """Ask Keep to copy remote blocks and point to their local copies.
910
911         This is called from the parent Collection.
912
913         :remote_blocks:
914             Shared cache of remote to local block mappings. This is used to avoid
915             doing extra work when blocks are shared by more than one file in
916             different subdirectories.
917         """
918
919         for s in self._segments:
920             if '+R' in s.locator:
921                 try:
922                     loc = remote_blocks[s.locator]
923                 except KeyError:
924                     loc = self.parent._my_keep().refresh_signature(s.locator)
925                     remote_blocks[s.locator] = loc
926                 s.locator = loc
927                 self.parent.set_committed(False)
928         return remote_blocks
929
930     @synchronized
931     def segments(self):
932         return copy.copy(self._segments)
933
934     @synchronized
935     def clone(self, new_parent, new_name):
936         """Make a copy of this file."""
937         cp = ArvadosFile(new_parent, new_name)
938         cp.replace_contents(self)
939         return cp
940
941     @must_be_writable
942     @synchronized
943     def replace_contents(self, other):
944         """Replace segments of this file with segments from another `ArvadosFile` object."""
945
946         map_loc = {}
947         self._segments = []
948         for other_segment in other.segments():
949             new_loc = other_segment.locator
950             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
951                 if other_segment.locator not in map_loc:
952                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
953                     if bufferblock.state() != _BufferBlock.WRITABLE:
954                         map_loc[other_segment.locator] = bufferblock.locator()
955                     else:
956                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
957                 new_loc = map_loc[other_segment.locator]
958
959             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
960
961         self.set_committed(False)
962
963     def __eq__(self, other):
964         if other is self:
965             return True
966         if not isinstance(other, ArvadosFile):
967             return False
968
969         othersegs = other.segments()
970         with self.lock:
971             if len(self._segments) != len(othersegs):
972                 return False
973             for i in range(0, len(othersegs)):
974                 seg1 = self._segments[i]
975                 seg2 = othersegs[i]
976                 loc1 = seg1.locator
977                 loc2 = seg2.locator
978
979                 if self.parent._my_block_manager().is_bufferblock(loc1):
980                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
981
982                 if other.parent._my_block_manager().is_bufferblock(loc2):
983                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
984
985                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
986                     seg1.range_start != seg2.range_start or
987                     seg1.range_size != seg2.range_size or
988                     seg1.segment_offset != seg2.segment_offset):
989                     return False
990
991         return True
992
993     def __ne__(self, other):
994         return not self.__eq__(other)
995
996     @synchronized
997     def set_segments(self, segs):
998         self._segments = segs
999
1000     @synchronized
1001     def set_committed(self, value=True):
1002         """Set committed flag.
1003
1004         If value is True, set committed to be True.
1005
1006         If value is False, set committed to be False for this and all parents.
1007         """
1008         if value == self._committed:
1009             return
1010         self._committed = value
1011         if self._committed is False and self.parent is not None:
1012             self.parent.set_committed(False)
1013
1014     @synchronized
1015     def committed(self):
1016         """Get whether this is committed or not."""
1017         return self._committed
1018
1019     @synchronized
1020     def add_writer(self, writer):
1021         """Add an ArvadosFileWriter reference to the list of writers"""
1022         if isinstance(writer, ArvadosFileWriter):
1023             self._writers.add(writer)
1024
1025     @synchronized
1026     def remove_writer(self, writer, flush):
1027         """
1028         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
1029         and do some block maintenance tasks.
1030         """
1031         self._writers.remove(writer)
1032
1033         if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
1034             # Flush was requested, or the file is too big for small-block repacking
1035             self.flush()
1036         elif self.closed():
1037             # All writers closed and size is adequate for repacking
1038             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
1039
1040     def closed(self):
1041         """
1042         Get whether this is closed or not. When the writers list is empty, the file
1043         is considered closed.
1044         """
1045         return len(self._writers) == 0
1046
1047     @must_be_writable
1048     @synchronized
1049     def truncate(self, size):
1050         """Shrink or expand the size of the file.
1051
1052         If `size` is less than the size of the file, the file contents after
1053         `size` will be discarded.  If `size` is greater than the current size
1054         of the file, it will be filled with zero bytes.
1055
1056         """
1057         if size < self.size():
1058             new_segs = []
1059             for r in self._segments:
1060                 range_end = r.range_start+r.range_size
1061                 if r.range_start >= size:
1062                     # segment is past the truncate size, all done
1063                     break
1064                 elif size < range_end:
1065                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1066                     nr.segment_offset = r.segment_offset
1067                     new_segs.append(nr)
1068                     break
1069                 else:
1070                     new_segs.append(r)
1071
1072             self._segments = new_segs
1073             self.set_committed(False)
1074         elif size > self.size():
1075             padding = self.parent._my_block_manager().get_padding_block()
1076             diff = size - self.size()
1077             while diff > config.KEEP_BLOCK_SIZE:
1078                 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1079                 diff -= config.KEEP_BLOCK_SIZE
1080             if diff > 0:
1081                 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1082             self.set_committed(False)
1083         else:
1084             # size == self.size()
1085             pass
1086
1087     def readfrom(self, offset, size, num_retries, exact=False):
1088         """Read up to `size` bytes from the file starting at `offset`.
1089
1090         :exact:
1091          If False (default), return less data than requested if the read
1092          crosses a block boundary and the next block isn't cached.  If True,
1093          only return less data than requested when hitting EOF.
1094         """
1095
1096         with self.lock:
1097             if size == 0 or offset >= self.size():
1098                 return b''
1099             readsegs = locators_and_ranges(self._segments, offset, size)
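            # Prefetch window (illustrative, with default settings): up to
            # KEEP_BLOCK_SIZE (64 MiB) times num_get_threads (2) = 128 MiB past
            # the end of this read, capped at 32 segments.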
1100             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)
1101
1102         locs = set()
1103         data = []
1104         for lr in readsegs:
1105             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1106             if block:
1107                 blockview = memoryview(block)
1108                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
1109                 locs.add(lr.locator)
1110             else:
1111                 break
1112
1113         for lr in prefetch:
1114             if lr.locator not in locs:
1115                 self.parent._my_block_manager().block_prefetch(lr.locator)
1116                 locs.add(lr.locator)
1117
1118         return b''.join(data)
1119
1120     @must_be_writable
1121     @synchronized
1122     def writeto(self, offset, data, num_retries):
1123         """Write `data` to the file starting at `offset`.
1124
1125         This will update existing bytes and/or extend the size of the file as
1126         necessary.
1127
1128         """
1129         if not isinstance(data, bytes) and not isinstance(data, memoryview):
1130             data = data.encode()
1131         if len(data) == 0:
1132             return
1133
1134         if offset > self.size():
1135             self.truncate(offset)
1136
1137         if len(data) > config.KEEP_BLOCK_SIZE:
1138             # Chunk it up into smaller writes
1139             n = 0
1140             dataview = memoryview(data)
1141             while n < len(data):
1142                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1143                 n += config.KEEP_BLOCK_SIZE
1144             return
1145
1146         self.set_committed(False)
1147
1148         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1149             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1150
1151         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1152             self._current_bblock.repack_writes()
1153             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1154                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1155                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1156
1157         self._current_bblock.append(data)
1158
1159         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1160
1161         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1162
1163         return len(data)
1164
1165     @synchronized
1166     def flush(self, sync=True, num_retries=0):
1167         """Flush the current bufferblock to Keep.
1168
1169         :sync:
1170           If True, commit block synchronously, wait until buffer block has been written.
1171           If False, commit block asynchronously, return immediately after putting block into
1172           the keep put queue.
1173         """
1174         if self.committed():
1175             return
1176
1177         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1178             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1179                 self._current_bblock.repack_writes()
1180             if self._current_bblock.state() != _BufferBlock.DELETED:
1181                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1182
1183         if sync:
1184             to_delete = set()
1185             for s in self._segments:
1186                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1187                 if bb:
1188                     if bb.state() != _BufferBlock.COMMITTED:
1189                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1190                     to_delete.add(s.locator)
1191                     s.locator = bb.locator()
1192             for s in to_delete:
1193                 # Don't delete the bufferblock if it's owned by many files. It'll be
1194                 # deleted after all of its owners are flush()ed.
1195                 if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1196                     self.parent._my_block_manager().delete_bufferblock(s)
1197
1198         self.parent.notify(MOD, self.parent, self.name, (self, self))
1199
1200     @must_be_writable
1201     @synchronized
1202     def add_segment(self, blocks, pos, size):
1203         """Add a segment to the end of the file.
1204
1205         `pos` and `size` reference a section of the stream described by
1206         `blocks` (a list of Range objects)
1207
1208         """
1209         self._add_segment(blocks, pos, size)
1210
1211     def _add_segment(self, blocks, pos, size):
1212         """Internal implementation of add_segment."""
1213         self.set_committed(False)
1214         for lr in locators_and_ranges(blocks, pos, size):
1215             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1216             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1217             self._segments.append(r)
1218
1219     @synchronized
1220     def size(self):
1221         """Get the file size."""
1222         if self._segments:
1223             n = self._segments[-1]
1224             return n.range_start + n.range_size
1225         else:
1226             return 0
1227
1228     @synchronized
1229     def manifest_text(self, stream_name=".", portable_locators=False,
1230                       normalize=False, only_committed=False):
1231         buf = ""
1232         filestream = []
1233         for segment in self._segments:
1234             loc = segment.locator
1235             if self.parent._my_block_manager().is_bufferblock(loc):
1236                 if only_committed:
1237                     continue
1238                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1239             if portable_locators:
1240                 loc = KeepLocator(loc).stripped()
1241             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1242                                  segment.segment_offset, segment.range_size))
1243         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1244         buf += "\n"
1245         return buf
1246
1247     @must_be_writable
1248     @synchronized
1249     def _reparent(self, newparent, newname):
1250         self.set_committed(False)
1251         self.flush(sync=True)
1252         self.parent.remove(self.name)
1253         self.parent = newparent
1254         self.name = newname
1255         self.lock = self.parent.root_collection().lock
1256
1257
1258 class ArvadosFileReader(ArvadosFileReaderBase):
1259     """Wraps ArvadosFile in a file-like object supporting reading only.
1260
1261     Be aware that this class is NOT thread safe as there is no locking around
1262     updating the file pointer.
1263
1264     """
1265
1266     def __init__(self, arvadosfile, mode="r", num_retries=None):
1267         super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1268         self.arvadosfile = arvadosfile
1269
1270     def size(self):
1271         return self.arvadosfile.size()
1272
1273     def stream_name(self):
1274         return self.arvadosfile.parent.stream_name()
1275
1276     def readinto(self, b):
1277         data = self.read(len(b))
1278         b[:len(data)] = data
1279         return len(data)
1280
1281     @_FileLikeObjectBase._before_close
1282     @retry_method
1283     def read(self, size=None, num_retries=None):
1284         """Read up to `size` bytes from the file and return the result.
1285
1286         Starts at the current file position.  If `size` is None, read the
1287         entire remainder of the file.
1288         """
1289         if size is None:
1290             data = []
1291             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1292             while rd:
1293                 data.append(rd)
1294                 self._filepos += len(rd)
1295                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1296             return b''.join(data)
1297         else:
1298             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1299             self._filepos += len(data)
1300             return data
1301
1302     @_FileLikeObjectBase._before_close
1303     @retry_method
1304     def readfrom(self, offset, size, num_retries=None):
1305         """Read up to `size` bytes from the stream, starting at the specified file offset.
1306
1307         This method does not change the file position.
1308         """
1309         return self.arvadosfile.readfrom(offset, size, num_retries)
1310
1311     def flush(self):
1312         pass
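# Illustrative usage (assumed API): readers are normally obtained from a
# Collection rather than constructed directly, for example:
#
#   import arvados
#   coll = arvados.collection.Collection('zzzzz-4zz18-xxxxxxxxxxxxxxx')
#   with coll.open('foo.txt', 'rb') as reader:
#       first_kb = reader.read(1024)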
1313
1314
1315 class ArvadosFileWriter(ArvadosFileReader):
1316     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1317
1318     Be aware that this class is NOT thread safe as there is no locking around
1319     updating the file pointer.
1320
1321     """
1322
1323     def __init__(self, arvadosfile, mode, num_retries=None):
1324         super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1325         self.arvadosfile.add_writer(self)
1326
1327     def writable(self):
1328         return True
1329
1330     @_FileLikeObjectBase._before_close
1331     @retry_method
1332     def write(self, data, num_retries=None):
1333         if self.mode[0] == "a":
1334             self._filepos = self.size()
1335         self.arvadosfile.writeto(self._filepos, data, num_retries)
1336         self._filepos += len(data)
1337         return len(data)
1338
1339     @_FileLikeObjectBase._before_close
1340     @retry_method
1341     def writelines(self, seq, num_retries=None):
1342         for s in seq:
1343             self.write(s, num_retries=num_retries)
1344
1345     @_FileLikeObjectBase._before_close
1346     def truncate(self, size=None):
1347         if size is None:
1348             size = self._filepos
1349         self.arvadosfile.truncate(size)
1350
1351     @_FileLikeObjectBase._before_close
1352     def flush(self):
1353         self.arvadosfile.flush()
1354
1355     def close(self, flush=True):
1356         if not self.closed:
1357             self.arvadosfile.remove_writer(self, flush)
1358             super(ArvadosFileWriter, self).close()
1359
1360
1361 class WrappableFile(object):
1362     """An interface to an Arvados file that's compatible with io wrappers.
1363
1364     """
1365     def __init__(self, f):
1366         self.f = f
1367         self.closed = False
1368     def close(self):
1369         self.closed = True
1370         return self.f.close()
1371     def flush(self):
1372         return self.f.flush()
1373     def read(self, *args, **kwargs):
1374         return self.f.read(*args, **kwargs)
1375     def readable(self):
1376         return self.f.readable()
1377     def readinto(self, *args, **kwargs):
1378         return self.f.readinto(*args, **kwargs)
1379     def seek(self, *args, **kwargs):
1380         return self.f.seek(*args, **kwargs)
1381     def seekable(self):
1382         return self.f.seekable()
1383     def tell(self):
1384         return self.f.tell()
1385     def writable(self):
1386         return self.f.writable()
1387     def write(self, *args, **kwargs):
1388         return self.f.write(*args, **kwargs)
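# Illustrative usage (assumed): WrappableFile lets the standard io wrappers sit
# on top of an Arvados file object, for example:
#
#   import io
#   text_reader = io.TextIOWrapper(WrappableFile(reader), encoding='utf-8')
#   line = text_reader.readline()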