21700: Install Bundler system-wide in Rails postinst
[arvados.git] / sdk / python / arvados / arvfile.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 from future.utils import listitems, listvalues
9 standard_library.install_aliases()
10 from builtins import range
11 from builtins import object
12 import bz2
13 import collections
14 import copy
15 import errno
16 import functools
17 import hashlib
18 import logging
19 import os
20 import queue
21 import re
22 import sys
23 import threading
24 import uuid
25 import zlib
26
27 from . import config
28 from .errors import KeepWriteError, AssertionError, ArgumentError
29 from .keep import KeepLocator
30 from ._normalize_stream import normalize_stream
31 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
32 from .retry import retry_method
33
# String tags; they are only defined here and are not referenced in the
# visible part of this module.
# NOTE(review): presumably event-type names consumed by callers -- confirm.
MOD = "mod"
WRITE = "write"

# Module-level logger registered under the name 'arvados.arvfile'.
_logger = logging.getLogger('arvados.arvfile')
38
def split(path):
    """split(path) -> streamname, filename

    Break a /-separated stream path at its last slash and return the pair
    (stream_name, file_name).  A path with no slash at all is reported as a
    file in the '.' stream.

    """
    head, slash, tail = path.rpartition('/')
    if not slash:
        # No / in string
        return '.', tail
    return head, tail
52
53
class UnownedBlockError(Exception):
    """Signals that the BlockManager holds a writable block that has no owner."""
57
58
59 class _FileLikeObjectBase(object):
60     def __init__(self, name, mode):
61         self.name = name
62         self.mode = mode
63         self.closed = False
64
65     @staticmethod
66     def _before_close(orig_func):
67         @functools.wraps(orig_func)
68         def before_close_wrapper(self, *args, **kwargs):
69             if self.closed:
70                 raise ValueError("I/O operation on closed stream file")
71             return orig_func(self, *args, **kwargs)
72         return before_close_wrapper
73
74     def __enter__(self):
75         return self
76
77     def __exit__(self, exc_type, exc_value, traceback):
78         try:
79             self.close()
80         except Exception:
81             if exc_type is None:
82                 raise
83
84     def close(self):
85         self.closed = True
86
87
class ArvadosFileReaderBase(_FileLikeObjectBase):
    """Shared read-side behaviour for file-like reader objects.

    Keeps a current file position and builds iteration, line reading and
    decompression on top of size(), read() and readfrom().  Those three are
    left unimplemented here: they raise IOError with errno ENOSYS.
    """

    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0
        self.num_retries = num_retries
        # (file position, leftover bytes) remembered by the last readline().
        self._readline_cache = (None, None)

    def __iter__(self):
        """Yield the results of readline() until it returns something falsy."""
        line = self.readline()
        while line:
            yield line
            line = self.readline()

    def decompressed_name(self):
        """Return self.name with a trailing .bz2 or .gz suffix removed."""
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        """Set the file position and return it.

        With os.SEEK_CUR the offset is relative to the current position, with
        os.SEEK_END it is relative to size(); any other whence value treats
        pos as absolute.  Raises IOError (EINVAL) if the resulting position
        would be negative.
        """
        target = pos
        if whence == os.SEEK_CUR:
            target = pos + self._filepos
        elif whence == os.SEEK_END:
            target = pos + self.size()
        if target < 0:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = target
        return self._filepos

    def tell(self):
        """Return the current file position."""
        return self._filepos

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        """Generate successive read(size) results until one has length zero."""
        chunk = self.read(size, num_retries=num_retries)
        while len(chunk) != 0:
            yield chunk
            chunk = self.read(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        """Read one line (at most `size` bytes) and return it decoded.

        Data is fetched with read() in chunks of up to 2**20 bytes.  Bytes
        that were read past the end of the returned line are stored in
        self._readline_cache together with the file position they belong to,
        and the file position is moved back to the end of the returned line.
        """
        cached_at, cached_bytes = self._readline_cache
        if self.tell() == cached_at:
            # The leftover from the previous call starts exactly here: reuse
            # it and advance past it as if it had just been read.
            chunks = [cached_bytes]
            self._filepos += len(cached_bytes)
        else:
            chunks = [b'']
        total = len(chunks[-1])
        while (total < size) and (b'\n' not in chunks[-1]):
            piece = self.read(2 ** 20, num_retries=num_retries)
            if not piece:
                break
            chunks.append(piece)
            total += len(piece)
        buf = b''.join(chunks)
        newline_at = buf.find(b'\n')
        if newline_at < 0:
            cut = len(buf)
        else:
            cut = newline_at + 1
        cut = min(cut, size)
        # Rewind over the bytes that are not part of the returned line.
        self._filepos -= len(buf) - cut
        self._readline_cache = (self.tell(), buf[cut:])
        return buf[:cut].decode()

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        """Generate decompress(chunk) for each readall() chunk, skipping empty results."""
        for raw in self.readall(size, num_retries=num_retries):
            inflated = decompress(raw)
            if not inflated:
                continue
            yield inflated

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        """Rewind to offset 0 and return a generator over the file contents.

        Names ending in '.bz2' or '.gz' are passed through the matching
        decompressor; any other name is returned as-is via readall().
        """
        self.seek(0)
        if self.name.endswith('.bz2'):
            inflate = bz2.BZ2Decompressor().decompress
        elif self.name.endswith('.gz'):
            gz = zlib.decompressobj(16+zlib.MAX_WBITS)

            def inflate(segment):
                # Feed back whatever the previous call left unconsumed.
                return gz.decompress(gz.unconsumed_tail + segment)
        else:
            return self.readall(size, num_retries=num_retries)
        return self.decompress(inflate, size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        """Return a list of decoded lines (line endings kept).

        Chunks from readall() are collected until their combined length
        reaches sizehint or the data runs out.
        """
        chunks = []
        total = 0
        for piece in self.readall(num_retries=num_retries):
            chunks.append(piece)
            total += len(piece)
            if total >= sizehint:
                break
        text = b''.join(chunks).decode()
        return text.splitlines(True)

    def size(self):
        """Not implemented in this base class; raises IOError (ENOSYS)."""
        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        """Not implemented in this base class; raises IOError (ENOSYS)."""
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        """Not implemented in this base class; raises IOError (ENOSYS)."""
        raise IOError(errno.ENOSYS, "Not implemented")
206
207
class StreamFileReader(ArvadosFileReaderBase):
    """File reader (mode 'rb') over a list of segments inside a stream object."""

    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        """Return the name of the underlying stream."""
        return self._stream.name()

    def size(self):
        """Return the end offset of the last segment (start + size)."""
        last = self.segments[-1]
        return last.range_start + last.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return b''

        chunks = locators_and_ranges(self.segments, self._filepos, size)
        if chunks:
            # Only the first matching range is fetched per call.
            first = chunks[0]
            data = self._stream.readfrom(first.locator+first.segment_offset,
                                         first.segment_size,
                                         num_retries=num_retries)
        else:
            data = b''

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return b''

        pieces = [
            self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                  num_retries=num_retries)
            for lr in locators_and_ranges(self.segments, start, size)
        ]
        return b''.join(pieces)

    def as_manifest(self):
        """Return manifest text for this file as a single stream named '.'."""
        ranges = [
            lr
            for seg in self.segments
            for lr in self._stream.locators_and_ranges(seg.locator, seg.range_size)
        ]
        return " ".join(normalize_stream(".", {self.name: ranges})) + "\n"
264
265
def synchronized(orig_func):
    """Decorator that runs the wrapped method while holding ``self.lock``.

    ``self.lock`` is used as a context manager around the call; the wrapped
    method's return value is passed through unchanged.
    """
    @functools.wraps(orig_func)
    def locked_call(self, *args, **kwargs):
        with self.lock:
            result = orig_func(self, *args, **kwargs)
        return result
    return locked_call
272
273
class StateChangeError(Exception):
    """Exception describing a rejected state transition.

    Besides the message it carries two attributes: ``state`` and
    ``nextstate``, exactly as passed to the constructor.
    """

    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state, self.nextstate = state, nextstate
279
class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    A block is always in one of the following states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    ERROR
      The value passed to set_state() along with this state is stored in
      `error`.  An ERROR block may be moved back to PENDING.

    DELETED
      clear() was called: the buffer and the owner reference were dropped.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3
    DELETED = 4

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        # Set when the block reaches COMMITTED or ERROR, cleared on PENDING.
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        `data` may be bytes, bytearray or memoryview; anything else is
        converted with its encode() method.  Raises AssertionError when the
        block is not WRITABLE.

        """
        if self._state != _BufferBlock.WRITABLE:
            raise AssertionError("Buffer block is not writable")

        if not isinstance(data, (bytes, bytearray, memoryview)):
            data = data.encode()

        required = self.write_pointer + len(data)
        capacity = len(self.buffer_block)
        if required > capacity:
            # Start doubling from at least 1: a block created with capacity 0
            # (e.g. a clone of an empty block) could otherwise never grow.
            new_capacity = max(capacity, 1)
            while new_capacity < required:
                new_capacity *= 2
            new_buffer_block = bytearray(new_capacity)
            new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
            self.buffer_block = new_buffer_block
            self.buffer_view = memoryview(self.buffer_block)

        self.buffer_view[self.write_pointer:required] = data
        self.write_pointer = required
        # The contents changed, so any cached locator is stale.
        self._locator = None

    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        """Move the block to `nextstate`.

        Raises StateChangeError unless (current state, nextstate) is listed in
        STATE_TRANSITIONS.  `val` is stored as the locator when entering
        COMMITTED and as the error when entering ERROR.

        """
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()

    @synchronized
    def state(self):
        """Return the current state constant."""
        return self._state

    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    @synchronized
    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator

    @synchronized
    def clone(self, new_blockid, owner):
        """Return a new WRITABLE block holding a copy of this block's data.

        Raises AssertionError if this block is COMMITTED (its buffer has
        already been released).

        """
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        """Mark the block DELETED and drop its owner and buffer references."""
        self._state = _BufferBlock.DELETED
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None

    @synchronized
    def repack_writes(self):
        """Optimize buffer block by repacking segments in file sequence.

        When the client makes random writes, they appear in the buffer block in
        the sequence they were written rather than the sequence they appear in
        the file.  This makes for inefficient, fragmented manifests.  Attempt
        to optimize by repacking writes in file sequence.

        Raises AssertionError if the block is not WRITABLE.

        """
        if self._state != _BufferBlock.WRITABLE:
            raise AssertionError("Cannot repack non-writable block")

        segs = self.owner.segments()

        # Collect the segments that reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self.blockid]

        # Collect total data referenced by segments (could be smaller than
        # bufferblock size if a portion of the file was written and
        # then overwritten).
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self.size() or len(bufferblock_segs) > 1:
            # If there's more than one segment referencing this block, it is
            # due to out-of-order writes and will produce a fragmented
            # manifest, so try to optimize by re-packing into a new buffer.
            contents = self.buffer_view[0:self.write_pointer].tobytes()
            new_bb = _BufferBlock(None, write_total, None)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            # Take over the repacked buffer, then discard the temporary block.
            self.buffer_block = new_bb.buffer_block
            self.buffer_view = new_bb.buffer_view
            self.write_pointer = new_bb.write_pointer
            self._locator = None
            new_bb.clear()
            self.owner.set_segments(segs)

    def __repr__(self):
        return "<BufferBlock %s>" % (self.blockid)
448
449
class NoopLock(object):
    """Lock look-alike whose operations all do nothing.

    Provides the context-manager protocol plus acquire() and release();
    none of them block or keep any state.
    """

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        return None

    def acquire(self, blocking=False):
        return None

    def release(self):
        return None
462
463
def must_be_writable(orig_func):
    """Decorator that only runs the wrapped method when ``self.writable()`` is true.

    Otherwise the wrapper raises IOError with errno EROFS.
    """
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if self.writable():
            return orig_func(self, *args, **kwargs)
        raise IOError(errno.EROFS, "Collection is read-only.")
    return must_be_writable_wrapper
471
472
class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2

    def __init__(self, keep,
                 copies=None,
                 put_threads=None,
                 num_retries=None,
                 storage_classes_func=None):
        """keep: KeepClient object to use

        :copies:
          passed to keep.put() as `copies` when not None

        :put_threads:
          number of background upload threads (DEFAULT_PUT_THREADS if falsy)

        :num_retries:
          passed to keep.put() as `num_retries`

        :storage_classes_func:
          callable whose result is passed to keep.put() as `classes`;
          defaults to a callable returning an empty list

        """
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self.lock = threading.Lock()
        self.prefetch_lookahead = self._keep.num_prefetch_threads
        self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
        self.copies = copies
        self.storage_classes = storage_classes_func or (lambda: [])
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
        self.padding_block = None
        self.num_retries = num_retries

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        # Unsynchronized variant, for callers that already hold self.lock.
        if blockid is None:
            blockid = str(uuid.uuid4())
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = str(uuid.uuid4())
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        """Return True if `locator` identifies a bufferblock held by this manager."""
        return locator in self._bufferblocks

    def _put_block(self, block):
        """Send the written portion of `block` to Keep and return the locator.

        `copies` is only passed to keep.put() when self.copies is not None.
        Shared by the background worker and the synchronous commit path so
        both issue the same put() call.

        """
        payload = block.buffer_view[0:block.write_pointer].tobytes()
        if self.copies is None:
            return self._keep.put(payload, num_retries=self.num_retries, classes=self.storage_classes())
        return self._keep.put(payload, num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""

        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    # None is the shutdown sentinel queued by stop_threads().
                    return

                loc = self._put_block(bufferblock)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        """Create the upload queue and start the uploader threads, once."""
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = queue.Queue(maxsize=2)

                self._put_threads = []
                for i in range(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down the background upload threads and wait for them to finish."""

        if self._put_threads is not None:
            # One None sentinel per worker, then wait for each to exit.
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading

        :force:
          repack even if less than one full block of small data is pending

        :sync:
          passed on to commit_bufferblock() for the packed block

        :closed_file_size:
          added to the running count of pending small-block bytes

        Raises UnownedBlockError if a WRITABLE block has no owner.

        """

        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
            return

        # Search blocks ready for getting packed together before being
        # committed to Keep.
        # A WRITABLE block always has an owner.
        # A WRITABLE block with its owner.closed() implies that its
        # size is <= KEEP_BLOCK_SIZE/2.
        try:
            small_blocks = [b for b in listvalues(self._bufferblocks)
                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
        except AttributeError:
            # Writable blocks without owner shouldn't exist.
            raise UnownedBlockError()

        if len(small_blocks) <= 1:
            # Not enough small blocks for repacking
            return

        for bb in small_blocks:
            bb.repack_writes()

        # Update the pending write size count with its true value, just in case
        # some small file was opened, written and closed several times.
        self._pending_write_size = sum([b.size() for b in small_blocks])

        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
            return

        new_bb = self._alloc_bufferblock()
        # The packed block is owned by every file whose data it contains.
        new_bb.owner = []
        files = []
        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
            bb = small_blocks.pop(0)
            new_bb.owner.append(bb.owner)
            self._pending_write_size -= bb.size()
            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
            # Remember where this block's data starts inside the packed block.
            files.append((bb, new_bb.write_pointer - bb.size()))

        self.commit_bufferblock(new_bb, sync=sync)

        # Repoint each file's segments at the packed block and drop the
        # now-redundant small block.
        for bb, new_bb_segment_offset in files:
            newsegs = bb.owner.segments()
            for s in newsegs:
                if s.locator == bb.blockid:
                    s.locator = new_bb.blockid
                    s.segment_offset = new_bb_segment_offset+s.segment_offset
            bb.owner.set_segments(newsegs)
            self._delete_bufferblock(bb.blockid)

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING so to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                # Already being uploaded by someone else.
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                loc = self._put_block(block)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        """Return the bufferblock registered under `locator`, or None."""
        return self._bufferblocks.get(locator)

    @synchronized
    def get_padding_block(self):
        """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file.

        For reference (and possible future optimization), the md5sum of the
        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864

        """

        if self.padding_block is None:
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            # The fresh bytearray is already zero-filled; just claim all of it.
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block

    @synchronized
    def delete_bufferblock(self, locator):
        """Clear and forget the bufferblock registered under `locator`, if any."""
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        # Unsynchronized variant, for callers that already hold self.lock.
        if locator in self._bufferblocks:
            bb = self._bufferblocks[locator]
            bb.clear()
            del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and return that, if
        not, passes the request through to KeepClient.get().

        With `cache_only` set, KeepClient.get_from_cache() is used instead of
        KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    # Committed blocks have released their buffer; fetch by
                    # the real Keep locator instead.
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = listitems(self._bufferblocks)

        for k,v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                # Ignore blocks with a list of owners, as if they're not in COMMITTED
                # state, they're already being committed asynchronously.
                if isinstance(v.owner, ArvadosFile):
                    v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k,v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k,v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                if isinstance(v.owner, ArvadosFile):
                    v.owner.flush(sync=True)
                elif isinstance(v.owner, list) and len(v.owner) > 0:
                    # This bufferblock is referenced by many files as a result
                    # of repacking small blocks, so don't delete it when flushing
                    # its owners, just do it after flushing them all.
                    for owner in v.owner:
                        owner.flush(sync=True)
                    self.delete_bufferblock(k)

        self.stop_threads()

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        Does nothing when prefetch_lookahead is falsy or when `locator`
        identifies a bufferblock held by this manager.
        """

        if not self.prefetch_lookahead:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self._keep.block_prefetch(locator)
814
815
class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    __slots__ = ('parent', 'name', '_writers', '_committed',
                 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')

    def __init__(self, parent, name, stream=None, segments=None):
        """
        ArvadosFile constructor.

        :parent:
          the containing object; it must provide `root_collection()`, whose
          `lock` attribute is shared by this file

        :name:
          the name of this file within `parent`

        :stream:
          a list of Range objects representing a block stream
          (defaults to an empty list)

        :segments:
          a list of Range objects representing segments
          (defaults to an empty list)
        """
        # None is used instead of [] for the defaults so that no list object
        # is shared between calls.
        if stream is None:
            stream = []
        if segments is None:
            segments = []
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        # The lock must be in place before _add_segment() is called, because
        # it goes through the synchronized set_committed().
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None
        self._read_counter = 0

    def writable(self):
        """Return whatever the parent reports from its own `writable()`."""
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        return any(KeepLocator(r.locator).permission_expired(as_of_dt)
                   for r in self._segments)

    @synchronized
    def has_remote_blocks(self):
        """Returns True if any of the segment's locators has a +R signature"""
        return any('+R' in s.locator for s in self._segments)

    @synchronized
    def _copy_remote_blocks(self, remote_blocks=None):
        """Ask Keep to copy remote blocks and point to their local copies.

        This is called from the parent Collection.

        :remote_blocks:
            Shared cache of remote to local block mappings. This is used to avoid
            doing extra work when blocks are shared by more than one file in
            different subdirectories.  When omitted, a fresh empty cache is
            used for this call only.

        Returns the (possibly updated) `remote_blocks` mapping.
        """
        # A `{}` default would be created once and then mutated below, making
        # every call without an explicit cache share the same dict.  Test
        # against None (not truthiness) so an empty dict passed by the caller
        # is still the one that gets filled in.
        if remote_blocks is None:
            remote_blocks = {}

        for s in self._segments:
            if '+R' in s.locator:
                try:
                    loc = remote_blocks[s.locator]
                except KeyError:
                    loc = self.parent._my_keep().refresh_signature(s.locator)
                    remote_blocks[s.locator] = loc
                s.locator = loc
                self.parent.set_committed(False)
        return remote_blocks

    @synchronized
    def segments(self):
        """Return a shallow copy of the segment list."""
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        # Maps a bufferblock id found in `other` to the locator used for it
        # in this file, so each bufferblock is resolved (or duplicated) once.
        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        # Not writable any more: refer to it by its locator.
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        # Still writable: duplicate it through this file's
                        # block manager, with this file as the owner.
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        """Compare two files segment by segment.

        Two files are equal when they have the same number of segments and
        each pair of segments has the same stripped locator, range start,
        range size and segment offset.  Bufferblock ids are resolved to their
        locators before comparing.
        """
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        # Take the snapshot of the other file before acquiring our own lock,
        # so the two locks are never held at the same time here.
        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for seg1, seg2 in zip(self._segments, othersegs):
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        """Negation of `__eq__`."""
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        """Replace the segment list with `segs` (stored as-is, not copied)."""
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers.

        Objects that are not `ArvadosFileWriter` instances are ignored.
        """
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.

        :flush:
          If true, or if the file is larger than half of
          `config.KEEP_BLOCK_SIZE`, the file is flushed.  Otherwise, once no
          writers remain, the block manager is asked to repack small blocks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    # The new end falls inside this segment: keep only its
                    # leading part, starting at the same offset in the block.
                    new_segs.append(Range(r.locator, r.range_start,
                                          size - r.range_start, r.segment_offset))
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            # Grow the file by appending segments that reference the block
            # manager's padding block, at most one block size at a time.
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
         If False (default), return less data than requested if the read
         crosses a block boundary and the next block isn't cached.  If True,
         only return less data than requested when hitting EOF.

        Returns `b''` when `size` is 0 or `offset` is at or past the end of
        the file.  When the data comes from a single block segment the result
        is a memoryview slice of that block; otherwise it is a bytes object.
        """

        with self.lock:
            if size == 0 or offset >= self.size():
                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)

            prefetch = None
            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
            if prefetch_lookahead:
                # Doing prefetch on every read() call is surprisingly expensive
                # when we're trying to deliver data at 600+ MiBps and want
                # the read() fast path to be as lightweight as possible.
                #
                # Only prefetching every 128 read operations
                # dramatically reduces the overhead while still
                # getting the benefit of prefetching (e.g. when
                # reading 128 KiB at a time, it checks for prefetch
                # every 16 MiB).
                self._read_counter = (self._read_counter+1) % 128
                if self._read_counter == 1:
                    prefetch = locators_and_ranges(self._segments,
                                                   offset + size,
                                                   config.KEEP_BLOCK_SIZE * prefetch_lookahead,
                                                   limit=(1+prefetch_lookahead))

        # Block contents are fetched outside the lock.
        locs = set()
        data = []
        for lr in readsegs:
            # After the first piece has been collected, a non-exact read only
            # accepts blocks that are already cached.
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
                locs.add(lr.locator)
            else:
                break

        if prefetch:
            for lr in prefetch:
                # Skip blocks that were just read or already requested.
                if lr.locator not in locs:
                    self.parent._my_block_manager().block_prefetch(lr.locator)
                    locs.add(lr.locator)

        if len(data) == 1:
            return data[0]
        else:
            return b''.join(data)

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        `data` that is neither bytes nor a memoryview is converted with its
        `encode()` method first.

        Returns the number of bytes written (0 when `data` is empty).

        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()
        if len(data) == 0:
            return 0

        if offset > self.size():
            # Writing past the end: pad the gap first.
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            # Report the total, the same way the single-block path does.
            return len(data)

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            # Try repacking first; if the data still doesn't fit, commit the
            # current bufferblock asynchronously and start a new one.
            self._current_bblock.repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        # The data was just appended, so it starts len(data) bytes before the
        # bufferblock's write pointer.
        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.

        :num_retries:
          Accepted for interface compatibility; not used by this method.

        Does nothing when the file is already marked as committed.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._current_bblock.repack_writes()
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            # Point every segment that still references a bufferblock at the
            # bufferblock's locator instead, committing the bufferblock first
            # if needed.
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                # Don't delete the bufferblock if it's owned by many files. It'll be
                # deleted after all of its owners are flush()ed.
                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
                    self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            # Each new segment starts where the previous one ends (0 for the
            # first segment of an empty file).
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        """Return the manifest text for this file as a single line ending in a newline.

        :stream_name:
          stream name passed to `normalize_stream`

        :portable_locators:
          If True, locators are replaced by their `KeepLocator(...).stripped()` form.

        :normalize:
          Accepted but not used by this method.

        :only_committed:
          If True, segments that still reference a bufferblock are left out.
        """
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                 segment.segment_offset, segment.range_size))
        return ' '.join(normalize_stream(stream_name, {self.name: filestream})) + "\n"

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        """Flush this file, remove it from its parent and attach it to `newparent` as `newname`.

        The file's lock is replaced by the lock of the new parent's root
        collection.
        """
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock
1242
1243
class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        """Create a reader for `arvadosfile`.

        The reader takes its name from `arvadosfile.name`; `mode` and
        `num_retries` are passed on to the base class.
        """
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        """Return the size of the underlying ArvadosFile."""
        return self.arvadosfile.size()

    def stream_name(self):
        """Return the stream name reported by the file's parent."""
        return self.arvadosfile.parent.stream_name()

    def readinto(self, b):
        """Read up to `len(b)` bytes into the buffer `b`.

        Returns the number of bytes stored at the start of `b`.
        """
        data = self.read(len(b))
        b[:len(data)] = data
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None or negative,
        read the entire remainder of the file.
        """
        # A negative size is the usual file-object spelling of "read
        # everything"; handle it here rather than handing a negative length
        # to ArvadosFile.readfrom().
        if size is None or size < 0:
            chunks = []
            while True:
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
                if not rd:
                    break
                chunks.append(rd)
                self._filepos += len(rd)
            return b''.join(chunks)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        """Do nothing; a read-only file has nothing to flush."""
        pass
1299
1300
class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        """Create a writer for `arvadosfile` and register it as one of the file's writers."""
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
        self.arvadosfile.add_writer(self)

    def writable(self):
        """Always True for a writer."""
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        """Write `data` at the current file position and advance the position.

        When the mode starts with "a", the position is first moved to the end
        of the file.  `data` that is neither bytes nor a memoryview is
        converted with its `encode()` method.

        Returns the number of bytes written.
        """
        if not isinstance(data, (bytes, memoryview)):
            # ArvadosFile.writeto() would encode this itself, but the file
            # position has to advance by the encoded length, which differs
            # from len(data) for text containing multi-byte characters.
            data = data.encode()
        if self.mode[0] == "a":
            self._filepos = self.size()
        self.arvadosfile.writeto(self._filepos, data, num_retries)
        self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        """Write every item of `seq` in order using `write()`."""
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        """Resize the file to `size` bytes, or to the current position if `size` is None."""
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
    def flush(self):
        """Flush the underlying ArvadosFile."""
        self.arvadosfile.flush()

    def close(self, flush=True):
        """Unregister this writer from the file and close it.

        `flush` is passed on to `ArvadosFile.remove_writer()`.  Calling
        close() on an already closed writer does nothing.
        """
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
1345
1346
class WrappableFile(object):
    """An interface to an Arvados file that's compatible with io wrappers.

    Every method forwards to the wrapped object `f`; the wrapper only adds
    its own `closed` flag.
    """

    def __init__(self, f):
        self.f = f
        self.closed = False

    def close(self):
        """Mark this wrapper as closed, then close the wrapped file."""
        # The flag is set first, so it is True even if f.close() raises.
        self.closed = True
        result = self.f.close()
        return result

    def flush(self):
        """Forward to `f.flush()`."""
        return self.f.flush()

    def read(self, *args, **kwargs):
        """Forward to `f.read()` with the same arguments."""
        return self.f.read(*args, **kwargs)

    def readable(self):
        """Forward to `f.readable()`."""
        return self.f.readable()

    def readinto(self, *args, **kwargs):
        """Forward to `f.readinto()` with the same arguments."""
        return self.f.readinto(*args, **kwargs)

    def seek(self, *args, **kwargs):
        """Forward to `f.seek()` with the same arguments."""
        return self.f.seek(*args, **kwargs)

    def seekable(self):
        """Forward to `f.seekable()`."""
        return self.f.seekable()

    def tell(self):
        """Forward to `f.tell()`."""
        return self.f.tell()

    def writable(self):
        """Forward to `f.writable()`."""
        return self.f.writable()

    def write(self, *args, **kwargs):
        """Forward to `f.write()` with the same arguments."""
        return self.f.write(*args, **kwargs)