1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import bz2
6 import collections
7 import copy
8 import errno
9 import functools
10 import hashlib
11 import logging
12 import os
13 import queue
14 import re
15 import sys
16 import threading
17 import uuid
18 import zlib
19
20 from . import config
21 from .errors import KeepWriteError, AssertionError, ArgumentError
22 from .keep import KeepLocator
23 from ._normalize_stream import normalize_stream
24 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
25 from .retry import retry_method
26
27 MOD = "mod"
28 WRITE = "write"
29
30 _logger = logging.getLogger('arvados.arvfile')
31
32 def split(path):
33     """split(path) -> streamname, filename
34
35     Separate the stream name and file name in a /-separated stream path and
36     return a tuple (stream_name, file_name).  If no stream name is available,
37     assume '.'.
38
39     """
40     try:
41         stream_name, file_name = path.rsplit('/', 1)
42     except ValueError:  # No / in string
43         stream_name, file_name = '.', path
44     return stream_name, file_name
45
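# Illustrative sketch (not part of the original module): how split() divides a
# stream path into (stream_name, file_name), assuming only the function above.
def _example_split_usage():
    assert split('./foo/bar.txt') == ('./foo', 'bar.txt')
    assert split('bar.txt') == ('.', 'bar.txt')
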
46
47 class UnownedBlockError(Exception):
48     """Raised when there's a writable block without an owner on the BlockManager."""
49     pass
50
51
52 class _FileLikeObjectBase(object):
53     def __init__(self, name, mode):
54         self.name = name
55         self.mode = mode
56         self.closed = False
57
58     @staticmethod
59     def _before_close(orig_func):
60         @functools.wraps(orig_func)
61         def before_close_wrapper(self, *args, **kwargs):
62             if self.closed:
63                 raise ValueError("I/O operation on closed stream file")
64             return orig_func(self, *args, **kwargs)
65         return before_close_wrapper
66
67     def __enter__(self):
68         return self
69
70     def __exit__(self, exc_type, exc_value, traceback):
71         try:
72             self.close()
73         except Exception:
74             if exc_type is None:
75                 raise
76
77     def close(self):
78         self.closed = True
79
80
81 class ArvadosFileReaderBase(_FileLikeObjectBase):
82     def __init__(self, name, mode, num_retries=None):
83         super(ArvadosFileReaderBase, self).__init__(name, mode)
84         self._filepos = 0
85         self.num_retries = num_retries
86         self._readline_cache = (None, None)
87
88     def __iter__(self):
89         while True:
90             data = self.readline()
91             if not data:
92                 break
93             yield data
94
95     def decompressed_name(self):
96         return re.sub(r'\.(bz2|gz)$', '', self.name)
97
98     @_FileLikeObjectBase._before_close
99     def seek(self, pos, whence=os.SEEK_SET):
100         if whence == os.SEEK_CUR:
101             pos += self._filepos
102         elif whence == os.SEEK_END:
103             pos += self.size()
104         if pos < 0:
105             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
106         self._filepos = pos
107         return self._filepos
108
109     def tell(self):
110         return self._filepos
111
112     def readable(self):
113         return True
114
115     def writable(self):
116         return False
117
118     def seekable(self):
119         return True
120
121     @_FileLikeObjectBase._before_close
122     @retry_method
123     def readall(self, size=2**20, num_retries=None):
124         while True:
125             data = self.read(size, num_retries=num_retries)
126             if len(data) == 0:
127                 break
128             yield data
129
130     @_FileLikeObjectBase._before_close
131     @retry_method
132     def readline(self, size=float('inf'), num_retries=None):
133         cache_pos, cache_data = self._readline_cache
134         if self.tell() == cache_pos:
135             data = [cache_data]
136             self._filepos += len(cache_data)
137         else:
138             data = [b'']
139         data_size = len(data[-1])
140         while (data_size < size) and (b'\n' not in data[-1]):
141             next_read = self.read(2 ** 20, num_retries=num_retries)
142             if not next_read:
143                 break
144             data.append(next_read)
145             data_size += len(next_read)
146         data = b''.join(data)
147         try:
148             nextline_index = data.index(b'\n') + 1
149         except ValueError:
150             nextline_index = len(data)
151         nextline_index = min(nextline_index, size)
152         self._filepos -= len(data) - nextline_index
153         self._readline_cache = (self.tell(), data[nextline_index:])
154         return data[:nextline_index].decode()
155
156     @_FileLikeObjectBase._before_close
157     @retry_method
158     def decompress(self, decompress, size, num_retries=None):
159         for segment in self.readall(size, num_retries=num_retries):
160             data = decompress(segment)
161             if data:
162                 yield data
163
164     @_FileLikeObjectBase._before_close
165     @retry_method
166     def readall_decompressed(self, size=2**20, num_retries=None):
167         self.seek(0)
168         if self.name.endswith('.bz2'):
169             dc = bz2.BZ2Decompressor()
170             return self.decompress(dc.decompress, size,
171                                    num_retries=num_retries)
172         elif self.name.endswith('.gz'):
173             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
174             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
175                                    size, num_retries=num_retries)
176         else:
177             return self.readall(size, num_retries=num_retries)
178
179     @_FileLikeObjectBase._before_close
180     @retry_method
181     def readlines(self, sizehint=float('inf'), num_retries=None):
182         data = []
183         data_size = 0
184         for s in self.readall(num_retries=num_retries):
185             data.append(s)
186             data_size += len(s)
187             if data_size >= sizehint:
188                 break
189         return b''.join(data).decode().splitlines(True)
190
191     def size(self):
192         raise IOError(errno.ENOSYS, "Not implemented")
193
194     def read(self, size, num_retries=None):
195         raise IOError(errno.ENOSYS, "Not implemented")
196
197     def readfrom(self, start, size, num_retries=None):
198         raise IOError(errno.ENOSYS, "Not implemented")
199
200
201 class StreamFileReader(ArvadosFileReaderBase):
202     class _NameAttribute(str):
203         # The Python file API provides a plain .name attribute.
204         # Older SDK provided a name() method.
205         # This class provides both, for maximum compatibility.
206         def __call__(self):
207             return self
208
209     def __init__(self, stream, segments, name):
210         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
211         self._stream = stream
212         self.segments = segments
213
214     def stream_name(self):
215         return self._stream.name()
216
217     def size(self):
218         n = self.segments[-1]
219         return n.range_start + n.range_size
220
221     @_FileLikeObjectBase._before_close
222     @retry_method
223     def read(self, size, num_retries=None):
224         """Read up to 'size' bytes from the stream, starting at the current file position"""
225         if size == 0:
226             return b''
227
228         data = b''
229         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
230         if available_chunks:
231             lr = available_chunks[0]
232             data = self._stream.readfrom(lr.locator+lr.segment_offset,
233                                          lr.segment_size,
234                                          num_retries=num_retries)
235
236         self._filepos += len(data)
237         return data
238
239     @_FileLikeObjectBase._before_close
240     @retry_method
241     def readfrom(self, start, size, num_retries=None):
242         """Read up to 'size' bytes from the stream, starting at 'start'"""
243         if size == 0:
244             return b''
245
246         data = []
247         for lr in locators_and_ranges(self.segments, start, size):
248             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
249                                               num_retries=num_retries))
250         return b''.join(data)
251
252     def as_manifest(self):
253         segs = []
254         for r in self.segments:
255             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
256         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
257
258
259 def synchronized(orig_func):
260     @functools.wraps(orig_func)
261     def synchronized_wrapper(self, *args, **kwargs):
262         with self.lock:
263             return orig_func(self, *args, **kwargs)
264     return synchronized_wrapper
265
266
267 class StateChangeError(Exception):
268     def __init__(self, message, state, nextstate):
269         super(StateChangeError, self).__init__(message)
270         self.state = state
271         self.nextstate = nextstate
272
273 class _BufferBlock(object):
274     """A stand-in for a Keep block that is in the process of being written.
275
276     Writers can append to it, get the size, and compute the Keep locator.
277     There are three valid states:
278
279     WRITABLE
280       Can append to block.
281
282     PENDING
283       Block is in the process of being uploaded to Keep; appending is an error.
284
285     COMMITTED
286       The block has been written to Keep, its internal buffer has been
287     released, fetching the block will fetch it via the Keep client (since we
288       discarded the internal copy), and identifiers referring to the BufferBlock
289       can be replaced with the block locator.
290
291     """
292
293     WRITABLE = 0
294     PENDING = 1
295     COMMITTED = 2
296     ERROR = 3
297     DELETED = 4
298
299     def __init__(self, blockid, starting_capacity, owner):
300         """
301         :blockid:
302           the identifier for this block
303
304         :starting_capacity:
305           the initial buffer capacity
306
307         :owner:
308           ArvadosFile that owns this block
309
310         """
311         self.blockid = blockid
312         self.buffer_block = bytearray(starting_capacity)
313         self.buffer_view = memoryview(self.buffer_block)
314         self.write_pointer = 0
315         self._state = _BufferBlock.WRITABLE
316         self._locator = None
317         self.owner = owner
318         self.lock = threading.Lock()
319         self.wait_for_commit = threading.Event()
320         self.error = None
321
322     @synchronized
323     def append(self, data):
324         """Append some data to the buffer.
325
326         Only valid if the block is in WRITABLE state.  Implements an expanding
327         buffer, doubling capacity as needed to accommodate all the data.
328
329         """
330         if self._state == _BufferBlock.WRITABLE:
331             if not isinstance(data, bytes) and not isinstance(data, memoryview):
332                 data = data.encode()
333             while (self.write_pointer+len(data)) > len(self.buffer_block):
334                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
335                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
336                 self.buffer_block = new_buffer_block
337                 self.buffer_view = memoryview(self.buffer_block)
338             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
339             self.write_pointer += len(data)
340             self._locator = None
341         else:
342             raise AssertionError("Buffer block is not writable")
343
344     STATE_TRANSITIONS = frozenset([
345             (WRITABLE, PENDING),
346             (PENDING, COMMITTED),
347             (PENDING, ERROR),
348             (ERROR, PENDING)])
349
350     @synchronized
351     def set_state(self, nextstate, val=None):
352         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
353             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
354         self._state = nextstate
355
356         if self._state == _BufferBlock.PENDING:
357             self.wait_for_commit.clear()
358
359         if self._state == _BufferBlock.COMMITTED:
360             self._locator = val
361             self.buffer_view = None
362             self.buffer_block = None
363             self.wait_for_commit.set()
364
365         if self._state == _BufferBlock.ERROR:
366             self.error = val
367             self.wait_for_commit.set()
368
369     @synchronized
370     def state(self):
371         return self._state
372
373     def size(self):
374         """The amount of data written to the buffer."""
375         return self.write_pointer
376
377     @synchronized
378     def locator(self):
379         """The Keep locator for this buffer's contents."""
380         if self._locator is None:
381             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
382         return self._locator
383
384     @synchronized
385     def clone(self, new_blockid, owner):
386         if self._state == _BufferBlock.COMMITTED:
387             raise AssertionError("Cannot duplicate committed buffer block")
388         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
389         bufferblock.append(self.buffer_view[0:self.size()])
390         return bufferblock
391
392     @synchronized
393     def clear(self):
394         self._state = _BufferBlock.DELETED
395         self.owner = None
396         self.buffer_block = None
397         self.buffer_view = None
398
399     @synchronized
400     def repack_writes(self):
401         """Optimize buffer block by repacking segments in file sequence.
402
403         When the client makes random writes, they appear in the buffer block in
404         the sequence they were written rather than the sequence they appear in
405         the file.  This makes for inefficient, fragmented manifests.  Attempt
406         to optimize by repacking writes in file sequence.
407
408         """
409         if self._state != _BufferBlock.WRITABLE:
410             raise AssertionError("Cannot repack non-writable block")
411
412         segs = self.owner.segments()
413
414         # Collect the segments that reference the buffer block.
415         bufferblock_segs = [s for s in segs if s.locator == self.blockid]
416
417         # Collect total data referenced by segments (could be smaller than
418         # bufferblock size if a portion of the file was written and
419         # then overwritten).
420         write_total = sum([s.range_size for s in bufferblock_segs])
421
422         if write_total < self.size() or len(bufferblock_segs) > 1:
423             # If there's more than one segment referencing this block, it is
424             # due to out-of-order writes and will produce a fragmented
425             # manifest, so try to optimize by re-packing into a new buffer.
426             contents = self.buffer_view[0:self.write_pointer].tobytes()
427             new_bb = _BufferBlock(None, write_total, None)
428             for t in bufferblock_segs:
429                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
430                 t.segment_offset = new_bb.size() - t.range_size
431
432             self.buffer_block = new_bb.buffer_block
433             self.buffer_view = new_bb.buffer_view
434             self.write_pointer = new_bb.write_pointer
435             self._locator = None
436             new_bb.clear()
437             self.owner.set_segments(segs)
438
439     def __repr__(self):
440         return "<BufferBlock %s>" % (self.blockid)
441
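# Illustrative sketch (not part of the original module): exercises the
# expanding buffer and locator computation of _BufferBlock described above,
# assuming only this file and the standard library.
def _example_bufferblock_basics():
    bb = _BufferBlock("example-block", starting_capacity=4, owner=None)
    bb.append(b"hello")                  # 5 bytes > 4, so capacity doubles to 8
    assert bb.size() == 5
    assert len(bb.buffer_block) == 8
    # The locator is the MD5 hex digest of the written bytes plus their length.
    assert bb.locator() == "5d41402abc4b2a76b9719d911017c592+5"
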
442
443 class NoopLock(object):
444     def __enter__(self):
445         return self
446
447     def __exit__(self, exc_type, exc_value, traceback):
448         pass
449
450     def acquire(self, blocking=False):
451         pass
452
453     def release(self):
454         pass
455
456
457 def must_be_writable(orig_func):
458     @functools.wraps(orig_func)
459     def must_be_writable_wrapper(self, *args, **kwargs):
460         if not self.writable():
461             raise IOError(errno.EROFS, "Collection is read-only.")
462         return orig_func(self, *args, **kwargs)
463     return must_be_writable_wrapper
464
465
466 class _BlockManager(object):
467     """BlockManager handles buffer blocks.
468
469     Also handles background block uploads, and background block prefetch for a
470     Collection of ArvadosFiles.
471
472     """
473
474     DEFAULT_PUT_THREADS = 2
475
476     def __init__(self, keep,
477                  copies=None,
478                  put_threads=None,
479                  num_retries=None,
480                  storage_classes_func=None):
481         """keep: KeepClient object to use"""
482         self._keep = keep
483         self._bufferblocks = collections.OrderedDict()
484         self._put_queue = None
485         self._put_threads = None
486         self.lock = threading.Lock()
487         self.prefetch_lookahead = self._keep.num_prefetch_threads
488         self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
489         self.copies = copies
490         self.storage_classes = storage_classes_func or (lambda: [])
491         self._pending_write_size = 0
492         self.threads_lock = threading.Lock()
493         self.padding_block = None
494         self.num_retries = num_retries
495
496     @synchronized
497     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
498         """Allocate a new, empty bufferblock in WRITABLE state and return it.
499
500         :blockid:
501           optional block identifier, otherwise one will be automatically assigned
502
503         :starting_capacity:
504           optional capacity, otherwise will use default capacity
505
506         :owner:
507           ArvadosFile that owns this block
508
509         """
510         return self._alloc_bufferblock(blockid, starting_capacity, owner)
511
512     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
513         if blockid is None:
514             blockid = str(uuid.uuid4())
515         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
516         self._bufferblocks[bufferblock.blockid] = bufferblock
517         return bufferblock
518
519     @synchronized
520     def dup_block(self, block, owner):
521         """Create a new bufferblock initialized with the content of an existing bufferblock.
522
523         :block:
524           the buffer block to copy.
525
526         :owner:
527           ArvadosFile that owns the new block
528
529         """
530         new_blockid = str(uuid.uuid4())
531         bufferblock = block.clone(new_blockid, owner)
532         self._bufferblocks[bufferblock.blockid] = bufferblock
533         return bufferblock
534
535     @synchronized
536     def is_bufferblock(self, locator):
537         return locator in self._bufferblocks
538
539     def _commit_bufferblock_worker(self):
540         """Background uploader thread."""
541
542         while True:
543             try:
544                 bufferblock = self._put_queue.get()
545                 if bufferblock is None:
546                     return
547
548                 if self.copies is None:
549                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
550                 else:
551                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
552                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
553             except Exception as e:
554                 bufferblock.set_state(_BufferBlock.ERROR, e)
555             finally:
556                 if self._put_queue is not None:
557                     self._put_queue.task_done()
558
559     def start_put_threads(self):
560         with self.threads_lock:
561             if self._put_threads is None:
562                 # Start uploader threads.
563
564                 # If we don't limit the Queue size, the upload queue can quickly
565                 # grow to take up gigabytes of RAM if the writing process is
566                 # generating data more quickly than it can be sent to the Keep
567                 # servers.
568                 #
569                 # With two upload threads and a queue size of 2, this means up to 4
570                 # blocks pending.  If they are full 64 MiB blocks, that means up to
571                 # 256 MiB of internal buffering, which is the same size as the
572                 # default download block cache in KeepClient.
573                 self._put_queue = queue.Queue(maxsize=2)
574
575                 self._put_threads = []
576                 for i in range(0, self.num_put_threads):
577                     thread = threading.Thread(target=self._commit_bufferblock_worker)
578                     self._put_threads.append(thread)
579                     thread.daemon = True
580                     thread.start()
581
582     @synchronized
583     def stop_threads(self):
584         """Shut down and wait for background upload and download threads to finish."""
585
586         if self._put_threads is not None:
587             for t in self._put_threads:
588                 self._put_queue.put(None)
589             for t in self._put_threads:
590                 t.join()
591         self._put_threads = None
592         self._put_queue = None
593
594     def __enter__(self):
595         return self
596
597     def __exit__(self, exc_type, exc_value, traceback):
598         self.stop_threads()
599
600     @synchronized
601     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
602         """Packs small blocks together before uploading"""
603
604         self._pending_write_size += closed_file_size
605
606         # Check if there are enough small blocks to fill up a full-sized block
607         if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
608             return
609
610         # Search for blocks that are ready to be packed together before being
611         # committed to Keep.
612         # A WRITABLE block always has an owner.
613         # A WRITABLE block with its owner.closed() implies that its
614         # size is <= KEEP_BLOCK_SIZE/2.
615         try:
616             small_blocks = [b for b in self._bufferblocks.values()
617                             if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
618         except AttributeError:
619             # Writable blocks without owner shouldn't exist.
620             raise UnownedBlockError()
621
622         if len(small_blocks) <= 1:
623             # Not enough small blocks for repacking
624             return
625
626         for bb in small_blocks:
627             bb.repack_writes()
628
629         # Update the pending write size count with its true value, just in case
630         # some small file was opened, written and closed several times.
631         self._pending_write_size = sum([b.size() for b in small_blocks])
632
633         if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
634             return
635
636         new_bb = self._alloc_bufferblock()
637         new_bb.owner = []
638         files = []
639         while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
640             bb = small_blocks.pop(0)
641             new_bb.owner.append(bb.owner)
642             self._pending_write_size -= bb.size()
643             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
644             files.append((bb, new_bb.write_pointer - bb.size()))
645
646         self.commit_bufferblock(new_bb, sync=sync)
647
648         for bb, new_bb_segment_offset in files:
649             newsegs = bb.owner.segments()
650             for s in newsegs:
651                 if s.locator == bb.blockid:
652                     s.locator = new_bb.blockid
653                     s.segment_offset = new_bb_segment_offset+s.segment_offset
654             bb.owner.set_segments(newsegs)
655             self._delete_bufferblock(bb.blockid)
656
657     def commit_bufferblock(self, block, sync):
658         """Initiate a background upload of a bufferblock.
659
660         :block:
661           The block object to upload
662
663         :sync:
664           If `sync` is True, upload the block synchronously.
665           If `sync` is False, upload the block asynchronously.  This will
666           return immediately unless the upload queue is at capacity, in
667           which case it will wait on an upload queue slot.
668
669         """
670         try:
671             # Mark the block as PENDING to disallow any more appends.
672             block.set_state(_BufferBlock.PENDING)
673         except StateChangeError as e:
674             if e.state == _BufferBlock.PENDING:
675                 if sync:
676                     block.wait_for_commit.wait()
677                 else:
678                     return
679             if block.state() == _BufferBlock.COMMITTED:
680                 return
681             elif block.state() == _BufferBlock.ERROR:
682                 raise block.error
683             else:
684                 raise
685
686         if sync:
687             try:
688                 if self.copies is None:
689                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
690                 else:
691                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
692                 block.set_state(_BufferBlock.COMMITTED, loc)
693             except Exception as e:
694                 block.set_state(_BufferBlock.ERROR, e)
695                 raise
696         else:
697             self.start_put_threads()
698             self._put_queue.put(block)
699
700     @synchronized
701     def get_bufferblock(self, locator):
702         return self._bufferblocks.get(locator)
703
704     @synchronized
705     def get_padding_block(self):
706         """Get a bufferblock 64 MiB in size consisting of all zeros, used as padding
707         when using truncate() to extend the size of a file.
708
709         For reference (and possible future optimization), the md5sum of the
710         padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
711
712         """
713
714         if self.padding_block is None:
715             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
716             self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
717             self.commit_bufferblock(self.padding_block, False)
718         return self.padding_block
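        # Illustrative check (restates the reference value from the docstring
        # above, assuming config.KEEP_BLOCK_SIZE is the 64 MiB Keep block size):
        #   hashlib.md5(bytes(config.KEEP_BLOCK_SIZE)).hexdigest()
        #   == '7f614da9329cd3aebf59b91aadc30bf0'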
719
720     @synchronized
721     def delete_bufferblock(self, locator):
722         self._delete_bufferblock(locator)
723
724     def _delete_bufferblock(self, locator):
725         if locator in self._bufferblocks:
726             bb = self._bufferblocks[locator]
727             bb.clear()
728             del self._bufferblocks[locator]
729
730     def get_block_contents(self, locator, num_retries, cache_only=False):
731         """Fetch a block.
732
733         First checks whether the locator refers to a BufferBlock and serves the data
734         from it if possible; otherwise, passes the request through to KeepClient.get().
735
736         """
737         with self.lock:
738             if locator in self._bufferblocks:
739                 bufferblock = self._bufferblocks[locator]
740                 if bufferblock.state() != _BufferBlock.COMMITTED:
741                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
742                 else:
743                     locator = bufferblock._locator
744         if cache_only:
745             return self._keep.get_from_cache(locator)
746         else:
747             return self._keep.get(locator, num_retries=num_retries)
748
749     def commit_all(self):
750         """Commit all outstanding buffer blocks.
751
752         This is a synchronous call, and will not return until all buffer blocks
753         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
754
755         """
756         self.repack_small_blocks(force=True, sync=True)
757
758         with self.lock:
759             items = list(self._bufferblocks.items())
760
761         for k,v in items:
762             if v.state() != _BufferBlock.COMMITTED and v.owner:
763                 # Ignore blocks with a list of owners: if they're not in COMMITTED
764                 # state, they're already being committed asynchronously.
765                 if isinstance(v.owner, ArvadosFile):
766                     v.owner.flush(sync=False)
767
768         with self.lock:
769             if self._put_queue is not None:
770                 self._put_queue.join()
771
772                 err = []
773                 for k,v in items:
774                     if v.state() == _BufferBlock.ERROR:
775                         err.append((v.locator(), v.error))
776                 if err:
777                     raise KeepWriteError("Error writing some blocks", err, label="block")
778
779         for k,v in items:
780             # flush again with sync=True to remove committed bufferblocks from
781             # the segments.
782             if v.owner:
783                 if isinstance(v.owner, ArvadosFile):
784                     v.owner.flush(sync=True)
785                 elif isinstance(v.owner, list) and len(v.owner) > 0:
786                     # This bufferblock is referenced by many files as a result
787                     # of repacking small blocks, so don't delete it when flushing
788                     # its owners, just do it after flushing them all.
789                     for owner in v.owner:
790                         owner.flush(sync=True)
791                     self.delete_bufferblock(k)
792
793         self.stop_threads()
794
795     def block_prefetch(self, locator):
796         """Initiate a background download of a block.
797         """
798
799         if not self.prefetch_lookahead:
800             return
801
802         with self.lock:
803             if locator in self._bufferblocks:
804                 return
805
806         self._keep.block_prefetch(locator)
807
808
809 class ArvadosFile(object):
810     """Represent a file in a Collection.
811
812     ArvadosFile manages the underlying representation of a file in Keep as a
813     sequence of segments spanning a set of blocks, and implements random
814     read/write access.
815
816     This object may be accessed from multiple threads.
817
818     """
819
820     __slots__ = ('parent', 'name', '_writers', '_committed',
821                  '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
822
823     def __init__(self, parent, name, stream=[], segments=[]):
824         """
825         ArvadosFile constructor.
826
827         :stream:
828           a list of Range objects representing a block stream
829
830         :segments:
831           a list of Range objects representing segments
832         """
833         self.parent = parent
834         self.name = name
835         self._writers = set()
836         self._committed = False
837         self._segments = []
838         self.lock = parent.root_collection().lock
839         for s in segments:
840             self._add_segment(stream, s.locator, s.range_size)
841         self._current_bblock = None
842         self._read_counter = 0
843
844     def writable(self):
845         return self.parent.writable()
846
847     @synchronized
848     def permission_expired(self, as_of_dt=None):
849         """Returns True if any of the segments' locators has expired"""
850         for r in self._segments:
851             if KeepLocator(r.locator).permission_expired(as_of_dt):
852                 return True
853         return False
854
855     @synchronized
856     def has_remote_blocks(self):
857         """Returns True if any of the segments' locators has a +R signature"""
858
859         for s in self._segments:
860             if '+R' in s.locator:
861                 return True
862         return False
863
864     @synchronized
865     def _copy_remote_blocks(self, remote_blocks={}):
866         """Ask Keep to copy remote blocks and point to their local copies.
867
868         This is called from the parent Collection.
869
870         :remote_blocks:
871             Shared cache of remote to local block mappings. This is used to avoid
872             doing extra work when blocks are shared by more than one file in
873             different subdirectories.
874         """
875
876         for s in self._segments:
877             if '+R' in s.locator:
878                 try:
879                     loc = remote_blocks[s.locator]
880                 except KeyError:
881                     loc = self.parent._my_keep().refresh_signature(s.locator)
882                     remote_blocks[s.locator] = loc
883                 s.locator = loc
884                 self.parent.set_committed(False)
885         return remote_blocks
886
887     @synchronized
888     def segments(self):
889         return copy.copy(self._segments)
890
891     @synchronized
892     def clone(self, new_parent, new_name):
893         """Make a copy of this file."""
894         cp = ArvadosFile(new_parent, new_name)
895         cp.replace_contents(self)
896         return cp
897
898     @must_be_writable
899     @synchronized
900     def replace_contents(self, other):
901         """Replace segments of this file with segments from another `ArvadosFile` object."""
902
903         map_loc = {}
904         self._segments = []
905         for other_segment in other.segments():
906             new_loc = other_segment.locator
907             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
908                 if other_segment.locator not in map_loc:
909                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
910                     if bufferblock.state() != _BufferBlock.WRITABLE:
911                         map_loc[other_segment.locator] = bufferblock.locator()
912                     else:
913                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
914                 new_loc = map_loc[other_segment.locator]
915
916             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
917
918         self.set_committed(False)
919
920     def __eq__(self, other):
921         if other is self:
922             return True
923         if not isinstance(other, ArvadosFile):
924             return False
925
926         othersegs = other.segments()
927         with self.lock:
928             if len(self._segments) != len(othersegs):
929                 return False
930             for i in range(0, len(othersegs)):
931                 seg1 = self._segments[i]
932                 seg2 = othersegs[i]
933                 loc1 = seg1.locator
934                 loc2 = seg2.locator
935
936                 if self.parent._my_block_manager().is_bufferblock(loc1):
937                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
938
939                 if other.parent._my_block_manager().is_bufferblock(loc2):
940                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
941
942                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
943                     seg1.range_start != seg2.range_start or
944                     seg1.range_size != seg2.range_size or
945                     seg1.segment_offset != seg2.segment_offset):
946                     return False
947
948         return True
949
950     def __ne__(self, other):
951         return not self.__eq__(other)
952
953     @synchronized
954     def set_segments(self, segs):
955         self._segments = segs
956
957     @synchronized
958     def set_committed(self, value=True):
959         """Set committed flag.
960
961         If value is True, set committed to be True.
962
963         If value is False, set committed to be False for this and all parents.
964         """
965         if value == self._committed:
966             return
967         self._committed = value
968         if self._committed is False and self.parent is not None:
969             self.parent.set_committed(False)
970
971     @synchronized
972     def committed(self):
973         """Get whether this is committed or not."""
974         return self._committed
975
976     @synchronized
977     def add_writer(self, writer):
978         """Add an ArvadosFileWriter reference to the list of writers"""
979         if isinstance(writer, ArvadosFileWriter):
980             self._writers.add(writer)
981
982     @synchronized
983     def remove_writer(self, writer, flush):
984         """
985         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
986         and do some block maintenance tasks.
987         """
988         self._writers.remove(writer)
989
990         if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
991             # Flush requested, or file is too large for small-block repacking
992             self.flush()
993         elif self.closed():
994             # All writers closed and file is small enough for repacking
995             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
996
997     def closed(self):
998         """
999         Get whether this is closed or not. When the writers list is empty, the file
1000         is considered closed.
1001         """
1002         return len(self._writers) == 0
1003
1004     @must_be_writable
1005     @synchronized
1006     def truncate(self, size):
1007         """Shrink or expand the size of the file.
1008
1009         If `size` is less than the size of the file, the file contents after
1010         `size` will be discarded.  If `size` is greater than the current size
1011         of the file, the added space will be filled with zero bytes.
1012
1013         """
1014         if size < self.size():
1015             new_segs = []
1016             for r in self._segments:
1017                 range_end = r.range_start+r.range_size
1018                 if r.range_start >= size:
1019                     # segment is past the truncate size, all done
1020                     break
1021                 elif size < range_end:
1022                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1023                     nr.segment_offset = r.segment_offset
1024                     new_segs.append(nr)
1025                     break
1026                 else:
1027                     new_segs.append(r)
1028
1029             self._segments = new_segs
1030             self.set_committed(False)
1031         elif size > self.size():
1032             padding = self.parent._my_block_manager().get_padding_block()
1033             diff = size - self.size()
1034             while diff > config.KEEP_BLOCK_SIZE:
1035                 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1036                 diff -= config.KEEP_BLOCK_SIZE
1037             if diff > 0:
1038                 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1039             self.set_committed(False)
1040         else:
1041             # size == self.size()
1042             pass
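    # Illustrative arithmetic for the padding path above: extending a 10-byte
    # file to (2 * config.KEEP_BLOCK_SIZE + 100) bytes appends two full-size
    # padding segments and one 90-byte segment, all referencing the shared
    # zero-filled padding block.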
1043
1044     def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False):
1045         """Read up to `size` bytes from the file starting at `offset`.
1046
1047         Arguments:
1048
1049         * exact: bool --- If False (default), return less data than
1050          requested if the read crosses a block boundary and the next
1051          block isn't cached.  If True, only return less data than
1052          requested when hitting EOF.
1053
1054         * return_memoryview: bool --- If False (default) return a
1055           `bytes` object, which may entail making a copy in some
1056           situations.  If True, return a `memoryview` object which may
1057           avoid making a copy, but may be incompatible with code
1058           expecting a `bytes` object.
1059
1060         """
1061
1062         with self.lock:
1063             if size == 0 or offset >= self.size():
1064                 return memoryview(b'') if return_memoryview else b''
1065             readsegs = locators_and_ranges(self._segments, offset, size)
1066
1067             prefetch = None
1068             prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1069             if prefetch_lookahead:
1070                 # Doing prefetch on every read() call is surprisingly expensive
1071                 # when we're trying to deliver data at 600+ MiBps and want
1072                 # the read() fast path to be as lightweight as possible.
1073                 #
1074                 # Only prefetching every 128 read operations
1075                 # dramatically reduces the overhead while still
1076                 # getting the benefit of prefetching (e.g. when
1077                 # reading 128 KiB at a time, it checks for prefetch
1078                 # every 16 MiB).
1079                 self._read_counter = (self._read_counter+1) % 128
1080                 if self._read_counter == 1:
1081                     prefetch = locators_and_ranges(self._segments,
1082                                                    offset + size,
1083                                                    config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1084                                                    limit=(1+prefetch_lookahead))
1085
1086         locs = set()
1087         data = []
1088         for lr in readsegs:
1089             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1090             if block:
1091                 blockview = memoryview(block)
1092                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1093                 locs.add(lr.locator)
1094             else:
1095                 break
1096
1097         if prefetch:
1098             for lr in prefetch:
1099                 if lr.locator not in locs:
1100                     self.parent._my_block_manager().block_prefetch(lr.locator)
1101                     locs.add(lr.locator)
1102
1103         if len(data) == 1:
1104             return data[0] if return_memoryview else data[0].tobytes()
1105         else:
1106             return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1107
1108
1109     @must_be_writable
1110     @synchronized
1111     def writeto(self, offset, data, num_retries):
1112         """Write `data` to the file starting at `offset`.
1113
1114         This will update existing bytes and/or extend the size of the file as
1115         necessary.
1116
1117         """
1118         if not isinstance(data, bytes) and not isinstance(data, memoryview):
1119             data = data.encode()
1120         if len(data) == 0:
1121             return
1122
1123         if offset > self.size():
1124             self.truncate(offset)
1125
1126         if len(data) > config.KEEP_BLOCK_SIZE:
1127             # Chunk it up into smaller writes
1128             n = 0
1129             dataview = memoryview(data)
1130             while n < len(data):
1131                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1132                 n += config.KEEP_BLOCK_SIZE
1133             return
1134
1135         self.set_committed(False)
1136
1137         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1138             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1139
1140         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1141             self._current_bblock.repack_writes()
1142             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1143                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1144                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1145
1146         self._current_bblock.append(data)
1147
1148         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1149
1150         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1151
1152         return len(data)
1153
1154     @synchronized
1155     def flush(self, sync=True, num_retries=0):
1156         """Flush the current bufferblock to Keep.
1157
1158         :sync:
1159           If True, commit block synchronously, wait until buffer block has been written.
1160           If False, commit block asynchronously, return immediately after putting block into
1161           the Keep put queue.
1162         """
1163         if self.committed():
1164             return
1165
1166         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1167             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1168                 self._current_bblock.repack_writes()
1169             if self._current_bblock.state() != _BufferBlock.DELETED:
1170                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1171
1172         if sync:
1173             to_delete = set()
1174             for s in self._segments:
1175                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1176                 if bb:
1177                     if bb.state() != _BufferBlock.COMMITTED:
1178                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1179                     to_delete.add(s.locator)
1180                     s.locator = bb.locator()
1181             for s in to_delete:
1182                 # Don't delete the bufferblock if it's owned by many files. It'll be
1183                 # deleted after all of its owners are flush()ed.
1184                 if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1185                     self.parent._my_block_manager().delete_bufferblock(s)
1186
1187         self.parent.notify(MOD, self.parent, self.name, (self, self))
1188
1189     @must_be_writable
1190     @synchronized
1191     def add_segment(self, blocks, pos, size):
1192         """Add a segment to the end of the file.
1193
1194         `pos` and `size` reference a section of the stream described by
1195         `blocks` (a list of Range objects)
1196
1197         """
1198         self._add_segment(blocks, pos, size)
1199
1200     def _add_segment(self, blocks, pos, size):
1201         """Internal implementation of add_segment."""
1202         self.set_committed(False)
1203         for lr in locators_and_ranges(blocks, pos, size):
1204             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1205             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1206             self._segments.append(r)
1207
1208     @synchronized
1209     def size(self):
1210         """Get the file size."""
1211         if self._segments:
1212             n = self._segments[-1]
1213             return n.range_start + n.range_size
1214         else:
1215             return 0
1216
1217     @synchronized
1218     def manifest_text(self, stream_name=".", portable_locators=False,
1219                       normalize=False, only_committed=False):
1220         buf = ""
1221         filestream = []
1222         for segment in self._segments:
1223             loc = segment.locator
1224             if self.parent._my_block_manager().is_bufferblock(loc):
1225                 if only_committed:
1226                     continue
1227                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1228             if portable_locators:
1229                 loc = KeepLocator(loc).stripped()
1230             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1231                                  segment.segment_offset, segment.range_size))
1232         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1233         buf += "\n"
1234         return buf
1235
1236     @must_be_writable
1237     @synchronized
1238     def _reparent(self, newparent, newname):
1239         self.set_committed(False)
1240         self.flush(sync=True)
1241         self.parent.remove(self.name)
1242         self.parent = newparent
1243         self.name = newname
1244         self.lock = self.parent.root_collection().lock
1245
1246
1247 class ArvadosFileReader(ArvadosFileReaderBase):
1248     """Wraps ArvadosFile in a file-like object supporting reading only.
1249
1250     Be aware that this class is NOT thread safe as there is no locking around
1251     updating the file pointer.
1252
1253     """
1254
1255     def __init__(self, arvadosfile, mode="r", num_retries=None):
1256         super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1257         self.arvadosfile = arvadosfile
1258
1259     def size(self):
1260         return self.arvadosfile.size()
1261
1262     def stream_name(self):
1263         return self.arvadosfile.parent.stream_name()
1264
1265     def readinto(self, b):
1266         data = self.read(len(b))
1267         b[:len(data)] = data
1268         return len(data)
1269
1270     @_FileLikeObjectBase._before_close
1271     @retry_method
1272     def read(self, size=-1, num_retries=None, return_memoryview=False):
1273         """Read up to `size` bytes from the file and return the result.
1274
1275         Starts at the current file position.  If `size` is negative or None,
1276         read the entire remainder of the file.
1277
1278         Returns an empty result if the file pointer is at the end of the file.
1279
1280         Returns a `bytes` object, unless `return_memoryview` is True,
1281         in which case it returns a memory view, which may avoid an
1282         unnecessary data copy in some situations.
1283
1284         """
1285         if size is None or size < 0:
1286             data = []
1287             #
1288             # specify exact=False, return_memoryview=True here so that we
1289             # only copy data once into the final buffer.
1290             #
1291             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1292             while rd:
1293                 data.append(rd)
1294                 self._filepos += len(rd)
1295                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1296             return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1297         else:
1298             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview)
1299             self._filepos += len(data)
1300             return data
1301
1302     @_FileLikeObjectBase._before_close
1303     @retry_method
1304     def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
1305         """Read up to `size` bytes from the stream, starting at the specified file offset.
1306
1307         This method does not change the file position.
1308
1309         Returns a `bytes` object, unless `return_memoryview` is True,
1310         in which case it returns a memory view, which may avoid an
1311         unnecessary data copy in some situations.
1312
1313         """
1314         return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview)
1315
1316     def flush(self):
1317         pass
1318
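# Usage sketch (illustrative, not part of the original module): files opened
# for reading through the Collection API are backed by ArvadosFileReader.  The
# collection UUID and file name below are placeholders.
def _example_read_collection_file():
    import arvados.collection
    coll = arvados.collection.Collection('zzzzz-4zz18-xxxxxxxxxxxxxxx')
    with coll.open('foo.txt', 'rb') as reader:
        head = reader.read(1024)      # up to 1024 bytes from the start
        reader.seek(0)                # rewind; the reader is seekable
        everything = reader.read()    # remainder of the file as bytes
    return head, everything
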
1319
1320 class ArvadosFileWriter(ArvadosFileReader):
1321     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1322
1323     Be aware that this class is NOT thread safe as there is no locking around
1324     updating the file pointer.
1325
1326     """
1327
1328     def __init__(self, arvadosfile, mode, num_retries=None):
1329         super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1330         self.arvadosfile.add_writer(self)
1331
1332     def writable(self):
1333         return True
1334
1335     @_FileLikeObjectBase._before_close
1336     @retry_method
1337     def write(self, data, num_retries=None):
1338         if self.mode[0] == "a":
1339             self._filepos = self.size()
1340         self.arvadosfile.writeto(self._filepos, data, num_retries)
1341         self._filepos += len(data)
1342         return len(data)
1343
1344     @_FileLikeObjectBase._before_close
1345     @retry_method
1346     def writelines(self, seq, num_retries=None):
1347         for s in seq:
1348             self.write(s, num_retries=num_retries)
1349
1350     @_FileLikeObjectBase._before_close
1351     def truncate(self, size=None):
1352         if size is None:
1353             size = self._filepos
1354         self.arvadosfile.truncate(size)
1355
1356     @_FileLikeObjectBase._before_close
1357     def flush(self):
1358         self.arvadosfile.flush()
1359
1360     def close(self, flush=True):
1361         if not self.closed:
1362             self.arvadosfile.remove_writer(self, flush)
1363             super(ArvadosFileWriter, self).close()
1364
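# Usage sketch (illustrative, not part of the original module): files opened
# for writing through the Collection API are backed by ArvadosFileWriter;
# written data is buffered in _BufferBlock objects and committed to Keep when
# the file is flushed or the collection is saved.
def _example_write_collection_file():
    import arvados.collection
    coll = arvados.collection.Collection()
    with coll.open('out.txt', 'wb') as writer:
        writer.write(b'hello arvfile\n')
    return coll.manifest_text()
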
1365
1366 class WrappableFile(object):
1367     """An interface to an Arvados file that's compatible with io wrappers.
1368
1369     """
1370     def __init__(self, f):
1371         self.f = f
1372         self.closed = False
1373     def close(self):
1374         self.closed = True
1375         return self.f.close()
1376     def flush(self):
1377         return self.f.flush()
1378     def read(self, *args, **kwargs):
1379         return self.f.read(*args, **kwargs)
1380     def readable(self):
1381         return self.f.readable()
1382     def readinto(self, *args, **kwargs):
1383         return self.f.readinto(*args, **kwargs)
1384     def seek(self, *args, **kwargs):
1385         return self.f.seek(*args, **kwargs)
1386     def seekable(self):
1387         return self.f.seekable()
1388     def tell(self):
1389         return self.f.tell()
1390     def writable(self):
1391         return self.f.writable()
1392     def write(self, *args, **kwargs):
1393         return self.f.write(*args, **kwargs)
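
# Illustrative sketch (not part of the original module): WrappableFile exists
# so readers and writers from this module can sit under the standard io
# wrappers, as the class docstring above describes.  `reader` is assumed to be
# an ArvadosFileReader.
def _example_wrap_for_text_io(reader):
    import io
    return io.TextIOWrapper(WrappableFile(reader), encoding='utf-8')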