sdk/python/arvados/arvfile.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 from future.utils import listitems, listvalues
9 standard_library.install_aliases()
10 from builtins import range
11 from builtins import object
12 import bz2
13 import collections
14 import copy
15 import errno
16 import functools
17 import hashlib
18 import logging
19 import os
20 import queue
21 import re
22 import sys
23 import threading
24 import uuid
25 import zlib
26
27 from . import config
28 from .errors import KeepWriteError, AssertionError, ArgumentError
29 from .keep import KeepLocator
30 from ._normalize_stream import normalize_stream
31 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
32 from .retry import retry_method
33
34 MOD = "mod"
35 WRITE = "write"
36
37 _logger = logging.getLogger('arvados.arvfile')
38
39 def split(path):
40     """split(path) -> streamname, filename
41
42     Separate the stream name and file name in a /-separated stream path and
43     return a tuple (stream_name, file_name).  If no stream name is available,
44     assume '.'.
45
46     """
47     try:
48         stream_name, file_name = path.rsplit('/', 1)
49     except ValueError:  # No / in string
50         stream_name, file_name = '.', path
51     return stream_name, file_name
52
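# A quick illustration of split() behavior (values are illustrative only):
#
#   split("output/logs/stderr.txt")  # -> ("output/logs", "stderr.txt")
#   split("stderr.txt")              # -> (".", "stderr.txt")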
53
54 class UnownedBlockError(Exception):
55     """Raised when there's a writable block without an owner on the BlockManager."""
56     pass
57
58
59 class _FileLikeObjectBase(object):
60     def __init__(self, name, mode):
61         self.name = name
62         self.mode = mode
63         self.closed = False
64
65     @staticmethod
66     def _before_close(orig_func):
67         @functools.wraps(orig_func)
68         def before_close_wrapper(self, *args, **kwargs):
69             if self.closed:
70                 raise ValueError("I/O operation on closed stream file")
71             return orig_func(self, *args, **kwargs)
72         return before_close_wrapper
73
74     def __enter__(self):
75         return self
76
77     def __exit__(self, exc_type, exc_value, traceback):
78         try:
79             self.close()
80         except Exception:
81             if exc_type is None:
82                 raise
83
84     def close(self):
85         self.closed = True
86
87
88 class ArvadosFileReaderBase(_FileLikeObjectBase):
89     def __init__(self, name, mode, num_retries=None):
90         super(ArvadosFileReaderBase, self).__init__(name, mode)
91         self._filepos = 0
92         self.num_retries = num_retries
93         self._readline_cache = (None, None)
94
95     def __iter__(self):
96         while True:
97             data = self.readline()
98             if not data:
99                 break
100             yield data
101
102     def decompressed_name(self):
103         return re.sub(r'\.(bz2|gz)$', '', self.name)
104
105     @_FileLikeObjectBase._before_close
106     def seek(self, pos, whence=os.SEEK_SET):
107         if whence == os.SEEK_CUR:
108             pos += self._filepos
109         elif whence == os.SEEK_END:
110             pos += self.size()
111         if pos < 0:
112             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
113         self._filepos = pos
114         return self._filepos
115
116     def tell(self):
117         return self._filepos
118
119     def readable(self):
120         return True
121
122     def writable(self):
123         return False
124
125     def seekable(self):
126         return True
127
128     @_FileLikeObjectBase._before_close
129     @retry_method
130     def readall(self, size=2**20, num_retries=None):
131         while True:
132             data = self.read(size, num_retries=num_retries)
133             if len(data) == 0:
134                 break
135             yield data
136
137     @_FileLikeObjectBase._before_close
138     @retry_method
139     def readline(self, size=float('inf'), num_retries=None):
140         cache_pos, cache_data = self._readline_cache
141         if self.tell() == cache_pos:
142             data = [cache_data]
143             self._filepos += len(cache_data)
144         else:
145             data = [b'']
146         data_size = len(data[-1])
147         while (data_size < size) and (b'\n' not in data[-1]):
148             next_read = self.read(2 ** 20, num_retries=num_retries)
149             if not next_read:
150                 break
151             data.append(next_read)
152             data_size += len(next_read)
153         data = b''.join(data)
154         try:
155             nextline_index = data.index(b'\n') + 1
156         except ValueError:
157             nextline_index = len(data)
158         nextline_index = min(nextline_index, size)
159         self._filepos -= len(data) - nextline_index
160         self._readline_cache = (self.tell(), data[nextline_index:])
161         return data[:nextline_index].decode()
162
163     @_FileLikeObjectBase._before_close
164     @retry_method
165     def decompress(self, decompress, size, num_retries=None):
166         for segment in self.readall(size, num_retries=num_retries):
167             data = decompress(segment)
168             if data:
169                 yield data
170
171     @_FileLikeObjectBase._before_close
172     @retry_method
173     def readall_decompressed(self, size=2**20, num_retries=None):
174         self.seek(0)
175         if self.name.endswith('.bz2'):
176             dc = bz2.BZ2Decompressor()
177             return self.decompress(dc.decompress, size,
178                                    num_retries=num_retries)
179         elif self.name.endswith('.gz'):
180             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
181             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
182                                    size, num_retries=num_retries)
183         else:
184             return self.readall(size, num_retries=num_retries)
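    # Hedged usage sketch: iterating readall_decompressed() on a reader whose
    # name ends in ".gz" or ".bz2" yields decompressed chunks, otherwise raw
    # chunks.  `reader` below is a hypothetical ArvadosFileReaderBase instance:
    #
    #   for chunk in reader.readall_decompressed(num_retries=3):
    #       sys.stdout.buffer.write(chunk)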
185
186     @_FileLikeObjectBase._before_close
187     @retry_method
188     def readlines(self, sizehint=float('inf'), num_retries=None):
189         data = []
190         data_size = 0
191         for s in self.readall(num_retries=num_retries):
192             data.append(s)
193             data_size += len(s)
194             if data_size >= sizehint:
195                 break
196         return b''.join(data).decode().splitlines(True)
197
198     def size(self):
199         raise IOError(errno.ENOSYS, "Not implemented")
200
201     def read(self, size, num_retries=None):
202         raise IOError(errno.ENOSYS, "Not implemented")
203
204     def readfrom(self, start, size, num_retries=None):
205         raise IOError(errno.ENOSYS, "Not implemented")
206
207
208 class StreamFileReader(ArvadosFileReaderBase):
209     class _NameAttribute(str):
210         # The Python file API provides a plain .name attribute.
211         # Older SDK provided a name() method.
212         # This class provides both, for maximum compatibility.
213         def __call__(self):
214             return self
215
216     def __init__(self, stream, segments, name):
217         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
218         self._stream = stream
219         self.segments = segments
220
221     def stream_name(self):
222         return self._stream.name()
223
224     def size(self):
225         n = self.segments[-1]
226         return n.range_start + n.range_size
227
228     @_FileLikeObjectBase._before_close
229     @retry_method
230     def read(self, size, num_retries=None):
231         """Read up to 'size' bytes from the stream, starting at the current file position"""
232         if size == 0:
233             return b''
234
235         data = b''
236         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
237         if available_chunks:
238             lr = available_chunks[0]
239             data = self._stream.readfrom(lr.locator+lr.segment_offset,
240                                          lr.segment_size,
241                                          num_retries=num_retries)
242
243         self._filepos += len(data)
244         return data
245
246     @_FileLikeObjectBase._before_close
247     @retry_method
248     def readfrom(self, start, size, num_retries=None):
249         """Read up to 'size' bytes from the stream, starting at 'start'"""
250         if size == 0:
251             return b''
252
253         data = []
254         for lr in locators_and_ranges(self.segments, start, size):
255             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
256                                               num_retries=num_retries))
257         return b''.join(data)
258
259     def as_manifest(self):
260         segs = []
261         for r in self.segments:
262             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
263         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
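    # For reference, as_manifest() produces a single normalized manifest line
    # for this file under the "." stream, e.g. (illustrative values only):
    #
    #   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt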
264
265
266 def synchronized(orig_func):
267     @functools.wraps(orig_func)
268     def synchronized_wrapper(self, *args, **kwargs):
269         with self.lock:
270             return orig_func(self, *args, **kwargs)
271     return synchronized_wrapper
272
273
274 class StateChangeError(Exception):
275     def __init__(self, message, state, nextstate):
276         super(StateChangeError, self).__init__(message)
277         self.state = state
278         self.nextstate = nextstate
279
280 class _BufferBlock(object):
281     """A stand-in for a Keep block that is in the process of being written.
282
283     Writers can append to it, get the size, and compute the Keep locator.
284     There are three valid states:
285     The three primary states are:
286     WRITABLE
287       Can append to block.
288
289     PENDING
290       Block is in the process of being uploaded to Keep, append is an error.
291
292     COMMITTED
293       The block has been written to Keep, its internal buffer has been
294       released, fetching the block will fetch it via keep client (since we
295       discarded the internal copy), and identifiers referring to the BufferBlock
296       can be replaced with the block locator.
297
298     """
299
300     WRITABLE = 0
301     PENDING = 1
302     COMMITTED = 2
303     ERROR = 3
304     DELETED = 4
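    # Typical lifecycle (enforced by STATE_TRANSITIONS below):
    # WRITABLE -> PENDING -> COMMITTED.  A failed upload moves the block to
    # ERROR, from which it can be re-queued (ERROR -> PENDING).  DELETED is
    # set by clear() once the buffer has been released.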
305
306     def __init__(self, blockid, starting_capacity, owner):
307         """
308         :blockid:
309           the identifier for this block
310
311         :starting_capacity:
312           the initial buffer capacity
313
314         :owner:
315           ArvadosFile that owns this block
316
317         """
318         self.blockid = blockid
319         self.buffer_block = bytearray(starting_capacity)
320         self.buffer_view = memoryview(self.buffer_block)
321         self.write_pointer = 0
322         self._state = _BufferBlock.WRITABLE
323         self._locator = None
324         self.owner = owner
325         self.lock = threading.Lock()
326         self.wait_for_commit = threading.Event()
327         self.error = None
328
329     @synchronized
330     def append(self, data):
331         """Append some data to the buffer.
332
333         Only valid if the block is in WRITABLE state.  Implements an expanding
334         buffer, doubling capacity as needed to accommodate all the data.
335
336         """
337         if self._state == _BufferBlock.WRITABLE:
338             if not isinstance(data, bytes) and not isinstance(data, memoryview):
339                 data = data.encode()
340             while (self.write_pointer+len(data)) > len(self.buffer_block):
341                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
342                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
343                 self.buffer_block = new_buffer_block
344                 self.buffer_view = memoryview(self.buffer_block)
345             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
346             self.write_pointer += len(data)
347             self._locator = None
348         else:
349             raise AssertionError("Buffer block is not writable")
350
351     STATE_TRANSITIONS = frozenset([
352             (WRITABLE, PENDING),
353             (PENDING, COMMITTED),
354             (PENDING, ERROR),
355             (ERROR, PENDING)])
356
357     @synchronized
358     def set_state(self, nextstate, val=None):
359         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
360             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
361         self._state = nextstate
362
363         if self._state == _BufferBlock.PENDING:
364             self.wait_for_commit.clear()
365
366         if self._state == _BufferBlock.COMMITTED:
367             self._locator = val
368             self.buffer_view = None
369             self.buffer_block = None
370             self.wait_for_commit.set()
371
372         if self._state == _BufferBlock.ERROR:
373             self.error = val
374             self.wait_for_commit.set()
375
376     @synchronized
377     def state(self):
378         return self._state
379
380     def size(self):
381         """The amount of data written to the buffer."""
382         return self.write_pointer
383
384     @synchronized
385     def locator(self):
386         """The Keep locator for this buffer's contents."""
387         if self._locator is None:
388             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
389         return self._locator
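    # Example: a buffer holding the three bytes b"foo" gets the locator
    # "acbd18db4cc2f85cedef654fccc4a4d8+3", i.e. the MD5 hex digest of the
    # contents, a "+", then the size in bytes.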
390
391     @synchronized
392     def clone(self, new_blockid, owner):
393         if self._state == _BufferBlock.COMMITTED:
394             raise AssertionError("Cannot duplicate committed buffer block")
395         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
396         bufferblock.append(self.buffer_view[0:self.size()])
397         return bufferblock
398
399     @synchronized
400     def clear(self):
401         self._state = _BufferBlock.DELETED
402         self.owner = None
403         self.buffer_block = None
404         self.buffer_view = None
405
406     @synchronized
407     def repack_writes(self):
408         """Optimize buffer block by repacking segments in file sequence.
409
410         When the client makes random writes, they appear in the buffer block in
411         the sequence they were written rather than the sequence they appear in
412         the file.  This makes for inefficient, fragmented manifests.  Attempt
413         to optimize by repacking writes in file sequence.
414
415         """
416         if self._state != _BufferBlock.WRITABLE:
417             raise AssertionError("Cannot repack non-writable block")
418
419         segs = self.owner.segments()
420
421         # Collect the segments that reference the buffer block.
422         bufferblock_segs = [s for s in segs if s.locator == self.blockid]
423
424         # Collect total data referenced by segments (could be smaller than
425         # bufferblock size if a portion of the file was written and
426         # then overwritten).
427         write_total = sum([s.range_size for s in bufferblock_segs])
428
429         if write_total < self.size() or len(bufferblock_segs) > 1:
430             # If there's more than one segment referencing this block, it is
431             # due to out-of-order writes and will produce a fragmented
432             # manifest, so try to optimize by re-packing into a new buffer.
433             contents = self.buffer_view[0:self.write_pointer].tobytes()
434             new_bb = _BufferBlock(None, write_total, None)
435             for t in bufferblock_segs:
436                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
437                 t.segment_offset = new_bb.size() - t.range_size
438
439             self.buffer_block = new_bb.buffer_block
440             self.buffer_view = new_bb.buffer_view
441             self.write_pointer = new_bb.write_pointer
442             self._locator = None
443             new_bb.clear()
444             self.owner.set_segments(segs)
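        # Illustrative scenario: writing bytes 0-9, then 20-29, then 10-19 of a
        # file stores those ranges in this buffer in write order; repacking
        # copies them back out in file order (dropping any bytes that were
        # later overwritten), so the resulting manifest is less fragmented.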
445
446     def __repr__(self):
447         return "<BufferBlock %s>" % (self.blockid)
448
449
450 class NoopLock(object):
451     def __enter__(self):
452         return self
453
454     def __exit__(self, exc_type, exc_value, traceback):
455         pass
456
457     def acquire(self, blocking=False):
458         pass
459
460     def release(self):
461         pass
462
463
464 def must_be_writable(orig_func):
465     @functools.wraps(orig_func)
466     def must_be_writable_wrapper(self, *args, **kwargs):
467         if not self.writable():
468             raise IOError(errno.EROFS, "Collection is read-only.")
469         return orig_func(self, *args, **kwargs)
470     return must_be_writable_wrapper
471
472
473 class _BlockManager(object):
474     """BlockManager handles buffer blocks.
475
476     Also handles background block uploads, and background block prefetch for a
477     Collection of ArvadosFiles.
478
479     """
480
481     DEFAULT_PUT_THREADS = 2
482     DEFAULT_GET_THREADS = 4
483
484     def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
485         """keep: KeepClient object to use"""
486         self._keep = keep
487         self._bufferblocks = collections.OrderedDict()
488         self._put_queue = None
489         self._put_threads = None
490         self._prefetch_queue = None
491         self._prefetch_threads = None
492         self.lock = threading.Lock()
493         self.prefetch_enabled = True
494         self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
495         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
496         self.copies = copies
497         self.storage_classes = storage_classes_func or (lambda: [])
498         self._pending_write_size = 0
499         self.threads_lock = threading.Lock()
500         self.padding_block = None
501         self.num_retries = num_retries
502
503     @synchronized
504     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
505         """Allocate a new, empty bufferblock in WRITABLE state and return it.
506
507         :blockid:
508           optional block identifier, otherwise one will be automatically assigned
509
510         :starting_capacity:
511           optional capacity, otherwise will use default capacity
512
513         :owner:
514           ArvadosFile that owns this block
515
516         """
517         return self._alloc_bufferblock(blockid, starting_capacity, owner)
518
519     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
520         if blockid is None:
521             blockid = str(uuid.uuid4())
522         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
523         self._bufferblocks[bufferblock.blockid] = bufferblock
524         return bufferblock
525
526     @synchronized
527     def dup_block(self, block, owner):
528         """Create a new bufferblock initialized with the content of an existing bufferblock.
529
530         :block:
531           the buffer block to copy.
532
533         :owner:
534           ArvadosFile that owns the new block
535
536         """
537         new_blockid = str(uuid.uuid4())
538         bufferblock = block.clone(new_blockid, owner)
539         self._bufferblocks[bufferblock.blockid] = bufferblock
540         return bufferblock
541
542     @synchronized
543     def is_bufferblock(self, locator):
544         return locator in self._bufferblocks
545
546     def _commit_bufferblock_worker(self):
547         """Background uploader thread."""
548
549         while True:
550             try:
551                 bufferblock = self._put_queue.get()
552                 if bufferblock is None:
553                     return
554
555                 if self.copies is None:
556                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
557                 else:
558                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
559                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
560             except Exception as e:
561                 bufferblock.set_state(_BufferBlock.ERROR, e)
562             finally:
563                 if self._put_queue is not None:
564                     self._put_queue.task_done()
565
566     def start_put_threads(self):
567         with self.threads_lock:
568             if self._put_threads is None:
569                 # Start uploader threads.
570
571                 # If we don't limit the Queue size, the upload queue can quickly
572                 # grow to take up gigabytes of RAM if the writing process is
573                 # generating data more quickly than it can be sent to the Keep
574                 # servers.
575                 #
576                 # With two upload threads and a queue size of 2, this means up to 4
577                 # blocks pending.  If they are full 64 MiB blocks, that means up to
578                 # 256 MiB of internal buffering, which is the same size as the
579                 # default download block cache in KeepClient.
580                 self._put_queue = queue.Queue(maxsize=2)
581
582                 self._put_threads = []
583                 for i in range(0, self.num_put_threads):
584                     thread = threading.Thread(target=self._commit_bufferblock_worker)
585                     self._put_threads.append(thread)
586                     thread.daemon = True
587                     thread.start()
588
589     def _block_prefetch_worker(self):
590         """The background downloader thread."""
591         while True:
592             try:
593                 b = self._prefetch_queue.get()
594                 if b is None:
595                     return
596                 if self._keep.has_cache_slot(b):
597                     continue
598                 _logger.debug("prefetching %s", b)
599                 self._keep.get(b)
600             except Exception:
601                 _logger.exception("Exception doing block prefetch")
602
603     @synchronized
604     def start_get_threads(self):
605         if self._prefetch_threads is None:
606             self._prefetch_queue = queue.Queue()
607             self._prefetch_threads = []
608             for i in range(0, self.num_get_threads):
609                 thread = threading.Thread(target=self._block_prefetch_worker)
610                 self._prefetch_threads.append(thread)
611                 thread.daemon = True
612                 thread.start()
613
614
615     @synchronized
616     def stop_threads(self):
617         """Shut down and wait for background upload and download threads to finish."""
618
619         if self._put_threads is not None:
620             for t in self._put_threads:
621                 self._put_queue.put(None)
622             for t in self._put_threads:
623                 t.join()
624         self._put_threads = None
625         self._put_queue = None
626
627         if self._prefetch_threads is not None:
628             for t in self._prefetch_threads:
629                 self._prefetch_queue.put(None)
630             for t in self._prefetch_threads:
631                 t.join()
632         self._prefetch_threads = None
633         self._prefetch_queue = None
634
635     def __enter__(self):
636         return self
637
638     def __exit__(self, exc_type, exc_value, traceback):
639         self.stop_threads()
640
641     @synchronized
642     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
643         """Packs small blocks together before uploading"""
644
645         self._pending_write_size += closed_file_size
646
647         # Check whether there are enough small blocks to fill up one full block
648         if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
649             return
650
651         # Search for blocks that are ready to be packed together before
652         # being committed to Keep.
653         # A WRITABLE block always has an owner.
654         # A WRITABLE block whose owner is closed() implies that its
655         # size is <= KEEP_BLOCK_SIZE/2.
656         try:
657             small_blocks = [b for b in listvalues(self._bufferblocks)
658                             if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
659         except AttributeError:
660             # Writable blocks without owner shouldn't exist.
661             raise UnownedBlockError()
662
663         if len(small_blocks) <= 1:
664             # Not enough small blocks for repacking
665             return
666
667         for bb in small_blocks:
668             bb.repack_writes()
669
670         # Update the pending write size count with its true value, just in case
671         # some small file was opened, written and closed several times.
672         self._pending_write_size = sum([b.size() for b in small_blocks])
673
674         if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
675             return
676
677         new_bb = self._alloc_bufferblock()
678         new_bb.owner = []
679         files = []
680         while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
681             bb = small_blocks.pop(0)
682             new_bb.owner.append(bb.owner)
683             self._pending_write_size -= bb.size()
684             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
685             files.append((bb, new_bb.write_pointer - bb.size()))
686
687         self.commit_bufferblock(new_bb, sync=sync)
688
689         for bb, new_bb_segment_offset in files:
690             newsegs = bb.owner.segments()
691             for s in newsegs:
692                 if s.locator == bb.blockid:
693                     s.locator = new_bb.blockid
694                     s.segment_offset = new_bb_segment_offset+s.segment_offset
695             bb.owner.set_segments(newsegs)
696             self._delete_bufferblock(bb.blockid)
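        # Illustrative scenario: closing many small files (each under
        # KEEP_BLOCK_SIZE/2) leaves one WRITABLE bufferblock per file.  Once the
        # combined pending size reaches KEEP_BLOCK_SIZE (or force=True), their
        # contents are copied into one new bufferblock, each file's segments are
        # re-pointed at it, and the original small blocks are deleted.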
697
698     def commit_bufferblock(self, block, sync):
699         """Initiate a background upload of a bufferblock.
700
701         :block:
702           The block object to upload
703
704         :sync:
705           If `sync` is True, upload the block synchronously.
706           If `sync` is False, upload the block asynchronously.  This will
707           return immediately unless the upload queue is at capacity, in
708           which case it will wait on an upload queue slot.
709
710         """
711         try:
712             # Mark the block as PENDING to disallow any further appends.
713             block.set_state(_BufferBlock.PENDING)
714         except StateChangeError as e:
715             if e.state == _BufferBlock.PENDING:
716                 if sync:
717                     block.wait_for_commit.wait()
718                 else:
719                     return
720             if block.state() == _BufferBlock.COMMITTED:
721                 return
722             elif block.state() == _BufferBlock.ERROR:
723                 raise block.error
724             else:
725                 raise
726
727         if sync:
728             try:
729                 if self.copies is None:
730                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
731                 else:
732                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
733                 block.set_state(_BufferBlock.COMMITTED, loc)
734             except Exception as e:
735                 block.set_state(_BufferBlock.ERROR, e)
736                 raise
737         else:
738             self.start_put_threads()
739             self._put_queue.put(block)
740
741     @synchronized
742     def get_bufferblock(self, locator):
743         return self._bufferblocks.get(locator)
744
745     @synchronized
746     def get_padding_block(self):
747         """Get a bufferblock 64 MiB in size consisting of all zeros, used as padding
748         when using truncate() to extend the size of a file.
749
750         For reference (and possible future optimization), the md5sum of the
751         padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
752
753         """
754
755         if self.padding_block is None:
756             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
757             self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
758             self.commit_bufferblock(self.padding_block, False)
759         return self.padding_block
760
761     @synchronized
762     def delete_bufferblock(self, locator):
763         self._delete_bufferblock(locator)
764
765     def _delete_bufferblock(self, locator):
766         if locator in self._bufferblocks:
767             bb = self._bufferblocks[locator]
768             bb.clear()
769             del self._bufferblocks[locator]
770
771     def get_block_contents(self, locator, num_retries, cache_only=False):
772         """Fetch a block.
773
774         First checks to see if the locator is a BufferBlock and returns that; if
775         not, passes the request through to KeepClient.get().
776
777         """
778         with self.lock:
779             if locator in self._bufferblocks:
780                 bufferblock = self._bufferblocks[locator]
781                 if bufferblock.state() != _BufferBlock.COMMITTED:
782                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
783                 else:
784                     locator = bufferblock._locator
785         if cache_only:
786             return self._keep.get_from_cache(locator)
787         else:
788             return self._keep.get(locator, num_retries=num_retries)
789
790     def commit_all(self):
791         """Commit all outstanding buffer blocks.
792
793         This is a synchronous call, and will not return until all buffer blocks
794         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
795
796         """
797         self.repack_small_blocks(force=True, sync=True)
798
799         with self.lock:
800             items = listitems(self._bufferblocks)
801
802         for k,v in items:
803             if v.state() != _BufferBlock.COMMITTED and v.owner:
804                 # Skip blocks with a list of owners: if they're not in COMMITTED
805                 # state, they're already being committed asynchronously.
806                 if isinstance(v.owner, ArvadosFile):
807                     v.owner.flush(sync=False)
808
809         with self.lock:
810             if self._put_queue is not None:
811                 self._put_queue.join()
812
813                 err = []
814                 for k,v in items:
815                     if v.state() == _BufferBlock.ERROR:
816                         err.append((v.locator(), v.error))
817                 if err:
818                     raise KeepWriteError("Error writing some blocks", err, label="block")
819
820         for k,v in items:
821             # flush again with sync=True to remove committed bufferblocks from
822             # the segments.
823             if v.owner:
824                 if isinstance(v.owner, ArvadosFile):
825                     v.owner.flush(sync=True)
826                 elif isinstance(v.owner, list) and len(v.owner) > 0:
827                     # This bufferblock is referenced by many files as a result
828                     # of repacking small blocks, so don't delete it when flushing
829                     # its owners, just do it after flushing them all.
830                     for owner in v.owner:
831                         owner.flush(sync=True)
832                     self.delete_bufferblock(k)
833
834     def block_prefetch(self, locator):
835         """Initiate a background download of a block.
836
837         This assumes that the underlying KeepClient implements a block cache,
838         so repeated requests for the same block will not result in repeated
839         downloads (unless the block is evicted from the cache.)  This method
840         does not block.
841
842         """
843
844         if not self.prefetch_enabled:
845             return
846
847         if self._keep.has_cache_slot(locator):
848             return
849
850         with self.lock:
851             if locator in self._bufferblocks:
852                 return
853
854         self.start_get_threads()
855         # _logger.debug("pushing %s to prefetch", locator)
856         self._prefetch_queue.put(locator)
857
858
859 class ArvadosFile(object):
860     """Represent a file in a Collection.
861
862     ArvadosFile manages the underlying representation of a file in Keep as a
863     sequence of segments spanning a set of blocks, and implements random
864     read/write access.
865
866     This object may be accessed from multiple threads.
867
868     """
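    # Hedged usage sketch: application code does not normally construct
    # ArvadosFile directly; it is created by a Collection and is usually
    # reached through arvados.collection.Collection.open(), which wraps it in
    # an ArvadosFileReader or ArvadosFileWriter.  `coll` below is hypothetical:
    #
    #   with coll.open("subdir/data.bin", "wb") as f:
    #       f.write(b"hello")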
869
870     __slots__ = ('parent', 'name', '_writers', '_committed',
871                  '_segments', 'lock', '_current_bblock', 'fuse_entry')
872
873     def __init__(self, parent, name, stream=[], segments=[]):
874         """
875         ArvadosFile constructor.
876
877         :stream:
878           a list of Range objects representing a block stream
879
880         :segments:
881           a list of Range objects representing segments
882         """
883         self.parent = parent
884         self.name = name
885         self._writers = set()
886         self._committed = False
887         self._segments = []
888         self.lock = parent.root_collection().lock
889         for s in segments:
890             self._add_segment(stream, s.locator, s.range_size)
891         self._current_bblock = None
892
893     def writable(self):
894         return self.parent.writable()
895
896     @synchronized
897     def permission_expired(self, as_of_dt=None):
898         """Returns True if any of the segments' locators is expired"""
899         for r in self._segments:
900             if KeepLocator(r.locator).permission_expired(as_of_dt):
901                 return True
902         return False
903
904     @synchronized
905     def has_remote_blocks(self):
906         """Returns True if any of the segments' locators has a +R signature"""
907
908         for s in self._segments:
909             if '+R' in s.locator:
910                 return True
911         return False
912
913     @synchronized
914     def _copy_remote_blocks(self, remote_blocks={}):
915         """Ask Keep to copy remote blocks and point to their local copies.
916
917         This is called from the parent Collection.
918
919         :remote_blocks:
920             Shared cache of remote to local block mappings. This is used to avoid
921             doing extra work when blocks are shared by more than one file in
922             different subdirectories.
923         """
924
925         for s in self._segments:
926             if '+R' in s.locator:
927                 try:
928                     loc = remote_blocks[s.locator]
929                 except KeyError:
930                     loc = self.parent._my_keep().refresh_signature(s.locator)
931                     remote_blocks[s.locator] = loc
932                 s.locator = loc
933                 self.parent.set_committed(False)
934         return remote_blocks
935
936     @synchronized
937     def segments(self):
938         return copy.copy(self._segments)
939
940     @synchronized
941     def clone(self, new_parent, new_name):
942         """Make a copy of this file."""
943         cp = ArvadosFile(new_parent, new_name)
944         cp.replace_contents(self)
945         return cp
946
947     @must_be_writable
948     @synchronized
949     def replace_contents(self, other):
950         """Replace segments of this file with segments from another `ArvadosFile` object."""
951
952         map_loc = {}
953         self._segments = []
954         for other_segment in other.segments():
955             new_loc = other_segment.locator
956             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
957                 if other_segment.locator not in map_loc:
958                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
959                     if bufferblock.state() != _BufferBlock.WRITABLE:
960                         map_loc[other_segment.locator] = bufferblock.locator()
961                     else:
962                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
963                 new_loc = map_loc[other_segment.locator]
964
965             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
966
967         self.set_committed(False)
968
969     def __eq__(self, other):
970         if other is self:
971             return True
972         if not isinstance(other, ArvadosFile):
973             return False
974
975         othersegs = other.segments()
976         with self.lock:
977             if len(self._segments) != len(othersegs):
978                 return False
979             for i in range(0, len(othersegs)):
980                 seg1 = self._segments[i]
981                 seg2 = othersegs[i]
982                 loc1 = seg1.locator
983                 loc2 = seg2.locator
984
985                 if self.parent._my_block_manager().is_bufferblock(loc1):
986                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
987
988                 if other.parent._my_block_manager().is_bufferblock(loc2):
989                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
990
991                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
992                     seg1.range_start != seg2.range_start or
993                     seg1.range_size != seg2.range_size or
994                     seg1.segment_offset != seg2.segment_offset):
995                     return False
996
997         return True
998
999     def __ne__(self, other):
1000         return not self.__eq__(other)
1001
1002     @synchronized
1003     def set_segments(self, segs):
1004         self._segments = segs
1005
1006     @synchronized
1007     def set_committed(self, value=True):
1008         """Set committed flag.
1009
1010         If value is True, set committed to be True.
1011
1012         If value is False, set committed to be False for this and all parents.
1013         """
1014         if value == self._committed:
1015             return
1016         self._committed = value
1017         if self._committed is False and self.parent is not None:
1018             self.parent.set_committed(False)
1019
1020     @synchronized
1021     def committed(self):
1022         """Get whether this is committed or not."""
1023         return self._committed
1024
1025     @synchronized
1026     def add_writer(self, writer):
1027         """Add an ArvadosFileWriter reference to the set of writers"""
1028         if isinstance(writer, ArvadosFileWriter):
1029             self._writers.add(writer)
1030
1031     @synchronized
1032     def remove_writer(self, writer, flush):
1033         """
1034         Called from ArvadosFileWriter.close(). Remove a writer reference from the set
1035         and do some block maintenance tasks.
1036         """
1037         self._writers.remove(writer)
1038
1039         if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
1040             # File writer closed, not small enough for repacking
1041             self.flush()
1042         elif self.closed():
1043             # All writers closed and size is adequate for repacking
1044             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
1045
1046     def closed(self):
1047         """
1048         Get whether this is closed or not. When the writers list is empty, the file
1049         is considered closed.
1050         """
1051         return len(self._writers) == 0
1052
1053     @must_be_writable
1054     @synchronized
1055     def truncate(self, size):
1056         """Shrink or expand the size of the file.
1057
1058         If `size` is less than the size of the file, the file contents after
1059         `size` will be discarded.  If `size` is greater than the current size
1060         of the file, it will be filled with zero bytes.
1061
1062         """
1063         if size < self.size():
1064             new_segs = []
1065             for r in self._segments:
1066                 range_end = r.range_start+r.range_size
1067                 if r.range_start >= size:
1068                     # segment is past the truncate size, all done
1069                     break
1070                 elif size < range_end:
1071                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1072                     nr.segment_offset = r.segment_offset
1073                     new_segs.append(nr)
1074                     break
1075                 else:
1076                     new_segs.append(r)
1077
1078             self._segments = new_segs
1079             self.set_committed(False)
1080         elif size > self.size():
1081             padding = self.parent._my_block_manager().get_padding_block()
1082             diff = size - self.size()
1083             while diff > config.KEEP_BLOCK_SIZE:
1084                 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1085                 diff -= config.KEEP_BLOCK_SIZE
1086             if diff > 0:
1087                 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1088             self.set_committed(False)
1089         else:
1090             # size == self.size()
1091             pass
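        # Example: truncating a 10-byte file to 4 keeps only the segments
        # covering bytes 0-3; truncating it to 100 instead appends 90 bytes of
        # zeros by referencing the shared padding block from
        # get_padding_block().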
1092
1093     def readfrom(self, offset, size, num_retries, exact=False):
1094         """Read up to `size` bytes from the file starting at `offset`.
1095
1096         :exact:
1097          If False (default), return less data than requested if the read
1098          crosses a block boundary and the next block isn't cached.  If True,
1099          only return less data than requested when hitting EOF.
1100         """
1101
1102         with self.lock:
1103             if size == 0 or offset >= self.size():
1104                 return b''
1105             readsegs = locators_and_ranges(self._segments, offset, size)
1106             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE*4, limit=32)
1107
1108         locs = set()
1109         data = []
1110         for lr in readsegs:
1111             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1112             if block:
1113                 blockview = memoryview(block)
1114                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
1115                 locs.add(lr.locator)
1116             else:
1117                 break
1118
1119         for lr in prefetch:
1120             if lr.locator not in locs:
1121                 self.parent._my_block_manager().block_prefetch(lr.locator)
1122                 locs.add(lr.locator)
1123
1124         if len(data) == 1:
1125             return data[0]
1126         else:
1127             return b''.join(data)
1128
1129     @must_be_writable
1130     @synchronized
1131     def writeto(self, offset, data, num_retries):
1132         """Write `data` to the file starting at `offset`.
1133
1134         This will update existing bytes and/or extend the size of the file as
1135         necessary.
1136
1137         """
1138         if not isinstance(data, bytes) and not isinstance(data, memoryview):
1139             data = data.encode()
1140         if len(data) == 0:
1141             return
1142
1143         if offset > self.size():
1144             self.truncate(offset)
1145
1146         if len(data) > config.KEEP_BLOCK_SIZE:
1147             # Chunk it up into smaller writes
1148             n = 0
1149             dataview = memoryview(data)
1150             while n < len(data):
1151                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1152                 n += config.KEEP_BLOCK_SIZE
1153             return
1154
1155         self.set_committed(False)
1156
1157         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1158             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1159
1160         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1161             self._current_bblock.repack_writes()
1162             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1163                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1164                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1165
1166         self._current_bblock.append(data)
1167
1168         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1169
1170         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1171
1172         return len(data)
1173
1174     @synchronized
1175     def flush(self, sync=True, num_retries=0):
1176         """Flush the current bufferblock to Keep.
1177
1178         :sync:
1179           If True, commit block synchronously, wait until buffer block has been written.
1180           If False, commit block asynchronously, return immediately after putting block into
1181           the keep put queue.
1182         """
1183         if self.committed():
1184             return
1185
1186         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1187             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1188                 self._current_bblock.repack_writes()
1189             if self._current_bblock.state() != _BufferBlock.DELETED:
1190                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1191
1192         if sync:
1193             to_delete = set()
1194             for s in self._segments:
1195                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1196                 if bb:
1197                     if bb.state() != _BufferBlock.COMMITTED:
1198                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1199                     to_delete.add(s.locator)
1200                     s.locator = bb.locator()
1201             for s in to_delete:
1202                 # Don't delete the bufferblock if it's owned by many files. It'll be
1203                 # deleted after all of its owners are flush()ed.
1204                 if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1205                     self.parent._my_block_manager().delete_bufferblock(s)
1206
1207         self.parent.notify(MOD, self.parent, self.name, (self, self))
1208
1209     @must_be_writable
1210     @synchronized
1211     def add_segment(self, blocks, pos, size):
1212         """Add a segment to the end of the file.
1213
1214         `pos` and `size` reference a section of the stream described by
1215         `blocks` (a list of Range objects).
1216
1217         """
1218         self._add_segment(blocks, pos, size)
1219
1220     def _add_segment(self, blocks, pos, size):
1221         """Internal implementation of add_segment."""
1222         self.set_committed(False)
1223         for lr in locators_and_ranges(blocks, pos, size):
1224             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1225             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1226             self._segments.append(r)
1227
1228     @synchronized
1229     def size(self):
1230         """Get the file size."""
1231         if self._segments:
1232             n = self._segments[-1]
1233             return n.range_start + n.range_size
1234         else:
1235             return 0
1236
1237     @synchronized
1238     def manifest_text(self, stream_name=".", portable_locators=False,
1239                       normalize=False, only_committed=False):
1240         buf = ""
1241         filestream = []
1242         for segment in self._segments:
1243             loc = segment.locator
1244             if self.parent._my_block_manager().is_bufferblock(loc):
1245                 if only_committed:
1246                     continue
1247                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1248             if portable_locators:
1249                 loc = KeepLocator(loc).stripped()
1250             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1251                                  segment.segment_offset, segment.range_size))
1252         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1253         buf += "\n"
1254         return buf
1255
1256     @must_be_writable
1257     @synchronized
1258     def _reparent(self, newparent, newname):
1259         self.set_committed(False)
1260         self.flush(sync=True)
1261         self.parent.remove(self.name)
1262         self.parent = newparent
1263         self.name = newname
1264         self.lock = self.parent.root_collection().lock
1265
1266
1267 class ArvadosFileReader(ArvadosFileReaderBase):
1268     """Wraps ArvadosFile in a file-like object supporting reading only.
1269
1270     Be aware that this class is NOT thread safe as there is no locking around
1271     updating the file pointer.
1272
1273     """
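    # Hedged usage sketch (readers are normally obtained from a collection
    # rather than constructed directly; `coll` is a hypothetical Collection):
    #
    #   with coll.open("data.bin", "rb") as reader:
    #       header = reader.read(1024)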
1274
1275     def __init__(self, arvadosfile, mode="r", num_retries=None):
1276         super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1277         self.arvadosfile = arvadosfile
1278
1279     def size(self):
1280         return self.arvadosfile.size()
1281
1282     def stream_name(self):
1283         return self.arvadosfile.parent.stream_name()
1284
1285     def readinto(self, b):
1286         data = self.read(len(b))
1287         b[:len(data)] = data
1288         return len(data)
1289
1290     @_FileLikeObjectBase._before_close
1291     @retry_method
1292     def read(self, size=None, num_retries=None):
1293         """Read up to `size` bytes from the file and return the result.
1294
1295         Starts at the current file position.  If `size` is None, read the
1296         entire remainder of the file.
1297         """
1298         if size is None:
1299             data = []
1300             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1301             while rd:
1302                 data.append(rd)
1303                 self._filepos += len(rd)
1304                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1305             return b''.join(data)
1306         else:
1307             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1308             self._filepos += len(data)
1309             return data
1310
1311     @_FileLikeObjectBase._before_close
1312     @retry_method
1313     def readfrom(self, offset, size, num_retries=None):
1314         """Read up to `size` bytes from the stream, starting at the specified file offset.
1315
1316         This method does not change the file position.
1317         """
1318         return self.arvadosfile.readfrom(offset, size, num_retries)
1319
1320     def flush(self):
1321         pass
1322
1323
1324 class ArvadosFileWriter(ArvadosFileReader):
1325     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1326
1327     Be aware that this class is NOT thread safe as there is no locking around
1328     updating the file pointer.
1329
1330     """
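    # Hedged usage sketch (`coll` is a hypothetical writable Collection):
    #
    #   with coll.open("results/out.txt", "ab") as writer:
    #       writer.write("more output\n")
    #
    # close() removes this writer from the file and may trigger small-block
    # repacking via ArvadosFile.remove_writer().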
1331
1332     def __init__(self, arvadosfile, mode, num_retries=None):
1333         super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1334         self.arvadosfile.add_writer(self)
1335
1336     def writable(self):
1337         return True
1338
1339     @_FileLikeObjectBase._before_close
1340     @retry_method
1341     def write(self, data, num_retries=None):
1342         if self.mode[0] == "a":
1343             self._filepos = self.size()
1344         self.arvadosfile.writeto(self._filepos, data, num_retries)
1345         self._filepos += len(data)
1346         return len(data)
1347
1348     @_FileLikeObjectBase._before_close
1349     @retry_method
1350     def writelines(self, seq, num_retries=None):
1351         for s in seq:
1352             self.write(s, num_retries=num_retries)
1353
1354     @_FileLikeObjectBase._before_close
1355     def truncate(self, size=None):
1356         if size is None:
1357             size = self._filepos
1358         self.arvadosfile.truncate(size)
1359
1360     @_FileLikeObjectBase._before_close
1361     def flush(self):
1362         self.arvadosfile.flush()
1363
1364     def close(self, flush=True):
1365         if not self.closed:
1366             self.arvadosfile.remove_writer(self, flush)
1367             super(ArvadosFileWriter, self).close()
1368
1369
1370 class WrappableFile(object):
1371     """An interface to an Arvados file that's compatible with io wrappers.
1372
1373     """
1374     def __init__(self, f):
1375         self.f = f
1376         self.closed = False
1377     def close(self):
1378         self.closed = True
1379         return self.f.close()
1380     def flush(self):
1381         return self.f.flush()
1382     def read(self, *args, **kwargs):
1383         return self.f.read(*args, **kwargs)
1384     def readable(self):
1385         return self.f.readable()
1386     def readinto(self, *args, **kwargs):
1387         return self.f.readinto(*args, **kwargs)
1388     def seek(self, *args, **kwargs):
1389         return self.f.seek(*args, **kwargs)
1390     def seekable(self):
1391         return self.f.seekable()
1392     def tell(self):
1393         return self.f.tell()
1394     def writable(self):
1395         return self.f.writable()
1396     def write(self, *args, **kwargs):
1397         return self.f.write(*args, **kwargs)