[arvados.git] / sdk / python / arvados / arvfile.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import bz2
6 import collections
7 import copy
8 import errno
9 import functools
10 import hashlib
11 import logging
12 import os
13 import queue
14 import re
15 import sys
16 import threading
17 import uuid
18 import zlib
19
20 from . import config
21 from ._internal import streams
22 from .errors import KeepWriteError, AssertionError, ArgumentError
23 from .keep import KeepLocator
24 from .retry import retry_method
25
26 MOD = "mod"
27 WRITE = "write"
28
29 _logger = logging.getLogger('arvados.arvfile')
30
31 def split(path):
32     """split(path) -> streamname, filename
33
34     Separate the stream name and file name in a /-separated stream path and
35     return a tuple (stream_name, file_name).  If no stream name is available,
36     assume '.'.
37
38     """
39     try:
40         stream_name, file_name = path.rsplit('/', 1)
41     except ValueError:  # No / in string
42         stream_name, file_name = '.', path
43     return stream_name, file_name
44
45
46 class UnownedBlockError(Exception):
47     """Raised when there's an writable block without an owner on the BlockManager."""
48     pass
49
50
51 class _FileLikeObjectBase(object):
52     def __init__(self, name, mode):
53         self.name = name
54         self.mode = mode
55         self.closed = False
56
57     @staticmethod
58     def _before_close(orig_func):
59         @functools.wraps(orig_func)
60         def before_close_wrapper(self, *args, **kwargs):
61             if self.closed:
62                 raise ValueError("I/O operation on closed stream file")
63             return orig_func(self, *args, **kwargs)
64         return before_close_wrapper
65
66     def __enter__(self):
67         return self
68
69     def __exit__(self, exc_type, exc_value, traceback):
70         try:
71             self.close()
72         except Exception:
73             if exc_type is None:
74                 raise
75
76     def close(self):
77         self.closed = True
78
79
80 class ArvadosFileReaderBase(_FileLikeObjectBase):
81     def __init__(self, name, mode, num_retries=None):
82         super(ArvadosFileReaderBase, self).__init__(name, mode)
83         self._filepos = 0
84         self.num_retries = num_retries
85         self._readline_cache = (None, None)
86
87     def __iter__(self):
88         while True:
89             data = self.readline()
90             if not data:
91                 break
92             yield data
93
94     def decompressed_name(self):
95         return re.sub(r'\.(bz2|gz)$', '', self.name)
96
97     @_FileLikeObjectBase._before_close
98     def seek(self, pos, whence=os.SEEK_SET):
99         if whence == os.SEEK_CUR:
100             pos += self._filepos
101         elif whence == os.SEEK_END:
102             pos += self.size()
103         if pos < 0:
104             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
105         self._filepos = pos
106         return self._filepos
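    # Illustrative seek() usage (assumes `f` is an open reader positioned at offset 0):
    #   f.seek(10)                  # absolute offset 10
    #   f.seek(-4, os.SEEK_CUR)     # relative: now at offset 6
    #   f.seek(0, os.SEEK_END)      # jump to end of file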
107
108     def tell(self):
109         return self._filepos
110
111     def readable(self):
112         return True
113
114     def writable(self):
115         return False
116
117     def seekable(self):
118         return True
119
120     @_FileLikeObjectBase._before_close
121     @retry_method
122     def readall(self, size=2**20, num_retries=None):
123         while True:
124             data = self.read(size, num_retries=num_retries)
125             if len(data) == 0:
126                 break
127             yield data
128
129     @_FileLikeObjectBase._before_close
130     @retry_method
131     def readline(self, size=float('inf'), num_retries=None):
132         cache_pos, cache_data = self._readline_cache
133         if self.tell() == cache_pos:
134             data = [cache_data]
135             self._filepos += len(cache_data)
136         else:
137             data = [b'']
138         data_size = len(data[-1])
139         while (data_size < size) and (b'\n' not in data[-1]):
140             next_read = self.read(2 ** 20, num_retries=num_retries)
141             if not next_read:
142                 break
143             data.append(next_read)
144             data_size += len(next_read)
145         data = b''.join(data)
146         try:
147             nextline_index = data.index(b'\n') + 1
148         except ValueError:
149             nextline_index = len(data)
150         nextline_index = min(nextline_index, size)
151         self._filepos -= len(data) - nextline_index
152         self._readline_cache = (self.tell(), data[nextline_index:])
153         return data[:nextline_index].decode()
154
155     @_FileLikeObjectBase._before_close
156     @retry_method
157     def decompress(self, decompress, size, num_retries=None):
158         for segment in self.readall(size, num_retries=num_retries):
159             data = decompress(segment)
160             if data:
161                 yield data
162
163     @_FileLikeObjectBase._before_close
164     @retry_method
165     def readall_decompressed(self, size=2**20, num_retries=None):
166         self.seek(0)
167         if self.name.endswith('.bz2'):
168             dc = bz2.BZ2Decompressor()
169             return self.decompress(dc.decompress, size,
170                                    num_retries=num_retries)
171         elif self.name.endswith('.gz'):
172             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
173             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
174                                    size, num_retries=num_retries)
175         else:
176             return self.readall(size, num_retries=num_retries)
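    # Illustrative usage (assumes `reader` is an ArvadosFileReader for a '.gz'
    # or '.bz2' file; `handle_chunk` is a placeholder callable):
    #   for chunk in reader.readall_decompressed(size=2**20):
    #       handle_chunk(chunk)     # yields decompressed data from ~1 MiB reads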
177
178     @_FileLikeObjectBase._before_close
179     @retry_method
180     def readlines(self, sizehint=float('inf'), num_retries=None):
181         data = []
182         data_size = 0
183         for s in self.readall(num_retries=num_retries):
184             data.append(s)
185             data_size += len(s)
186             if data_size >= sizehint:
187                 break
188         return b''.join(data).decode().splitlines(True)
189
190     def size(self):
191         raise IOError(errno.ENOSYS, "Not implemented")
192
193     def read(self, size, num_retries=None):
194         raise IOError(errno.ENOSYS, "Not implemented")
195
196     def readfrom(self, start, size, num_retries=None):
197         raise IOError(errno.ENOSYS, "Not implemented")
198
199
200 def synchronized(orig_func):
201     @functools.wraps(orig_func)
202     def synchronized_wrapper(self, *args, **kwargs):
203         with self.lock:
204             return orig_func(self, *args, **kwargs)
205     return synchronized_wrapper
206
207
208 class StateChangeError(Exception):
209     def __init__(self, message, state, nextstate):
210         super(StateChangeError, self).__init__(message)
211         self.state = state
212         self.nextstate = nextstate
213
214 class _BufferBlock(object):
215     """A stand-in for a Keep block that is in the process of being written.
216
217     Writers can append to it, get the size, and compute the Keep locator.
218     There are three valid states:
219
220     WRITABLE
221       Can append to block.
222
223     PENDING
224       Block is in the process of being uploaded to Keep; appending to it is an error.
225
226     COMMITTED
227       The block has been written to Keep, its internal buffer has been
228       released, fetching the block will fetch it via keep client (since we
229       discarded the internal copy), and identifiers referring to the BufferBlock
230       can be replaced with the block locator.
231
232     """
233
234     WRITABLE = 0
235     PENDING = 1
236     COMMITTED = 2
237     ERROR = 3
238     DELETED = 4
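
    # Lifecycle sketch (see STATE_TRANSITIONS below): a block starts WRITABLE,
    # becomes PENDING when queued for upload, then COMMITTED on success or
    # ERROR on failure; ERROR blocks may be re-queued (ERROR -> PENDING).
    # clear() sets DELETED directly, outside the checked transitions.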
239
240     def __init__(self, blockid, starting_capacity, owner):
241         """
242         :blockid:
243           the identifier for this block
244
245         :starting_capacity:
246           the initial buffer capacity
247
248         :owner:
249           ArvadosFile that owns this block
250
251         """
252         self.blockid = blockid
253         self.buffer_block = bytearray(starting_capacity)
254         self.buffer_view = memoryview(self.buffer_block)
255         self.write_pointer = 0
256         self._state = _BufferBlock.WRITABLE
257         self._locator = None
258         self.owner = owner
259         self.lock = threading.Lock()
260         self.wait_for_commit = threading.Event()
261         self.error = None
262
263     @synchronized
264     def append(self, data):
265         """Append some data to the buffer.
266
267         Only valid if the block is in WRITABLE state.  Implements an expanding
268         buffer, doubling capacity as needed to accommodate all the data.
269
270         """
271         if self._state == _BufferBlock.WRITABLE:
272             if not isinstance(data, bytes) and not isinstance(data, memoryview):
273                 data = data.encode()
274             while (self.write_pointer+len(data)) > len(self.buffer_block):
275                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
276                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
277                 self.buffer_block = new_buffer_block
278                 self.buffer_view = memoryview(self.buffer_block)
279             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
280             self.write_pointer += len(data)
281             self._locator = None
282         else:
283             raise AssertionError("Buffer block is not writable")
284
285     STATE_TRANSITIONS = frozenset([
286             (WRITABLE, PENDING),
287             (PENDING, COMMITTED),
288             (PENDING, ERROR),
289             (ERROR, PENDING)])
290
291     @synchronized
292     def set_state(self, nextstate, val=None):
293         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
294             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
295         self._state = nextstate
296
297         if self._state == _BufferBlock.PENDING:
298             self.wait_for_commit.clear()
299
300         if self._state == _BufferBlock.COMMITTED:
301             self._locator = val
302             self.buffer_view = None
303             self.buffer_block = None
304             self.wait_for_commit.set()
305
306         if self._state == _BufferBlock.ERROR:
307             self.error = val
308             self.wait_for_commit.set()
309
310     @synchronized
311     def state(self):
312         return self._state
313
314     def size(self):
315         """The amount of data written to the buffer."""
316         return self.write_pointer
317
318     @synchronized
319     def locator(self):
320         """The Keep locator for this buffer's contents."""
321         if self._locator is None:
322             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
323         return self._locator
324
325     @synchronized
326     def clone(self, new_blockid, owner):
327         if self._state == _BufferBlock.COMMITTED:
328             raise AssertionError("Cannot duplicate committed buffer block")
329         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
330         bufferblock.append(self.buffer_view[0:self.size()])
331         return bufferblock
332
333     @synchronized
334     def clear(self):
335         self._state = _BufferBlock.DELETED
336         self.owner = None
337         self.buffer_block = None
338         self.buffer_view = None
339
340     @synchronized
341     def repack_writes(self):
342         """Optimize buffer block by repacking segments in file sequence.
343
344         When the client makes random writes, they appear in the buffer block in
345         the sequence they were written rather than the sequence they appear in
346         the file.  This makes for inefficient, fragmented manifests.  Attempt
347         to optimize by repacking writes in file sequence.
348
349         """
350         if self._state != _BufferBlock.WRITABLE:
351             raise AssertionError("Cannot repack non-writable block")
352
353         segs = self.owner.segments()
354
355         # Collect the segments that reference the buffer block.
356         bufferblock_segs = [s for s in segs if s.locator == self.blockid]
357
358         # Collect total data referenced by segments (could be smaller than
359         # bufferblock size if a portion of the file was written and
360         # then overwritten).
361         write_total = sum([s.range_size for s in bufferblock_segs])
362
363         if write_total < self.size() or len(bufferblock_segs) > 1:
364             # If there's more than one segment referencing this block, it is
365             # due to out-of-order writes and will produce a fragmented
366             # manifest, so try to optimize by re-packing into a new buffer.
367             contents = self.buffer_view[0:self.write_pointer].tobytes()
368             new_bb = _BufferBlock(None, write_total, None)
369             for t in bufferblock_segs:
370                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
371                 t.segment_offset = new_bb.size() - t.range_size
372
373             self.buffer_block = new_bb.buffer_block
374             self.buffer_view = new_bb.buffer_view
375             self.write_pointer = new_bb.write_pointer
376             self._locator = None
377             new_bb.clear()
378             self.owner.set_segments(segs)
379
380     def __repr__(self):
381         return "<BufferBlock %s>" % (self.blockid)
382
383
384 class NoopLock(object):
385     def __enter__(self):
386         return self
387
388     def __exit__(self, exc_type, exc_value, traceback):
389         pass
390
391     def acquire(self, blocking=False):
392         pass
393
394     def release(self):
395         pass
396
397
398 def must_be_writable(orig_func):
399     @functools.wraps(orig_func)
400     def must_be_writable_wrapper(self, *args, **kwargs):
401         if not self.writable():
402             raise IOError(errno.EROFS, "Collection is read-only.")
403         return orig_func(self, *args, **kwargs)
404     return must_be_writable_wrapper
405
406
407 class _BlockManager(object):
408     """BlockManager handles buffer blocks.
409
410     Also handles background block uploads, and background block prefetch for a
411     Collection of ArvadosFiles.
412
413     """
414
415     DEFAULT_PUT_THREADS = 2
416
417     def __init__(self, keep,
418                  copies=None,
419                  put_threads=None,
420                  num_retries=None,
421                  storage_classes_func=None):
422         """keep: KeepClient object to use"""
423         self._keep = keep
424         self._bufferblocks = collections.OrderedDict()
425         self._put_queue = None
426         self._put_threads = None
427         self.lock = threading.Lock()
428         self.prefetch_lookahead = self._keep.num_prefetch_threads
429         self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
430         self.copies = copies
431         self.storage_classes = storage_classes_func or (lambda: [])
432         self._pending_write_size = 0
433         self.threads_lock = threading.Lock()
434         self.padding_block = None
435         self.num_retries = num_retries
436
437     @synchronized
438     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
439         """Allocate a new, empty bufferblock in WRITABLE state and return it.
440
441         :blockid:
442           optional block identifier, otherwise one will be automatically assigned
443
444         :starting_capacity:
445           optional capacity, otherwise will use default capacity
446
447         :owner:
448           ArvadosFile that owns this block
449
450         """
451         return self._alloc_bufferblock(blockid, starting_capacity, owner)
452
453     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
454         if blockid is None:
455             blockid = str(uuid.uuid4())
456         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
457         self._bufferblocks[bufferblock.blockid] = bufferblock
458         return bufferblock
459
460     @synchronized
461     def dup_block(self, block, owner):
462         """Create a new bufferblock initialized with the content of an existing bufferblock.
463
464         :block:
465           the buffer block to copy.
466
467         :owner:
468           ArvadosFile that owns the new block
469
470         """
471         new_blockid = str(uuid.uuid4())
472         bufferblock = block.clone(new_blockid, owner)
473         self._bufferblocks[bufferblock.blockid] = bufferblock
474         return bufferblock
475
476     @synchronized
477     def is_bufferblock(self, locator):
478         return locator in self._bufferblocks
479
480     def _commit_bufferblock_worker(self):
481         """Background uploader thread."""
482
483         while True:
484             try:
485                 bufferblock = self._put_queue.get()
486                 if bufferblock is None:
487                     return
488
489                 if self.copies is None:
490                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
491                 else:
492                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
493                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
494             except Exception as e:
495                 bufferblock.set_state(_BufferBlock.ERROR, e)
496             finally:
497                 if self._put_queue is not None:
498                     self._put_queue.task_done()
499
500     def start_put_threads(self):
501         with self.threads_lock:
502             if self._put_threads is None:
503                 # Start uploader threads.
504
505                 # If we don't limit the Queue size, the upload queue can quickly
506                 # grow to take up gigabytes of RAM if the writing process is
507                 # generating data more quickly than it can be sent to the Keep
508                 # servers.
509                 #
510                 # With two upload threads and a queue size of 2, this means up to 4
511                 # blocks pending.  If they are full 64 MiB blocks, that means up to
512                 # 256 MiB of internal buffering, which is the same size as the
513                 # default download block cache in KeepClient.
514                 self._put_queue = queue.Queue(maxsize=2)
515
516                 self._put_threads = []
517                 for i in range(0, self.num_put_threads):
518                     thread = threading.Thread(target=self._commit_bufferblock_worker)
519                     self._put_threads.append(thread)
520                     thread.daemon = True
521                     thread.start()
522
523     @synchronized
524     def stop_threads(self):
525         """Shut down and wait for background upload and download threads to finish."""
526
527         if self._put_threads is not None:
528             for t in self._put_threads:
529                 self._put_queue.put(None)
530             for t in self._put_threads:
531                 t.join()
532         self._put_threads = None
533         self._put_queue = None
534
535     def __enter__(self):
536         return self
537
538     def __exit__(self, exc_type, exc_value, traceback):
539         self.stop_threads()
540
541     @synchronized
542     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
543         """Packs small blocks together before uploading"""
544
545         self._pending_write_size += closed_file_size
546
547         # Check if there are enough small blocks to fill up one full block
548         if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
549             return
550
551         # Search for blocks ready to be packed together before being
552         # committed to Keep.
553         # A WRITABLE block always has an owner.
554         # A WRITABLE block whose owner is closed() implies that its
555         # size is <= KEEP_BLOCK_SIZE/2.
556         try:
557             small_blocks = [b for b in self._bufferblocks.values()
558                             if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
559         except AttributeError:
560             # Writable blocks without an owner shouldn't exist.
561             raise UnownedBlockError()
562
563         if len(small_blocks) <= 1:
564             # Not enough small blocks for repacking
565             return
566
567         for bb in small_blocks:
568             bb.repack_writes()
569
570         # Update the pending write size count with its true value, just in case
571         # some small file was opened, written and closed several times.
572         self._pending_write_size = sum([b.size() for b in small_blocks])
573
574         if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
575             return
576
577         new_bb = self._alloc_bufferblock()
578         new_bb.owner = []
579         files = []
580         while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
581             bb = small_blocks.pop(0)
582             new_bb.owner.append(bb.owner)
583             self._pending_write_size -= bb.size()
584             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
585             files.append((bb, new_bb.write_pointer - bb.size()))
586
587         self.commit_bufferblock(new_bb, sync=sync)
588
589         for bb, new_bb_segment_offset in files:
590             newsegs = bb.owner.segments()
591             for s in newsegs:
592                 if s.locator == bb.blockid:
593                     s.locator = new_bb.blockid
594                     s.segment_offset = new_bb_segment_offset+s.segment_offset
595             bb.owner.set_segments(newsegs)
596             self._delete_bufferblock(bb.blockid)
597
598     def commit_bufferblock(self, block, sync):
599         """Initiate a background upload of a bufferblock.
600
601         :block:
602           The block object to upload
603
604         :sync:
605           If `sync` is True, upload the block synchronously.
606           If `sync` is False, upload the block asynchronously.  This will
607           return immediately unless the upload queue is at capacity, in
608           which case it will wait on an upload queue slot.
609
610         """
611         try:
612             # Mark the block as PENDING to disallow any more appends.
613             block.set_state(_BufferBlock.PENDING)
614         except StateChangeError as e:
615             if e.state == _BufferBlock.PENDING:
616                 if sync:
617                     block.wait_for_commit.wait()
618                 else:
619                     return
620             if block.state() == _BufferBlock.COMMITTED:
621                 return
622             elif block.state() == _BufferBlock.ERROR:
623                 raise block.error
624             else:
625                 raise
626
627         if sync:
628             try:
629                 if self.copies is None:
630                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
631                 else:
632                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
633                 block.set_state(_BufferBlock.COMMITTED, loc)
634             except Exception as e:
635                 block.set_state(_BufferBlock.ERROR, e)
636                 raise
637         else:
638             self.start_put_threads()
639             self._put_queue.put(block)
640
641     @synchronized
642     def get_bufferblock(self, locator):
643         return self._bufferblocks.get(locator)
644
645     @synchronized
646     def get_padding_block(self):
647         """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
648         when using truncate() to extend the size of a file.
649
650         For reference (and possible future optimization), the md5sum of the
651         padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
652
653         """
654
655         if self.padding_block is None:
656             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
657             self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
658             self.commit_bufferblock(self.padding_block, False)
659         return self.padding_block
660
661     @synchronized
662     def delete_bufferblock(self, locator):
663         self._delete_bufferblock(locator)
664
665     def _delete_bufferblock(self, locator):
666         if locator in self._bufferblocks:
667             bb = self._bufferblocks[locator]
668             bb.clear()
669             del self._bufferblocks[locator]
670
671     def get_block_contents(self, locator, num_retries, cache_only=False):
672         """Fetch a block.
673
674         First checks to see if the locator is a BufferBlock and returns that; if
675         not, passes the request through to KeepClient.get().
676
677         """
678         with self.lock:
679             if locator in self._bufferblocks:
680                 bufferblock = self._bufferblocks[locator]
681                 if bufferblock.state() != _BufferBlock.COMMITTED:
682                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
683                 else:
684                     locator = bufferblock._locator
685         if cache_only:
686             return self._keep.get_from_cache(locator)
687         else:
688             return self._keep.get(locator, num_retries=num_retries)
689
690     def commit_all(self):
691         """Commit all outstanding buffer blocks.
692
693         This is a synchronous call, and will not return until all buffer blocks
694         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
695
696         """
697         self.repack_small_blocks(force=True, sync=True)
698
699         with self.lock:
700             items = list(self._bufferblocks.items())
701
702         for k,v in items:
703             if v.state() != _BufferBlock.COMMITTED and v.owner:
704                 # Ignore blocks with a list of owners: if they're not in COMMITTED
705                 # state, they're already being committed asynchronously.
706                 if isinstance(v.owner, ArvadosFile):
707                     v.owner.flush(sync=False)
708
709         with self.lock:
710             if self._put_queue is not None:
711                 self._put_queue.join()
712
713                 err = []
714                 for k,v in items:
715                     if v.state() == _BufferBlock.ERROR:
716                         err.append((v.locator(), v.error))
717                 if err:
718                     raise KeepWriteError("Error writing some blocks", err, label="block")
719
720         for k,v in items:
721             # flush again with sync=True to remove committed bufferblocks from
722             # the segments.
723             if v.owner:
724                 if isinstance(v.owner, ArvadosFile):
725                     v.owner.flush(sync=True)
726                 elif isinstance(v.owner, list) and len(v.owner) > 0:
727                     # This bufferblock is referenced by many files as a result
728                     # of repacking small blocks, so don't delete it when flushing
729                     # its owners, just do it after flushing them all.
730                     for owner in v.owner:
731                         owner.flush(sync=True)
732                     self.delete_bufferblock(k)
733
734         self.stop_threads()
735
736     def block_prefetch(self, locator):
737         """Initiate a background download of a block.
738         """
739
740         if not self.prefetch_lookahead:
741             return
742
743         with self.lock:
744             if locator in self._bufferblocks:
745                 return
746
747         self._keep.block_prefetch(locator)
748
749
750 class ArvadosFile(object):
751     """Represent a file in a Collection.
752
753     ArvadosFile manages the underlying representation of a file in Keep as a
754     sequence of segments spanning a set of blocks, and implements random
755     read/write access.
756
757     This object may be accessed from multiple threads.
758
759     """
760
761     __slots__ = ('parent', 'name', '_writers', '_committed',
762                  '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
763
764     def __init__(self, parent, name, stream=[], segments=[]):
765         """
766         ArvadosFile constructor.
767
768         :stream:
769           a list of Range objects representing a block stream
770
771         :segments:
772           a list of Range objects representing segments
773         """
774         self.parent = parent
775         self.name = name
776         self._writers = set()
777         self._committed = False
778         self._segments = []
779         self.lock = parent.root_collection().lock
780         for s in segments:
781             self._add_segment(stream, s.locator, s.range_size)
782         self._current_bblock = None
783         self._read_counter = 0
784
785     def writable(self):
786         return self.parent.writable()
787
788     @synchronized
789     def permission_expired(self, as_of_dt=None):
790         """Returns True if any of the segment's locators is expired"""
791         for r in self._segments:
792             if KeepLocator(r.locator).permission_expired(as_of_dt):
793                 return True
794         return False
795
796     @synchronized
797     def has_remote_blocks(self):
798         """Returns True if any of the segment's locators has a +R signature"""
799
800         for s in self._segments:
801             if '+R' in s.locator:
802                 return True
803         return False
804
805     @synchronized
806     def _copy_remote_blocks(self, remote_blocks={}):
807         """Ask Keep to copy remote blocks and point to their local copies.
808
809         This is called from the parent Collection.
810
811         :remote_blocks:
812             Shared cache of remote to local block mappings. This is used to avoid
813             doing extra work when blocks are shared by more than one file in
814             different subdirectories.
815         """
816
817         for s in self._segments:
818             if '+R' in s.locator:
819                 try:
820                     loc = remote_blocks[s.locator]
821                 except KeyError:
822                     loc = self.parent._my_keep().refresh_signature(s.locator)
823                     remote_blocks[s.locator] = loc
824                 s.locator = loc
825                 self.parent.set_committed(False)
826         return remote_blocks
827
828     @synchronized
829     def segments(self):
830         return copy.copy(self._segments)
831
832     @synchronized
833     def clone(self, new_parent, new_name):
834         """Make a copy of this file."""
835         cp = ArvadosFile(new_parent, new_name)
836         cp.replace_contents(self)
837         return cp
838
839     @must_be_writable
840     @synchronized
841     def replace_contents(self, other):
842         """Replace segments of this file with segments from another `ArvadosFile` object."""
843
844         map_loc = {}
845         self._segments = []
846         for other_segment in other.segments():
847             new_loc = other_segment.locator
848             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
849                 if other_segment.locator not in map_loc:
850                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
851                     if bufferblock.state() != _BufferBlock.WRITABLE:
852                         map_loc[other_segment.locator] = bufferblock.locator()
853                     else:
854                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
855                 new_loc = map_loc[other_segment.locator]
856
857             self._segments.append(streams.Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
858
859         self.set_committed(False)
860
861     def __eq__(self, other):
862         if other is self:
863             return True
864         if not isinstance(other, ArvadosFile):
865             return False
866
867         othersegs = other.segments()
868         with self.lock:
869             if len(self._segments) != len(othersegs):
870                 return False
871             for i in range(0, len(othersegs)):
872                 seg1 = self._segments[i]
873                 seg2 = othersegs[i]
874                 loc1 = seg1.locator
875                 loc2 = seg2.locator
876
877                 if self.parent._my_block_manager().is_bufferblock(loc1):
878                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
879
880                 if other.parent._my_block_manager().is_bufferblock(loc2):
881                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
882
883                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
884                     seg1.range_start != seg2.range_start or
885                     seg1.range_size != seg2.range_size or
886                     seg1.segment_offset != seg2.segment_offset):
887                     return False
888
889         return True
890
891     def __ne__(self, other):
892         return not self.__eq__(other)
893
894     @synchronized
895     def set_segments(self, segs):
896         self._segments = segs
897
898     @synchronized
899     def set_committed(self, value=True):
900         """Set committed flag.
901
902         If value is True, set committed to be True.
903
904         If value is False, set committed to be False for this and all parents.
905         """
906         if value == self._committed:
907             return
908         self._committed = value
909         if self._committed is False and self.parent is not None:
910             self.parent.set_committed(False)
911
912     @synchronized
913     def committed(self):
914         """Get whether this is committed or not."""
915         return self._committed
916
917     @synchronized
918     def add_writer(self, writer):
919         """Add an ArvadosFileWriter reference to the list of writers"""
920         if isinstance(writer, ArvadosFileWriter):
921             self._writers.add(writer)
922
923     @synchronized
924     def remove_writer(self, writer, flush):
925         """
926         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
927         and do some block maintenance tasks.
928         """
929         self._writers.remove(writer)
930
931         if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
932             # File writer closed, not small enough for repacking
933             self.flush()
934         elif self.closed():
935             # All writers closed and size is adequate for repacking
936             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
937
938     def closed(self):
939         """
940         Get whether this is closed or not. When the writers list is empty, the file
941         is considered closed.
942         """
943         return len(self._writers) == 0
944
945     @must_be_writable
946     @synchronized
947     def truncate(self, size):
948         """Shrink or expand the size of the file.
949
950         If `size` is less than the size of the file, the file contents after
951         `size` will be discarded.  If `size` is greater than the current size
952         of the file, it will be filled with zero bytes.
953
954         """
955         if size < self.size():
956             new_segs = []
957             for r in self._segments:
958                 range_end = r.range_start+r.range_size
959                 if r.range_start >= size:
960                     # segment is past the truncate size, all done
961                     break
962                 elif size < range_end:
963                     nr = streams.Range(r.locator, r.range_start, size - r.range_start, 0)
964                     nr.segment_offset = r.segment_offset
965                     new_segs.append(nr)
966                     break
967                 else:
968                     new_segs.append(r)
969
970             self._segments = new_segs
971             self.set_committed(False)
972         elif size > self.size():
973             padding = self.parent._my_block_manager().get_padding_block()
974             diff = size - self.size()
975             while diff > config.KEEP_BLOCK_SIZE:
976                 self._segments.append(streams.Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
977                 diff -= config.KEEP_BLOCK_SIZE
978             if diff > 0:
979                 self._segments.append(streams.Range(padding.blockid, self.size(), diff, 0))
980             self.set_committed(False)
981         else:
982             # size == self.size()
983             pass
984
985     def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False):
986         """Read up to `size` bytes from the file starting at `offset`.
987
988         Arguments:
989
990         * exact: bool --- If False (default), return less data than
991          requested if the read crosses a block boundary and the next
992          block isn't cached.  If True, only return less data than
993          requested when hitting EOF.
994
995         * return_memoryview: bool --- If False (default) return a
996           `bytes` object, which may entail making a copy in some
997           situations.  If True, return a `memoryview` object which may
998           avoid making a copy, but may be incompatible with code
999           expecting a `bytes` object.
1000
1001         """
1002
1003         with self.lock:
1004             if size == 0 or offset >= self.size():
1005                 return memoryview(b'') if return_memoryview else b''
1006             readsegs = streams.locators_and_ranges(self._segments, offset, size)
1007
1008             prefetch = None
1009             prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1010             if prefetch_lookahead:
1011                 # Doing prefetch on every read() call is surprisingly expensive
1012                 # when we're trying to deliver data at 600+ MiBps and want
1013                 # the read() fast path to be as lightweight as possible.
1014                 #
1015                 # Only prefetching every 128 read operations
1016                 # dramatically reduces the overhead while still
1017                 # getting the benefit of prefetching (e.g. when
1018                 # reading 128 KiB at a time, it checks for prefetch
1019                 # every 16 MiB).
1020                 self._read_counter = (self._read_counter+1) % 128
1021                 if self._read_counter == 1:
1022                     prefetch = streams.locators_and_ranges(
1023                         self._segments,
1024                         offset + size,
1025                         config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1026                         limit=(1+prefetch_lookahead),
1027                     )
1028
1029         locs = set()
1030         data = []
1031         for lr in readsegs:
1032             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1033             if block:
1034                 blockview = memoryview(block)
1035                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1036                 locs.add(lr.locator)
1037             else:
1038                 break
1039
1040         if prefetch:
1041             for lr in prefetch:
1042                 if lr.locator not in locs:
1043                     self.parent._my_block_manager().block_prefetch(lr.locator)
1044                     locs.add(lr.locator)
1045
1046         if len(data) == 1:
1047             return data[0] if return_memoryview else data[0].tobytes()
1048         else:
1049             return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1050
1051
1052     @must_be_writable
1053     @synchronized
1054     def writeto(self, offset, data, num_retries):
1055         """Write `data` to the file starting at `offset`.
1056
1057         This will update existing bytes and/or extend the size of the file as
1058         necessary.
1059
1060         """
1061         if not isinstance(data, bytes) and not isinstance(data, memoryview):
1062             data = data.encode()
1063         if len(data) == 0:
1064             return
1065
1066         if offset > self.size():
1067             self.truncate(offset)
1068
1069         if len(data) > config.KEEP_BLOCK_SIZE:
1070             # Chunk it up into smaller writes
1071             n = 0
1072             dataview = memoryview(data)
1073             while n < len(data):
1074                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1075                 n += config.KEEP_BLOCK_SIZE
1076             return
1077
1078         self.set_committed(False)
1079
1080         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1081             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1082
1083         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1084             self._current_bblock.repack_writes()
1085             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1086                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1087                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1088
1089         self._current_bblock.append(data)
1090         streams.replace_range(
1091             self._segments,
1092             offset,
1093             len(data),
1094             self._current_bblock.blockid,
1095             self._current_bblock.write_pointer - len(data),
1096         )
1097         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1098         return len(data)
1099
1100     @synchronized
1101     def flush(self, sync=True, num_retries=0):
1102         """Flush the current bufferblock to Keep.
1103
1104         :sync:
1105           If True, commit block synchronously, wait until buffer block has been written.
1106           If False, commit block asynchronously, return immediately after putting block into
1107           the keep put queue.
1108         """
1109         if self.committed():
1110             return
1111
1112         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1113             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1114                 self._current_bblock.repack_writes()
1115             if self._current_bblock.state() != _BufferBlock.DELETED:
1116                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1117
1118         if sync:
1119             to_delete = set()
1120             for s in self._segments:
1121                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1122                 if bb:
1123                     if bb.state() != _BufferBlock.COMMITTED:
1124                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1125                     to_delete.add(s.locator)
1126                     s.locator = bb.locator()
1127             for s in to_delete:
1128                 # Don't delete the bufferblock if it's owned by many files. It'll be
1129                 # deleted after all of its owners are flush()ed.
1130                 if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1131                     self.parent._my_block_manager().delete_bufferblock(s)
1132
1133         self.parent.notify(MOD, self.parent, self.name, (self, self))
1134
1135     @must_be_writable
1136     @synchronized
1137     def add_segment(self, blocks, pos, size):
1138         """Add a segment to the end of the file.
1139
1140         `pos` and `size` reference a section of the stream described by
1141         `blocks` (a list of Range objects)
1142
1143         """
1144         self._add_segment(blocks, pos, size)
1145
1146     def _add_segment(self, blocks, pos, size):
1147         """Internal implementation of add_segment."""
1148         self.set_committed(False)
1149         for lr in streams.locators_and_ranges(blocks, pos, size):
1150             last = self._segments[-1] if self._segments else streams.Range(0, 0, 0, 0)
1151             r = streams.Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1152             self._segments.append(r)
1153
1154     @synchronized
1155     def size(self):
1156         """Get the file size."""
1157         if self._segments:
1158             n = self._segments[-1]
1159             return n.range_start + n.range_size
1160         else:
1161             return 0
1162
1163     @synchronized
1164     def manifest_text(self, stream_name=".", portable_locators=False,
1165                       normalize=False, only_committed=False):
1166         buf = ""
1167         filestream = []
1168         for segment in self._segments:
1169             loc = segment.locator
1170             if self.parent._my_block_manager().is_bufferblock(loc):
1171                 if only_committed:
1172                     continue
1173                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1174             if portable_locators:
1175                 loc = KeepLocator(loc).stripped()
1176             filestream.append(streams.LocatorAndRange(
1177                 loc,
1178                 KeepLocator(loc).size,
1179                 segment.segment_offset,
1180                 segment.range_size,
1181             ))
1182         buf += ' '.join(streams.normalize_stream(stream_name, {self.name: filestream}))
1183         buf += "\n"
1184         return buf
1185
1186     @must_be_writable
1187     @synchronized
1188     def _reparent(self, newparent, newname):
1189         self.set_committed(False)
1190         self.flush(sync=True)
1191         self.parent.remove(self.name)
1192         self.parent = newparent
1193         self.name = newname
1194         self.lock = self.parent.root_collection().lock
1195
1196
1197 class ArvadosFileReader(ArvadosFileReaderBase):
1198     """Wraps ArvadosFile in a file-like object supporting reading only.
1199
1200     Be aware that this class is NOT thread safe as there is no locking around
1201     updating the file pointer.
1202
1203     """
1204
1205     def __init__(self, arvadosfile, mode="r", num_retries=None):
1206         super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1207         self.arvadosfile = arvadosfile
1208
1209     def size(self):
1210         return self.arvadosfile.size()
1211
1212     def stream_name(self):
1213         return self.arvadosfile.parent.stream_name()
1214
1215     def readinto(self, b):
1216         data = self.read(len(b))
1217         b[:len(data)] = data
1218         return len(data)
1219
1220     @_FileLikeObjectBase._before_close
1221     @retry_method
1222     def read(self, size=-1, num_retries=None, return_memoryview=False):
1223         """Read up to `size` bytes from the file and return the result.
1224
1225         Starts at the current file position.  If `size` is negative or None,
1226         read the entire remainder of the file.
1227
1228         Returns an empty result if the file pointer is at the end of the file.
1229
1230         Returns a `bytes` object, unless `return_memoryview` is True,
1231         in which case it returns a memory view, which may avoid an
1232         unnecessary data copy in some situations.
1233
1234         """
1235         if size is None or size < 0:
1236             data = []
1237             #
1238             # specify exact=False, return_memoryview=True here so that we
1239             # only copy data once into the final buffer.
1240             #
1241             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1242             while rd:
1243                 data.append(rd)
1244                 self._filepos += len(rd)
1245                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1246             return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1247         else:
1248             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview)
1249             self._filepos += len(data)
1250             return data
1251
1252     @_FileLikeObjectBase._before_close
1253     @retry_method
1254     def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
1255         """Read up to `size` bytes from the stream, starting at the specified file offset.
1256
1257         This method does not change the file position.
1258
1259         Returns a `bytes` object, unless `return_memoryview` is True,
1260         in which case it returns a memory view, which may avoid an
1261         unnecessary data copy in some situations.
1262
1263         """
1264         return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview)
1265
1266     def flush(self):
1267         pass
1268
1269
1270 class ArvadosFileWriter(ArvadosFileReader):
1271     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1272
1273     Be aware that this class is NOT thread safe as there is no locking around
1274     updating the file pointer.
1275
1276     """
1277
1278     def __init__(self, arvadosfile, mode, num_retries=None):
1279         super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1280         self.arvadosfile.add_writer(self)
1281
1282     def writable(self):
1283         return True
1284
1285     @_FileLikeObjectBase._before_close
1286     @retry_method
1287     def write(self, data, num_retries=None):
1288         if self.mode[0] == "a":
1289             self._filepos = self.size()
1290         self.arvadosfile.writeto(self._filepos, data, num_retries)
1291         self._filepos += len(data)
1292         return len(data)
1293
1294     @_FileLikeObjectBase._before_close
1295     @retry_method
1296     def writelines(self, seq, num_retries=None):
1297         for s in seq:
1298             self.write(s, num_retries=num_retries)
1299
1300     @_FileLikeObjectBase._before_close
1301     def truncate(self, size=None):
1302         if size is None:
1303             size = self._filepos
1304         self.arvadosfile.truncate(size)
1305
1306     @_FileLikeObjectBase._before_close
1307     def flush(self):
1308         self.arvadosfile.flush()
1309
1310     def close(self, flush=True):
1311         if not self.closed:
1312             self.arvadosfile.remove_writer(self, flush)
1313             super(ArvadosFileWriter, self).close()
1314
1315
1316 class WrappableFile(object):
1317     """An interface to an Arvados file that's compatible with io wrappers.
1318
1319     """
1320     def __init__(self, f):
1321         self.f = f
1322         self.closed = False
1323     def close(self):
1324         self.closed = True
1325         return self.f.close()
1326     def flush(self):
1327         return self.f.flush()
1328     def read(self, *args, **kwargs):
1329         return self.f.read(*args, **kwargs)
1330     def readable(self):
1331         return self.f.readable()
1332     def readinto(self, *args, **kwargs):
1333         return self.f.readinto(*args, **kwargs)
1334     def seek(self, *args, **kwargs):
1335         return self.f.seek(*args, **kwargs)
1336     def seekable(self):
1337         return self.f.seekable()
1338     def tell(self):
1339         return self.f.tell()
1340     def writable(self):
1341         return self.f.writable()
1342     def write(self, *args, **kwargs):
1343         return self.f.write(*args, **kwargs)
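
# Illustrative sketch: WrappableFile exposes the methods io's wrappers expect,
# so a binary reader can be adapted for text-mode access (assumes
# `binary_reader` is an ArvadosFileReader opened in binary mode):
#   import io
#   text_reader = io.TextIOWrapper(WrappableFile(binary_reader), encoding='utf-8')
#   line = text_reader.readline()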