21720: added type assertions for AxiosInstance get
[arvados.git] / sdk / python / arvados / arvfile.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import bz2
6 import collections
7 import copy
8 import errno
9 import functools
10 import hashlib
11 import logging
12 import os
13 import queue
14 import re
15 import sys
16 import threading
17 import uuid
18 import zlib
19
20 from . import config
21 from .errors import KeepWriteError, AssertionError, ArgumentError
22 from .keep import KeepLocator
23 from ._normalize_stream import normalize_stream
24 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
25 from .retry import retry_method
26
# NOTE(review): MOD and WRITE look like tags naming kinds of change events;
# nothing in this section of the file uses them -- confirm against callers.
MOD = "mod"
WRITE = "write"

# Logger for this module, registered under the name 'arvados.arvfile'.
_logger = logging.getLogger('arvados.arvfile')
31
def split(path):
    """split(path) -> streamname, filename

    Divide a /-separated stream path at its last '/'.  The text before the
    separator is the stream name and the text after it is the file name.
    A path containing no '/' is treated as a file in the '.' stream.

    Returns the tuple (stream_name, file_name).

    """
    head, slash, tail = path.rpartition('/')
    if not slash:
        # No / in string: the whole path is the file name.
        return '.', path
    return head, tail
45
46
class UnownedBlockError(Exception):
    """Raised when the BlockManager holds a writable block that has no owner."""
50
51
class _FileLikeObjectBase(object):
    """Smallest common base for file-like objects.

    Stores `name`, `mode` and a `closed` flag, works as a context manager
    that calls close() on exit, and supplies the `_before_close` decorator
    for methods that must not run on a closed object.

    """

    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        """Decorator: raise ValueError instead of calling the wrapped method
        when the object's `closed` flag is set."""
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if not self.closed:
                return orig_func(self, *args, **kwargs)
            raise ValueError("I/O operation on closed stream file")
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the object when the `with` block ends.

        An exception raised by close() is propagated only when the `with`
        body itself finished without an exception; otherwise it is dropped
        so the body's exception is the one the caller sees.

        """
        body_failed = exc_type is not None
        try:
            self.close()
        except Exception:
            if not body_failed:
                raise

    def close(self):
        """Mark the object as closed."""
        self.closed = True
79
80
class ArvadosFileReaderBase(_FileLikeObjectBase):
    """Shared read-side machinery for Arvados file-like objects.

    Tracks the current file position and builds iteration, line reading
    and optional decompression on top of read().  Subclasses must supply
    size(), read() and readfrom(); the versions here raise IOError(ENOSYS).

    """

    def __init__(self, name, mode, num_retries=None):
        super().__init__(name, mode)
        self._filepos = 0
        self.num_retries = num_retries
        # (file position, leftover bytes) remembered by the last readline()
        self._readline_cache = (None, None)

    def __iter__(self):
        """Yield lines from readline() until it returns an empty result."""
        line = self.readline()
        while line:
            yield line
            line = self.readline()

    def decompressed_name(self):
        """Return the name with a trailing '.bz2' or '.gz' removed."""
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        """Move the file position and return the new position.

        `pos` is relative to the current position for os.SEEK_CUR, to
        size() for os.SEEK_END, and absolute for any other `whence`.
        Raises IOError(EINVAL) if the result would be negative.

        """
        if whence == os.SEEK_CUR:
            target = pos + self._filepos
        elif whence == os.SEEK_END:
            target = pos + self.size()
        else:
            target = pos
        if target < 0:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = target
        return target

    def tell(self):
        """Return the current file position."""
        return self._filepos

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        """Generate the data from the current position onward, obtained by
        calling read() with `size` until it returns nothing."""
        chunk = self.read(size, num_retries=num_retries)
        while len(chunk) != 0:
            yield chunk
            chunk = self.read(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        """Read one line (at most `size` bytes) and return it decoded.

        Bytes read beyond the end of the line are kept in a cache keyed by
        file position, and the file position is rewound to the end of the
        returned line, so the next readline() from that position can start
        from the cached bytes.

        """
        cached_at, cached_bytes = self._readline_cache
        if self.tell() == cached_at:
            # Resume from the bytes left over by the previous call.
            pieces = [cached_bytes]
            self._filepos += len(cached_bytes)
        else:
            pieces = [b'']
        total = len(pieces[-1])
        while total < size and b'\n' not in pieces[-1]:
            chunk = self.read(2 ** 20, num_retries=num_retries)
            if not chunk:
                break
            pieces.append(chunk)
            total += len(chunk)
        buffered = b''.join(pieces)
        newline_at = buffered.find(b'\n')
        if newline_at < 0:
            line_end = len(buffered)
        else:
            line_end = newline_at + 1
        line_end = min(line_end, size)
        # Step the position back over whatever was read past the line.
        self._filepos -= len(buffered) - line_end
        self._readline_cache = (self.tell(), buffered[line_end:])
        return buffered[:line_end].decode()

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        """Generate the non-empty results of applying `decompress` to each
        chunk produced by readall(size)."""
        for raw in self.readall(size, num_retries=num_retries):
            plain = decompress(raw)
            if plain:
                yield plain

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        """Seek to the start and return an iterator over the file's data,
        decompressed according to a '.bz2' or '.gz' name suffix; any other
        name yields the data unchanged."""
        self.seek(0)
        if self.name.endswith('.bz2'):
            inflate = bz2.BZ2Decompressor().decompress
        elif self.name.endswith('.gz'):
            gz = zlib.decompressobj(16+zlib.MAX_WBITS)

            def inflate(segment):
                # Feed back input zlib has not consumed yet.
                return gz.decompress(gz.unconsumed_tail + segment)
        else:
            return self.readall(size, num_retries=num_retries)
        return self.decompress(inflate, size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        """Return a list of decoded lines (line endings kept), reading
        chunks until the data is exhausted or at least `sizehint` bytes
        have been collected."""
        chunks = []
        collected = 0
        for chunk in self.readall(num_retries=num_retries):
            chunks.append(chunk)
            collected += len(chunk)
            if collected >= sizehint:
                break
        text = b''.join(chunks).decode()
        return text.splitlines(True)

    def size(self):
        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")
198         raise IOError(errno.ENOSYS, "Not implemented")
199
200
class StreamFileReader(ArvadosFileReaderBase):
    """Read-only file-like view of a single file within a stream.

    :stream:
      the containing stream object; its readfrom(), name(),
      locators_and_ranges() and num_retries are used

    :segments:
      sequence of range objects (with locator, range_start and range_size)
      describing where the file's data lies in the stream

    :name:
      the file name

    """

    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        """Return the name of the stream this file belongs to."""
        return self._stream.name()

    def size(self):
        """Return the file size: the end offset of the last segment.

        A reader with no segments has no data, so its size is 0 (previously
        this case raised IndexError).

        """
        if not self.segments:
            return 0
        last = self.segments[-1]
        return last.range_start + last.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position

        Only the first chunk covering the requested range is fetched, so
        fewer than 'size' bytes may be returned even before end of file.
        The file position advances by the amount returned.
        """
        if size == 0:
            return b''

        data = b''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'

        All chunks covering the range are fetched and joined.  The current
        file position is not changed.
        """
        if size == 0:
            return b''

        chunks = [
            self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                  num_retries=num_retries)
            for lr in locators_and_ranges(self.segments, start, size)
        ]
        return b''.join(chunks)

    def as_manifest(self):
        """Return manifest text for this file alone, placed in stream '.'."""
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
257
258
def synchronized(orig_func):
    """Decorator that runs a method inside `with self.lock:`.

    The instance must expose a `lock` attribute usable as a context
    manager.  The wrapped method's return value is passed through.

    """
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            result = orig_func(self, *args, **kwargs)
        return result
    return synchronized_wrapper
265
266
class StateChangeError(Exception):
    """Error for a rejected state change.

    Carries the state the object was in (`state`) and the state that was
    requested (`nextstate`) alongside the message.

    """

    def __init__(self, message, state, nextstate):
        super().__init__(message)
        self.state, self.nextstate = state, nextstate
272
class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    A block is always in one of these states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    ERROR
      Entered from PENDING with the failure stored in `error`.  The block
      may go back to PENDING.

    DELETED
      clear() was called: the buffer and the owner reference are released.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3
    DELETED = 4

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        # Cleared on entering PENDING; set on reaching COMMITTED or ERROR.
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state; otherwise raises
        AssertionError.  Implements an expanding buffer, doubling capacity
        as needed to accommodate all the data.  Data that is not bytes,
        bytearray or memoryview is converted with its encode() method.

        """
        if self._state != _BufferBlock.WRITABLE:
            raise AssertionError("Buffer block is not writable")

        if not isinstance(data, (bytes, bytearray, memoryview)):
            data = data.encode()

        required = self.write_pointer + len(data)
        capacity = len(self.buffer_block)
        if required > capacity:
            # Start from at least 1: doubling a zero capacity never grows,
            # which previously made this loop spin forever on a block
            # created with starting_capacity=0.
            new_capacity = max(capacity, 1)
            while new_capacity < required:
                new_capacity *= 2
            new_buffer_block = bytearray(new_capacity)
            new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
            self.buffer_block = new_buffer_block
            self.buffer_view = memoryview(self.buffer_block)

        self.buffer_view[self.write_pointer:required] = data
        self.write_pointer = required
        # Contents changed, so any cached locator is stale.
        self._locator = None

    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        """Move the block to `nextstate`.

        Raises StateChangeError if (current state, nextstate) is not in
        STATE_TRANSITIONS.  For COMMITTED, `val` becomes the locator and the
        buffer is released; for ERROR, `val` is stored in `error`.

        """
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()

    @synchronized
    def state(self):
        """Return the current state constant."""
        return self._state

    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    @synchronized
    def locator(self):
        """The Keep locator for this buffer's contents.

        Computed as "<md5 hex digest>+<size>" of the written bytes and
        cached until the next change to the buffer.

        """
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator

    @synchronized
    def clone(self, new_blockid, owner):
        """Return a new WRITABLE block holding a copy of this block's data.

        Raises AssertionError if this block is COMMITTED (its buffer has
        been released).

        """
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        """Mark the block DELETED and drop its owner and buffer."""
        self._state = _BufferBlock.DELETED
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None

    @synchronized
    def repack_writes(self):
        """Optimize buffer block by repacking segments in file sequence.

        When the client makes random writes, they appear in the buffer block in
        the sequence they were written rather than the sequence they appear in
        the file.  This makes for inefficient, fragmented manifests.  Attempt
        to optimize by repacking writes in file sequence.

        Raises AssertionError if the block is not WRITABLE.

        """
        if self._state != _BufferBlock.WRITABLE:
            raise AssertionError("Cannot repack non-writable block")

        segs = self.owner.segments()

        # Collect the segments that reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self.blockid]

        # Collect total data referenced by segments (could be smaller than
        # bufferblock size if a portion of the file was written and
        # then overwritten).
        write_total = sum(s.range_size for s in bufferblock_segs)

        if write_total < self.size() or len(bufferblock_segs) > 1:
            # If there's more than one segment referencing this block, it is
            # due to out-of-order writes and will produce a fragmented
            # manifest, so try to optimize by re-packing into a new buffer.
            contents = self.buffer_view[0:self.write_pointer].tobytes()
            new_bb = _BufferBlock(None, write_total, None)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                # The segment now starts where its data landed in new_bb.
                t.segment_offset = new_bb.size() - t.range_size

            # Adopt the repacked buffer, then discard the temporary block.
            self.buffer_block = new_bb.buffer_block
            self.buffer_view = new_bb.buffer_view
            self.write_pointer = new_bb.write_pointer
            self._locator = None
            new_bb.clear()
            self.owner.set_segments(segs)

    def __repr__(self):
        return "<BufferBlock %s>" % (self.blockid)
441
442
class NoopLock(object):
    """Object with a lock's interface whose operations all do nothing.

    Usable as a context manager and through acquire()/release().

    """

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Returning None lets any exception from the body propagate.
        return None

    def acquire(self, blocking=False):
        return None

    def release(self):
        return None
455
456
def must_be_writable(orig_func):
    """Decorator that only calls the method when self.writable() is true.

    Otherwise the wrapped method is not called and IOError(EROFS) is
    raised.

    """
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if self.writable():
            return orig_func(self, *args, **kwargs)
        raise IOError(errno.EROFS, "Collection is read-only.")
    return must_be_writable_wrapper
464
465
class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2

    def __init__(self, keep,
                 copies=None,
                 put_threads=None,
                 num_retries=None,
                 storage_classes_func=None):
        """keep: KeepClient object to use

        :copies:
          if not None, passed as `copies` to every keep.put() call

        :put_threads:
          number of uploader threads; DEFAULT_PUT_THREADS when not given

        :num_retries:
          passed as `num_retries` to every keep.put() call

        :storage_classes_func:
          callable whose result is passed as `classes` to keep.put();
          defaults to one returning an empty list

        """
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self.lock = threading.Lock()
        self.prefetch_lookahead = self._keep.num_prefetch_threads
        self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
        self.copies = copies
        self.storage_classes = storage_classes_func or (lambda: [])
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
        self.padding_block = None
        self.num_retries = num_retries

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        # Unlocked implementation of alloc_bufferblock(), for callers that
        # already hold self.lock.
        if blockid is None:
            blockid = str(uuid.uuid4())
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = str(uuid.uuid4())
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        """Return True if `locator` identifies a bufferblock held here."""
        return locator in self._bufferblocks

    def _put_block(self, block):
        """Send the written part of `block` to Keep and return the locator.

        `copies` is passed to keep.put() only when it was configured;
        otherwise put() is called without that argument.

        """
        data = block.buffer_view[0:block.write_pointer].tobytes()
        put_kwargs = {'num_retries': self.num_retries}
        if self.copies is not None:
            put_kwargs['copies'] = self.copies
        put_kwargs['classes'] = self.storage_classes()
        return self._keep.put(data, **put_kwargs)

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""

        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    # None is the shutdown signal queued by stop_threads().
                    return

                loc = self._put_block(bufferblock)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        """Create the upload queue and start the uploader threads, unless
        they are already running."""
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = queue.Queue(maxsize=2)

                self._put_threads = []
                for _ in range(self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""

        if self._put_threads is not None:
            # One shutdown signal per worker, then wait for each to exit.
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading

        :force:
          repack even if less than one full block of data is pending

        :sync:
          passed to commit_bufferblock() for the combined block

        :closed_file_size:
          added to the running count of pending small-block data

        Raises UnownedBlockError if a WRITABLE block has no owner.

        """

        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
            return

        # Search blocks ready for getting packed together before being
        # committed to Keep.
        # A WRITABLE block always has an owner.
        # A WRITABLE block with its owner.closed() implies that its
        # size is <= KEEP_BLOCK_SIZE/2.
        try:
            small_blocks = [b for b in self._bufferblocks.values()
                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
        except AttributeError:
            # Writable blocks without owner shouldn't exist.
            raise UnownedBlockError()

        if len(small_blocks) <= 1:
            # Not enough small blocks for repacking
            return

        for bb in small_blocks:
            bb.repack_writes()

        # Update the pending write size count with its true value, just in case
        # some small file was opened, written and closed several times.
        self._pending_write_size = sum([b.size() for b in small_blocks])

        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
            return

        new_bb = self._alloc_bufferblock()
        # The combined block is owned by the list of files packed into it.
        new_bb.owner = []
        files = []
        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
            bb = small_blocks.pop(0)
            new_bb.owner.append(bb.owner)
            self._pending_write_size -= bb.size()
            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
            # Remember where this block's data starts inside new_bb.
            files.append((bb, new_bb.write_pointer - bb.size()))

        self.commit_bufferblock(new_bb, sync=sync)

        # Point each file's segments at the combined block and drop the
        # small block it used to reference.
        for bb, new_bb_segment_offset in files:
            newsegs = bb.owner.segments()
            for s in newsegs:
                if s.locator == bb.blockid:
                    s.locator = new_bb.blockid
                    s.segment_offset = new_bb_segment_offset+s.segment_offset
            bb.owner.set_segments(newsegs)
            self._delete_bufferblock(bb.blockid)

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING so to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                # Another upload of this block is already in progress.
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                loc = self._put_block(block)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        """Return the bufferblock registered under `locator`, or None."""
        return self._bufferblocks.get(locator)

    @synchronized
    def get_padding_block(self):
        """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file.

        For reference (and possible future optimization), the md5sum of the
        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864

        """

        if self.padding_block is None:
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            # A fresh buffer is zero-filled, so advancing the write pointer
            # marks the whole block as written zeros.
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block

    @synchronized
    def delete_bufferblock(self, locator):
        """Clear and forget the bufferblock registered under `locator`, if any."""
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        # Unlocked implementation of delete_bufferblock(), for callers that
        # already hold self.lock.
        if locator in self._bufferblocks:
            bb = self._bufferblocks[locator]
            bb.clear()
            del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and return that, if
        not, passes the request through to KeepClient.get().

        With `cache_only` set, KeepClient.get_from_cache() is used instead
        of KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    # Committed blocks have released their buffer; fetch
                    # the data from Keep by the real locator.
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = list(self._bufferblocks.items())

        for k,v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                # Ignore blocks with a list of owners, as if they're not in COMMITTED
                # state, they're already being committed asynchronously.
                if isinstance(v.owner, ArvadosFile):
                    v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                # Wait for every queued upload to be processed.
                self._put_queue.join()

                err = []
                for k,v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k,v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                if isinstance(v.owner, ArvadosFile):
                    v.owner.flush(sync=True)
                elif isinstance(v.owner, list) and len(v.owner) > 0:
                    # This bufferblock is referenced by many files as a result
                    # of repacking small blocks, so don't delete it when flushing
                    # its owners, just do it after flushing them all.
                    for owner in v.owner:
                        owner.flush(sync=True)
                    self.delete_bufferblock(k)

        self.stop_threads()

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        Does nothing when prefetching is disabled (prefetch_lookahead is
        falsy) or when `locator` names a bufferblock held here.
        """

        if not self.prefetch_lookahead:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self._keep.block_prefetch(locator)
807
808
809 class ArvadosFile(object):
810     """Represent a file in a Collection.
811
812     ArvadosFile manages the underlying representation of a file in Keep as a
813     sequence of segments spanning a set of blocks, and implements random
814     read/write access.
815
816     This object may be accessed from multiple threads.
817
818     """
819
820     __slots__ = ('parent', 'name', '_writers', '_committed',
821                  '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
822
    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :parent:
          the object containing this file; its root_collection().lock
          becomes this file's lock

        :name:
          stored as the file's name

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        # NOTE(review): `stream` and `segments` default to shared mutable
        # lists.  Nothing in this constructor mutates them, but confirm
        # that _add_segment() does not either.
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        # Share the root collection's lock rather than creating a new one.
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None
        self._read_counter = 0
843
844     def writable(self):
845         return self.parent.writable()
846
847     @synchronized
848     def permission_expired(self, as_of_dt=None):
849         """Returns True if any of the segment's locators is expired"""
850         for r in self._segments:
851             if KeepLocator(r.locator).permission_expired(as_of_dt):
852                 return True
853         return False
854
855     @synchronized
856     def has_remote_blocks(self):
857         """Returns True if any of the segment's locators has a +R signature"""
858
859         for s in self._segments:
860             if '+R' in s.locator:
861                 return True
862         return False
863
864     @synchronized
865     def _copy_remote_blocks(self, remote_blocks={}):
866         """Ask Keep to copy remote blocks and point to their local copies.
867
868         This is called from the parent Collection.
869
870         :remote_blocks:
871             Shared cache of remote to local block mappings. This is used to avoid
872             doing extra work when blocks are shared by more than one file in
873             different subdirectories.
874         """
875
876         for s in self._segments:
877             if '+R' in s.locator:
878                 try:
879                     loc = remote_blocks[s.locator]
880                 except KeyError:
881                     loc = self.parent._my_keep().refresh_signature(s.locator)
882                     remote_blocks[s.locator] = loc
883                 s.locator = loc
884                 self.parent.set_committed(False)
885         return remote_blocks
886
887     @synchronized
888     def segments(self):
889         return copy.copy(self._segments)
890
891     @synchronized
892     def clone(self, new_parent, new_name):
893         """Make a copy of this file."""
894         cp = ArvadosFile(new_parent, new_name)
895         cp.replace_contents(self)
896         return cp
897
898     @must_be_writable
899     @synchronized
900     def replace_contents(self, other):
901         """Replace segments of this file with segments from another `ArvadosFile` object."""
902
903         map_loc = {}
904         self._segments = []
905         for other_segment in other.segments():
906             new_loc = other_segment.locator
907             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
908                 if other_segment.locator not in map_loc:
909                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
910                     if bufferblock.state() != _BufferBlock.WRITABLE:
911                         map_loc[other_segment.locator] = bufferblock.locator()
912                     else:
913                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
914                 new_loc = map_loc[other_segment.locator]
915
916             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
917
918         self.set_committed(False)
919
920     def __eq__(self, other):
921         if other is self:
922             return True
923         if not isinstance(other, ArvadosFile):
924             return False
925
926         othersegs = other.segments()
927         with self.lock:
928             if len(self._segments) != len(othersegs):
929                 return False
930             for i in range(0, len(othersegs)):
931                 seg1 = self._segments[i]
932                 seg2 = othersegs[i]
933                 loc1 = seg1.locator
934                 loc2 = seg2.locator
935
936                 if self.parent._my_block_manager().is_bufferblock(loc1):
937                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
938
939                 if other.parent._my_block_manager().is_bufferblock(loc2):
940                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
941
942                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
943                     seg1.range_start != seg2.range_start or
944                     seg1.range_size != seg2.range_size or
945                     seg1.segment_offset != seg2.segment_offset):
946                     return False
947
948         return True
949
950     def __ne__(self, other):
951         return not self.__eq__(other)
952
953     @synchronized
954     def set_segments(self, segs):
955         self._segments = segs
956
957     @synchronized
958     def set_committed(self, value=True):
959         """Set committed flag.
960
961         If value is True, set committed to be True.
962
963         If value is False, set committed to be False for this and all parents.
964         """
965         if value == self._committed:
966             return
967         self._committed = value
968         if self._committed is False and self.parent is not None:
969             self.parent.set_committed(False)
970
971     @synchronized
972     def committed(self):
973         """Get whether this is committed or not."""
974         return self._committed
975
976     @synchronized
977     def add_writer(self, writer):
978         """Add an ArvadosFileWriter reference to the list of writers"""
979         if isinstance(writer, ArvadosFileWriter):
980             self._writers.add(writer)
981
982     @synchronized
983     def remove_writer(self, writer, flush):
984         """
985         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
986         and do some block maintenance tasks.
987         """
988         self._writers.remove(writer)
989
990         if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
991             # File writer closed, not small enough for repacking
992             self.flush()
993         elif self.closed():
994             # All writers closed and size is adequate for repacking
995             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
996
997     def closed(self):
998         """
999         Get whether this is closed or not. When the writers list is empty, the file
1000         is supposed to be closed.
1001         """
1002         return len(self._writers) == 0
1003
1004     @must_be_writable
1005     @synchronized
1006     def truncate(self, size):
1007         """Shrink or expand the size of the file.
1008
1009         If `size` is less than the size of the file, the file contents after
1010         `size` will be discarded.  If `size` is greater than the current size
1011         of the file, it will be filled with zero bytes.
1012
1013         """
1014         if size < self.size():
1015             new_segs = []
1016             for r in self._segments:
1017                 range_end = r.range_start+r.range_size
1018                 if r.range_start >= size:
1019                     # segment is past the trucate size, all done
1020                     break
1021                 elif size < range_end:
1022                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1023                     nr.segment_offset = r.segment_offset
1024                     new_segs.append(nr)
1025                     break
1026                 else:
1027                     new_segs.append(r)
1028
1029             self._segments = new_segs
1030             self.set_committed(False)
1031         elif size > self.size():
1032             padding = self.parent._my_block_manager().get_padding_block()
1033             diff = size - self.size()
1034             while diff > config.KEEP_BLOCK_SIZE:
1035                 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1036                 diff -= config.KEEP_BLOCK_SIZE
1037             if diff > 0:
1038                 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1039             self.set_committed(False)
1040         else:
1041             # size == self.size()
1042             pass
1043
1044     def readfrom(self, offset, size, num_retries, exact=False):
1045         """Read up to `size` bytes from the file starting at `offset`.
1046
1047         :exact:
1048          If False (default), return less data than requested if the read
1049          crosses a block boundary and the next block isn't cached.  If True,
1050          only return less data than requested when hitting EOF.
1051         """
1052
1053         with self.lock:
1054             if size == 0 or offset >= self.size():
1055                 return b''
1056             readsegs = locators_and_ranges(self._segments, offset, size)
1057
1058             prefetch = None
1059             prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1060             if prefetch_lookahead:
1061                 # Doing prefetch on every read() call is surprisingly expensive
1062                 # when we're trying to deliver data at 600+ MiBps and want
1063                 # the read() fast path to be as lightweight as possible.
1064                 #
1065                 # Only prefetching every 128 read operations
1066                 # dramatically reduces the overhead while still
1067                 # getting the benefit of prefetching (e.g. when
1068                 # reading 128 KiB at a time, it checks for prefetch
1069                 # every 16 MiB).
1070                 self._read_counter = (self._read_counter+1) % 128
1071                 if self._read_counter == 1:
1072                     prefetch = locators_and_ranges(self._segments,
1073                                                    offset + size,
1074                                                    config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1075                                                    limit=(1+prefetch_lookahead))
1076
1077         locs = set()
1078         data = []
1079         for lr in readsegs:
1080             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1081             if block:
1082                 blockview = memoryview(block)
1083                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1084                 locs.add(lr.locator)
1085             else:
1086                 break
1087
1088         if prefetch:
1089             for lr in prefetch:
1090                 if lr.locator not in locs:
1091                     self.parent._my_block_manager().block_prefetch(lr.locator)
1092                     locs.add(lr.locator)
1093
1094         if len(data) == 1:
1095             return data[0]
1096         else:
1097             return b''.join(data)
1098
1099     @must_be_writable
1100     @synchronized
1101     def writeto(self, offset, data, num_retries):
1102         """Write `data` to the file starting at `offset`.
1103
1104         This will update existing bytes and/or extend the size of the file as
1105         necessary.
1106
1107         """
1108         if not isinstance(data, bytes) and not isinstance(data, memoryview):
1109             data = data.encode()
1110         if len(data) == 0:
1111             return
1112
1113         if offset > self.size():
1114             self.truncate(offset)
1115
1116         if len(data) > config.KEEP_BLOCK_SIZE:
1117             # Chunk it up into smaller writes
1118             n = 0
1119             dataview = memoryview(data)
1120             while n < len(data):
1121                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1122                 n += config.KEEP_BLOCK_SIZE
1123             return
1124
1125         self.set_committed(False)
1126
1127         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1128             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1129
1130         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1131             self._current_bblock.repack_writes()
1132             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1133                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1134                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1135
1136         self._current_bblock.append(data)
1137
1138         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1139
1140         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1141
1142         return len(data)
1143
1144     @synchronized
1145     def flush(self, sync=True, num_retries=0):
1146         """Flush the current bufferblock to Keep.
1147
1148         :sync:
1149           If True, commit block synchronously, wait until buffer block has been written.
1150           If False, commit block asynchronously, return immediately after putting block into
1151           the keep put queue.
1152         """
1153         if self.committed():
1154             return
1155
1156         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1157             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1158                 self._current_bblock.repack_writes()
1159             if self._current_bblock.state() != _BufferBlock.DELETED:
1160                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1161
1162         if sync:
1163             to_delete = set()
1164             for s in self._segments:
1165                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1166                 if bb:
1167                     if bb.state() != _BufferBlock.COMMITTED:
1168                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1169                     to_delete.add(s.locator)
1170                     s.locator = bb.locator()
1171             for s in to_delete:
1172                 # Don't delete the bufferblock if it's owned by many files. It'll be
1173                 # deleted after all of its owners are flush()ed.
1174                 if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1175                     self.parent._my_block_manager().delete_bufferblock(s)
1176
1177         self.parent.notify(MOD, self.parent, self.name, (self, self))
1178
1179     @must_be_writable
1180     @synchronized
1181     def add_segment(self, blocks, pos, size):
1182         """Add a segment to the end of the file.
1183
1184         `pos` and `offset` reference a section of the stream described by
1185         `blocks` (a list of Range objects)
1186
1187         """
1188         self._add_segment(blocks, pos, size)
1189
1190     def _add_segment(self, blocks, pos, size):
1191         """Internal implementation of add_segment."""
1192         self.set_committed(False)
1193         for lr in locators_and_ranges(blocks, pos, size):
1194             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1195             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1196             self._segments.append(r)
1197
1198     @synchronized
1199     def size(self):
1200         """Get the file size."""
1201         if self._segments:
1202             n = self._segments[-1]
1203             return n.range_start + n.range_size
1204         else:
1205             return 0
1206
1207     @synchronized
1208     def manifest_text(self, stream_name=".", portable_locators=False,
1209                       normalize=False, only_committed=False):
1210         buf = ""
1211         filestream = []
1212         for segment in self._segments:
1213             loc = segment.locator
1214             if self.parent._my_block_manager().is_bufferblock(loc):
1215                 if only_committed:
1216                     continue
1217                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1218             if portable_locators:
1219                 loc = KeepLocator(loc).stripped()
1220             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1221                                  segment.segment_offset, segment.range_size))
1222         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1223         buf += "\n"
1224         return buf
1225
1226     @must_be_writable
1227     @synchronized
1228     def _reparent(self, newparent, newname):
1229         self.set_committed(False)
1230         self.flush(sync=True)
1231         self.parent.remove(self.name)
1232         self.parent = newparent
1233         self.name = newname
1234         self.lock = self.parent.root_collection().lock
1235
1236
class ArvadosFileReader(ArvadosFileReaderBase):
    """Read-only file-like object backed by an ArvadosFile.

    This class is NOT thread safe: the file pointer is updated without any
    locking.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        """Wrap `arvadosfile`; the reader takes its name from the wrapped file."""
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        """Return the size reported by the wrapped ArvadosFile."""
        return self.arvadosfile.size()

    def stream_name(self):
        """Return the stream name of the wrapped file's parent."""
        return self.arvadosfile.parent.stream_name()

    def readinto(self, b):
        """Read up to `len(b)` bytes into `b` and return how many were stored."""
        chunk = self.read(len(b))
        nread = len(chunk)
        b[:nread] = chunk
        return nread

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Reading starts at the current file position, which is advanced by
        the amount of data returned.  If `size` is None, the entire
        remainder of the file is read.
        """
        if size is not None:
            # Bounded read: one exact request.
            result = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(result)
            return result

        # Unbounded read: keep requesting block-sized pieces until an empty
        # result comes back.
        pieces = []
        while True:
            piece = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            if not piece:
                break
            pieces.append(piece)
            self._filepos += len(piece)
        return b''.join(pieces)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        The file position is left untouched.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        """Do nothing; a reader has nothing to flush."""
1292
1293
class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        """Wrap `arvadosfile` and register this object as one of its writers."""
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
        self.arvadosfile.add_writer(self)

    def writable(self):
        """Return True: this object supports writing."""
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        """Write `data` at the current file position and advance the position.

        When the mode starts with "a", the position is first moved to the
        end of the file.  `data` that is neither `bytes` nor a `memoryview`
        is converted with its `encode()` method.

        Returns the number of bytes written.
        """
        # Encode before measuring: the file position must advance by the
        # number of bytes stored, which differs from the character count of
        # a str containing non-ASCII characters.
        if not isinstance(data, (bytes, memoryview)):
            data = data.encode()
        if self.mode[0] == "a":
            self._filepos = self.size()
        self.arvadosfile.writeto(self._filepos, data, num_retries)
        self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        """Write each item of `seq` in order using `write()`."""
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        """Resize the file to `size`, or to the current file position if `size` is None."""
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
    def flush(self):
        """Flush the wrapped ArvadosFile."""
        self.arvadosfile.flush()

    def close(self, flush=True):
        """Unregister this writer from the file and close it.

        `flush` is passed on to `ArvadosFile.remove_writer()`.  Calling
        close() on an already closed writer does nothing.
        """
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
1338
1339
class WrappableFile(object):
    """Adapter exposing an Arvados file through the methods io wrappers use.

    Every method delegates to the wrapped object `f`; `close()` additionally
    records the closed state on this adapter.

    """

    def __init__(self, f):
        """Wrap the file-like object `f`."""
        self.f = f
        self.closed = False

    def close(self):
        """Mark this adapter as closed, then close the wrapped file."""
        self.closed = True
        return self.f.close()

    def flush(self):
        """Delegate to the wrapped file's flush()."""
        return self.f.flush()

    def read(self, *a, **kw):
        """Delegate to the wrapped file's read()."""
        return self.f.read(*a, **kw)

    def readable(self):
        """Delegate to the wrapped file's readable()."""
        return self.f.readable()

    def readinto(self, *a, **kw):
        """Delegate to the wrapped file's readinto()."""
        return self.f.readinto(*a, **kw)

    def seek(self, *a, **kw):
        """Delegate to the wrapped file's seek()."""
        return self.f.seek(*a, **kw)

    def seekable(self):
        """Delegate to the wrapped file's seekable()."""
        return self.f.seekable()

    def tell(self):
        """Delegate to the wrapped file's tell()."""
        return self.f.tell()

    def writable(self):
        """Delegate to the wrapped file's writable()."""
        return self.f.writable()

    def write(self, *a, **kw):
        """Delegate to the wrapped file's write()."""
        return self.f.write(*a, **kw)