1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 from future.utils import listitems, listvalues
9 standard_library.install_aliases()
10 from builtins import range
11 from builtins import object
12 import bz2
13 import collections
14 import copy
15 import errno
16 import functools
17 import hashlib
18 import logging
19 import os
20 import queue
21 import re
22 import sys
23 import threading
24 import uuid
25 import zlib
26
27 from . import config
28 from .errors import KeepWriteError, AssertionError, ArgumentError
29 from .keep import KeepLocator
30 from ._normalize_stream import normalize_stream
31 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
32 from .retry import retry_method
33
34 MOD = "mod"
35 WRITE = "write"
36
37 _logger = logging.getLogger('arvados.arvfile')
38
39 def split(path):
40     """split(path) -> streamname, filename
41
42     Separate the stream name and file name in a /-separated stream path and
43     return a tuple (stream_name, file_name).  If no stream name is available,
44     assume '.'.
45
46     """
47     try:
48         stream_name, file_name = path.rsplit('/', 1)
49     except ValueError:  # No / in string
50         stream_name, file_name = '.', path
51     return stream_name, file_name
52
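# Illustrative examples (not part of the module, shown for clarity): split()
# divides a collection path on its last '/', defaulting the stream name to '.':
#
#     split('output/logs/stderr.txt')  # -> ('output/logs', 'stderr.txt')
#     split('stderr.txt')              # -> ('.', 'stderr.txt')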
53
54 class UnownedBlockError(Exception):
55     """Raised when there's an writable block without an owner on the BlockManager."""
56     pass
57
58
59 class _FileLikeObjectBase(object):
60     def __init__(self, name, mode):
61         self.name = name
62         self.mode = mode
63         self.closed = False
64
65     @staticmethod
66     def _before_close(orig_func):
67         @functools.wraps(orig_func)
68         def before_close_wrapper(self, *args, **kwargs):
69             if self.closed:
70                 raise ValueError("I/O operation on closed stream file")
71             return orig_func(self, *args, **kwargs)
72         return before_close_wrapper
73
74     def __enter__(self):
75         return self
76
77     def __exit__(self, exc_type, exc_value, traceback):
78         try:
79             self.close()
80         except Exception:
81             if exc_type is None:
82                 raise
83
84     def close(self):
85         self.closed = True
86
87
88 class ArvadosFileReaderBase(_FileLikeObjectBase):
89     def __init__(self, name, mode, num_retries=None):
90         super(ArvadosFileReaderBase, self).__init__(name, mode)
91         self._filepos = 0
92         self.num_retries = num_retries
93         self._readline_cache = (None, None)
94
95     def __iter__(self):
96         while True:
97             data = self.readline()
98             if not data:
99                 break
100             yield data
101
102     def decompressed_name(self):
103         return re.sub(r'\.(bz2|gz)$', '', self.name)
104
105     @_FileLikeObjectBase._before_close
106     def seek(self, pos, whence=os.SEEK_SET):
107         if whence == os.SEEK_CUR:
108             pos += self._filepos
109         elif whence == os.SEEK_END:
110             pos += self.size()
111         if pos < 0:
112             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
113         self._filepos = pos
114         return self._filepos
115
116     def tell(self):
117         return self._filepos
118
119     def readable(self):
120         return True
121
122     def writable(self):
123         return False
124
125     def seekable(self):
126         return True
127
128     @_FileLikeObjectBase._before_close
129     @retry_method
130     def readall(self, size=2**20, num_retries=None):
131         while True:
132             data = self.read(size, num_retries=num_retries)
133             if len(data) == 0:
134                 break
135             yield data
136
137     @_FileLikeObjectBase._before_close
138     @retry_method
139     def readline(self, size=float('inf'), num_retries=None):
140         cache_pos, cache_data = self._readline_cache
141         if self.tell() == cache_pos:
142             data = [cache_data]
143             self._filepos += len(cache_data)
144         else:
145             data = [b'']
146         data_size = len(data[-1])
147         while (data_size < size) and (b'\n' not in data[-1]):
148             next_read = self.read(2 ** 20, num_retries=num_retries)
149             if not next_read:
150                 break
151             data.append(next_read)
152             data_size += len(next_read)
153         data = b''.join(data)
154         try:
155             nextline_index = data.index(b'\n') + 1
156         except ValueError:
157             nextline_index = len(data)
158         nextline_index = min(nextline_index, size)
159         self._filepos -= len(data) - nextline_index
160         self._readline_cache = (self.tell(), data[nextline_index:])
161         return data[:nextline_index].decode()
162
163     @_FileLikeObjectBase._before_close
164     @retry_method
165     def decompress(self, decompress, size, num_retries=None):
166         for segment in self.readall(size, num_retries=num_retries):
167             data = decompress(segment)
168             if data:
169                 yield data
170
171     @_FileLikeObjectBase._before_close
172     @retry_method
173     def readall_decompressed(self, size=2**20, num_retries=None):
174         self.seek(0)
175         if self.name.endswith('.bz2'):
176             dc = bz2.BZ2Decompressor()
177             return self.decompress(dc.decompress, size,
178                                    num_retries=num_retries)
179         elif self.name.endswith('.gz'):
180             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
181             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
182                                    size, num_retries=num_retries)
183         else:
184             return self.readall(size, num_retries=num_retries)
185
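    # Hedged usage sketch (assumes `reader` is an ArvadosFileReaderBase-style
    # object obtained elsewhere, and `process` is a caller-supplied function):
    #
    #     for chunk in reader.readall_decompressed(size=2**20):
    #         process(chunk)  # each chunk is a bytes object of uncompressed data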
186     @_FileLikeObjectBase._before_close
187     @retry_method
188     def readlines(self, sizehint=float('inf'), num_retries=None):
189         data = []
190         data_size = 0
191         for s in self.readall(num_retries=num_retries):
192             data.append(s)
193             data_size += len(s)
194             if data_size >= sizehint:
195                 break
196         return b''.join(data).decode().splitlines(True)
197
198     def size(self):
199         raise IOError(errno.ENOSYS, "Not implemented")
200
201     def read(self, size, num_retries=None):
202         raise IOError(errno.ENOSYS, "Not implemented")
203
204     def readfrom(self, start, size, num_retries=None):
205         raise IOError(errno.ENOSYS, "Not implemented")
206
207
208 class StreamFileReader(ArvadosFileReaderBase):
209     class _NameAttribute(str):
210         # The Python file API provides a plain .name attribute.
211         # Older SDK versions provided a name() method.
212         # This class provides both, for maximum compatibility.
213         def __call__(self):
214             return self
215
216     def __init__(self, stream, segments, name):
217         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
218         self._stream = stream
219         self.segments = segments
220
221     def stream_name(self):
222         return self._stream.name()
223
224     def size(self):
225         n = self.segments[-1]
226         return n.range_start + n.range_size
227
228     @_FileLikeObjectBase._before_close
229     @retry_method
230     def read(self, size, num_retries=None):
231         """Read up to 'size' bytes from the stream, starting at the current file position"""
232         if size == 0:
233             return b''
234
235         data = b''
236         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
237         if available_chunks:
238             lr = available_chunks[0]
239             data = self._stream.readfrom(lr.locator+lr.segment_offset,
240                                          lr.segment_size,
241                                          num_retries=num_retries)
242
243         self._filepos += len(data)
244         return data
245
246     @_FileLikeObjectBase._before_close
247     @retry_method
248     def readfrom(self, start, size, num_retries=None):
249         """Read up to 'size' bytes from the stream, starting at 'start'"""
250         if size == 0:
251             return b''
252
253         data = []
254         for lr in locators_and_ranges(self.segments, start, size):
255             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
256                                               num_retries=num_retries))
257         return b''.join(data)
258
259     def as_manifest(self):
260         segs = []
261         for r in self.segments:
262             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
263         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
264
265
266 def synchronized(orig_func):
267     @functools.wraps(orig_func)
268     def synchronized_wrapper(self, *args, **kwargs):
269         with self.lock:
270             return orig_func(self, *args, **kwargs)
271     return synchronized_wrapper
272
273
274 class StateChangeError(Exception):
275     def __init__(self, message, state, nextstate):
276         super(StateChangeError, self).__init__(message)
277         self.state = state
278         self.nextstate = nextstate
279
280 class _BufferBlock(object):
281     """A stand-in for a Keep block that is in the process of being written.
282
283     Writers can append to it, get the size, and compute the Keep locator.
284     There are three valid states:
285
286     WRITABLE
287       Can append to block.
288
289     PENDING
290       Block is in the process of being uploaded to Keep, append is an error.
291
292     COMMITTED
293       The block has been written to Keep, its internal buffer has been
294       released, fetching the block will fetch it via keep client (since we
295       discarded the internal copy), and identifiers referring to the BufferBlock
296       can be replaced with the block locator.
297
298     """
299
300     WRITABLE = 0
301     PENDING = 1
302     COMMITTED = 2
303     ERROR = 3
304     DELETED = 4
305
306     def __init__(self, blockid, starting_capacity, owner):
307         """
308         :blockid:
309           the identifier for this block
310
311         :starting_capacity:
312           the initial buffer capacity
313
314         :owner:
315           ArvadosFile that owns this block
316
317         """
318         self.blockid = blockid
319         self.buffer_block = bytearray(starting_capacity)
320         self.buffer_view = memoryview(self.buffer_block)
321         self.write_pointer = 0
322         self._state = _BufferBlock.WRITABLE
323         self._locator = None
324         self.owner = owner
325         self.lock = threading.Lock()
326         self.wait_for_commit = threading.Event()
327         self.error = None
328
329     @synchronized
330     def append(self, data):
331         """Append some data to the buffer.
332
333         Only valid if the block is in WRITABLE state.  Implements an expanding
334         buffer, doubling capacity as needed to accommodate all the data.
335
336         """
337         if self._state == _BufferBlock.WRITABLE:
338             if not isinstance(data, bytes) and not isinstance(data, memoryview):
339                 data = data.encode()
340             while (self.write_pointer+len(data)) > len(self.buffer_block):
341                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
342                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
343                 self.buffer_block = new_buffer_block
344                 self.buffer_view = memoryview(self.buffer_block)
345             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
346             self.write_pointer += len(data)
347             self._locator = None
348         else:
349             raise AssertionError("Buffer block is not writable")
350
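    # Worked example of the doubling strategy above: a block allocated with
    # starting_capacity=2**14 (16 KiB) that receives a 100 KiB append grows
    # 16 KiB -> 32 KiB -> 64 KiB -> 128 KiB before the write fits.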
351     STATE_TRANSITIONS = frozenset([
352             (WRITABLE, PENDING),
353             (PENDING, COMMITTED),
354             (PENDING, ERROR),
355             (ERROR, PENDING)])
356
357     @synchronized
358     def set_state(self, nextstate, val=None):
359         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
360             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
361         self._state = nextstate
362
363         if self._state == _BufferBlock.PENDING:
364             self.wait_for_commit.clear()
365
366         if self._state == _BufferBlock.COMMITTED:
367             self._locator = val
368             self.buffer_view = None
369             self.buffer_block = None
370             self.wait_for_commit.set()
371
372         if self._state == _BufferBlock.ERROR:
373             self.error = val
374             self.wait_for_commit.set()
375
376     @synchronized
377     def state(self):
378         return self._state
379
380     def size(self):
381         """The amount of data written to the buffer."""
382         return self.write_pointer
383
384     @synchronized
385     def locator(self):
386         """The Keep locator for this buffer's contents."""
387         if self._locator is None:
388             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
389         return self._locator
390
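    # Example of the locator format computed above: an empty buffer block
    # hashes to "d41d8cd98f00b204e9800998ecf8427e+0" (MD5 of zero bytes, with
    # the byte count after the '+').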
391     @synchronized
392     def clone(self, new_blockid, owner):
393         if self._state == _BufferBlock.COMMITTED:
394             raise AssertionError("Cannot duplicate committed buffer block")
395         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
396         bufferblock.append(self.buffer_view[0:self.size()])
397         return bufferblock
398
399     @synchronized
400     def clear(self):
401         self._state = _BufferBlock.DELETED
402         self.owner = None
403         self.buffer_block = None
404         self.buffer_view = None
405
406     @synchronized
407     def repack_writes(self):
408         """Optimize buffer block by repacking segments in file sequence.
409
410         When the client makes random writes, they appear in the buffer block in
411         the sequence they were written rather than the sequence they appear in
412         the file.  This makes for inefficient, fragmented manifests.  Attempt
413         to optimize by repacking writes in file sequence.
414
415         """
416         if self._state != _BufferBlock.WRITABLE:
417             raise AssertionError("Cannot repack non-writable block")
418
419         segs = self.owner.segments()
420
421         # Collect the segments that reference the buffer block.
422         bufferblock_segs = [s for s in segs if s.locator == self.blockid]
423
424         # Collect total data referenced by segments (could be smaller than
425         # bufferblock size if a portion of the file was written and
426         # then overwritten).
427         write_total = sum([s.range_size for s in bufferblock_segs])
428
429         if write_total < self.size() or len(bufferblock_segs) > 1:
430             # If there's more than one segment referencing this block, it is
431             # due to out-of-order writes and will produce a fragmented
432             # manifest, so try to optimize by re-packing into a new buffer.
433             contents = self.buffer_view[0:self.write_pointer].tobytes()
434             new_bb = _BufferBlock(None, write_total, None)
435             for t in bufferblock_segs:
436                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
437                 t.segment_offset = new_bb.size() - t.range_size
438
439             self.buffer_block = new_bb.buffer_block
440             self.buffer_view = new_bb.buffer_view
441             self.write_pointer = new_bb.write_pointer
442             self._locator = None
443             new_bb.clear()
444             self.owner.set_segments(segs)
445
446     def __repr__(self):
447         return "<BufferBlock %s>" % (self.blockid)
448
449
450 class NoopLock(object):
451     def __enter__(self):
452         return self
453
454     def __exit__(self, exc_type, exc_value, traceback):
455         pass
456
457     def acquire(self, blocking=False):
458         pass
459
460     def release(self):
461         pass
462
463
464 def must_be_writable(orig_func):
465     @functools.wraps(orig_func)
466     def must_be_writable_wrapper(self, *args, **kwargs):
467         if not self.writable():
468             raise IOError(errno.EROFS, "Collection is read-only.")
469         return orig_func(self, *args, **kwargs)
470     return must_be_writable_wrapper
471
472
473 class _BlockManager(object):
474     """BlockManager handles buffer blocks.
475
476     Also handles background block uploads, and background block prefetch for a
477     Collection of ArvadosFiles.
478
479     """
480
481     DEFAULT_PUT_THREADS = 2
482     DEFAULT_GET_THREADS = 2
483
484     def __init__(self, keep, copies=None, put_threads=None):
485         """keep: KeepClient object to use"""
486         self._keep = keep
487         self._bufferblocks = collections.OrderedDict()
488         self._put_queue = None
489         self._put_threads = None
490         self._prefetch_queue = None
491         self._prefetch_threads = None
492         self.lock = threading.Lock()
493         self.prefetch_enabled = True
494         if put_threads:
495             self.num_put_threads = put_threads
496         else:
497             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
498         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
499         self.copies = copies
500         self._pending_write_size = 0
501         self.threads_lock = threading.Lock()
502         self.padding_block = None
503
504     @synchronized
505     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
506         """Allocate a new, empty bufferblock in WRITABLE state and return it.
507
508         :blockid:
509           optional block identifier, otherwise one will be automatically assigned
510
511         :starting_capacity:
512           optional capacity, otherwise will use default capacity
513
514         :owner:
515           ArvadosFile that owns this block
516
517         """
518         return self._alloc_bufferblock(blockid, starting_capacity, owner)
519
520     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
521         if blockid is None:
522             blockid = str(uuid.uuid4())
523         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
524         self._bufferblocks[bufferblock.blockid] = bufferblock
525         return bufferblock
526
527     @synchronized
528     def dup_block(self, block, owner):
529         """Create a new bufferblock initialized with the content of an existing bufferblock.
530
531         :block:
532           the buffer block to copy.
533
534         :owner:
535           ArvadosFile that owns the new block
536
537         """
538         new_blockid = str(uuid.uuid4())
539         bufferblock = block.clone(new_blockid, owner)
540         self._bufferblocks[bufferblock.blockid] = bufferblock
541         return bufferblock
542
543     @synchronized
544     def is_bufferblock(self, locator):
545         return locator in self._bufferblocks
546
547     def _commit_bufferblock_worker(self):
548         """Background uploader thread."""
549
550         while True:
551             try:
552                 bufferblock = self._put_queue.get()
553                 if bufferblock is None:
554                     return
555
556                 if self.copies is None:
557                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
558                 else:
559                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
560                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
561             except Exception as e:
562                 bufferblock.set_state(_BufferBlock.ERROR, e)
563             finally:
564                 if self._put_queue is not None:
565                     self._put_queue.task_done()
566
567     def start_put_threads(self):
568         with self.threads_lock:
569             if self._put_threads is None:
570                 # Start uploader threads.
571
572                 # If we don't limit the Queue size, the upload queue can quickly
573                 # grow to take up gigabytes of RAM if the writing process is
574                 # generating data more quickly than it can be sent to the Keep
575                 # servers.
576                 #
577                 # With two upload threads and a queue size of 2, this means up to 4
578                 # blocks pending.  If they are full 64 MiB blocks, that means up to
579                 # 256 MiB of internal buffering, which is the same size as the
580                 # default download block cache in KeepClient.
581                 self._put_queue = queue.Queue(maxsize=2)
582
583                 self._put_threads = []
584                 for i in range(0, self.num_put_threads):
585                     thread = threading.Thread(target=self._commit_bufferblock_worker)
586                     self._put_threads.append(thread)
587                     thread.daemon = True
588                     thread.start()
589
590     def _block_prefetch_worker(self):
591         """The background downloader thread."""
592         while True:
593             try:
594                 b = self._prefetch_queue.get()
595                 if b is None:
596                     return
597                 self._keep.get(b)
598             except Exception:
599                 _logger.exception("Exception doing block prefetch")
600
601     @synchronized
602     def start_get_threads(self):
603         if self._prefetch_threads is None:
604             self._prefetch_queue = queue.Queue()
605             self._prefetch_threads = []
606             for i in range(0, self.num_get_threads):
607                 thread = threading.Thread(target=self._block_prefetch_worker)
608                 self._prefetch_threads.append(thread)
609                 thread.daemon = True
610                 thread.start()
611
612
613     @synchronized
614     def stop_threads(self):
615         """Shut down and wait for background upload and download threads to finish."""
616
617         if self._put_threads is not None:
618             for t in self._put_threads:
619                 self._put_queue.put(None)
620             for t in self._put_threads:
621                 t.join()
622         self._put_threads = None
623         self._put_queue = None
624
625         if self._prefetch_threads is not None:
626             for t in self._prefetch_threads:
627                 self._prefetch_queue.put(None)
628             for t in self._prefetch_threads:
629                 t.join()
630         self._prefetch_threads = None
631         self._prefetch_queue = None
632
633     def __enter__(self):
634         return self
635
636     def __exit__(self, exc_type, exc_value, traceback):
637         self.stop_threads()
638
639     @synchronized
640     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
641         """Packs small blocks together before uploading"""
642
643         self._pending_write_size += closed_file_size
644
645         # Check if there are enough small blocks to fill up one full-size block
646         if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
647             return
648
649         # Search blocks ready for getting packed together before being
650         # committed to Keep.
651         # A WRITABLE block always has an owner.
652         # A WRITABLE block with its owner.closed() implies that its
653         # size is <= KEEP_BLOCK_SIZE/2.
654         try:
655             small_blocks = [b for b in listvalues(self._bufferblocks)
656                             if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
657         except AttributeError:
658             # Writable blocks without an owner shouldn't exist.
659             raise UnownedBlockError()
660
661         if len(small_blocks) <= 1:
662             # Not enough small blocks for repacking
663             return
664
665         for bb in small_blocks:
666             bb.repack_writes()
667
668         # Update the pending write size count with its true value, just in case
669         # some small file was opened, written and closed several times.
670         self._pending_write_size = sum([b.size() for b in small_blocks])
671
672         if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
673             return
674
675         new_bb = self._alloc_bufferblock()
676         new_bb.owner = []
677         files = []
678         while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
679             bb = small_blocks.pop(0)
680             new_bb.owner.append(bb.owner)
681             self._pending_write_size -= bb.size()
682             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
683             files.append((bb, new_bb.write_pointer - bb.size()))
684
685         self.commit_bufferblock(new_bb, sync=sync)
686
687         for bb, new_bb_segment_offset in files:
688             newsegs = bb.owner.segments()
689             for s in newsegs:
690                 if s.locator == bb.blockid:
691                     s.locator = new_bb.blockid
692                     s.segment_offset = new_bb_segment_offset+s.segment_offset
693             bb.owner.set_segments(newsegs)
694             self._delete_bufferblock(bb.blockid)
695
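    # Worked example: four 20 MiB files are written and closed while
    # KEEP_BLOCK_SIZE is 64 MiB.  With 80 MiB of pending writes the size check
    # above passes; the packing loop fits the first three blocks (60 MiB) into
    # one new bufferblock, re-points each file's segments at it with an
    # adjusted segment_offset, and leaves the fourth block for a later repack.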
696     def commit_bufferblock(self, block, sync):
697         """Initiate a background upload of a bufferblock.
698
699         :block:
700           The block object to upload
701
702         :sync:
703           If `sync` is True, upload the block synchronously.
704           If `sync` is False, upload the block asynchronously.  This will
705           return immediately unless the upload queue is at capacity, in
706           which case it will wait on an upload queue slot.
707
708         """
709         try:
710             # Mark the block as PENDING to disallow any more appends.
711             block.set_state(_BufferBlock.PENDING)
712         except StateChangeError as e:
713             if e.state == _BufferBlock.PENDING:
714                 if sync:
715                     block.wait_for_commit.wait()
716                 else:
717                     return
718             if block.state() == _BufferBlock.COMMITTED:
719                 return
720             elif block.state() == _BufferBlock.ERROR:
721                 raise block.error
722             else:
723                 raise
724
725         if sync:
726             try:
727                 if self.copies is None:
728                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
729                 else:
730                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
731                 block.set_state(_BufferBlock.COMMITTED, loc)
732             except Exception as e:
733                 block.set_state(_BufferBlock.ERROR, e)
734                 raise
735         else:
736             self.start_put_threads()
737             self._put_queue.put(block)
738
739     @synchronized
740     def get_bufferblock(self, locator):
741         return self._bufferblocks.get(locator)
742
743     @synchronized
744     def get_padding_block(self):
745         """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
746         when using truncate() to extend the size of a file.
747
748         For reference (and possible future optimization), the md5sum of the
749         padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
750
751         """
752
753         if self.padding_block is None:
754             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
755             self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
756             self.commit_bufferblock(self.padding_block, False)
757         return self.padding_block
758
759     @synchronized
760     def delete_bufferblock(self, locator):
761         self._delete_bufferblock(locator)
762
763     def _delete_bufferblock(self, locator):
764         bb = self._bufferblocks[locator]
765         bb.clear()
766         del self._bufferblocks[locator]
767
768     def get_block_contents(self, locator, num_retries, cache_only=False):
769         """Fetch a block.
770
771         First checks whether the locator refers to a BufferBlock and returns its
772         contents if so; otherwise, passes the request through to KeepClient.get().
773
774         """
775         with self.lock:
776             if locator in self._bufferblocks:
777                 bufferblock = self._bufferblocks[locator]
778                 if bufferblock.state() != _BufferBlock.COMMITTED:
779                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
780                 else:
781                     locator = bufferblock._locator
782         if cache_only:
783             return self._keep.get_from_cache(locator)
784         else:
785             return self._keep.get(locator, num_retries=num_retries)
786
787     def commit_all(self):
788         """Commit all outstanding buffer blocks.
789
790         This is a synchronous call, and will not return until all buffer blocks
791         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
792
793         """
794         self.repack_small_blocks(force=True, sync=True)
795
796         with self.lock:
797             items = listitems(self._bufferblocks)
798
799         for k,v in items:
800             if v.state() != _BufferBlock.COMMITTED and v.owner:
801                 # Skip blocks owned by a list of files (created by repacking small
802                 # blocks): if they're not COMMITTED, they're already being committed asynchronously.
803                 if isinstance(v.owner, ArvadosFile):
804                     v.owner.flush(sync=False)
805
806         with self.lock:
807             if self._put_queue is not None:
808                 self._put_queue.join()
809
810                 err = []
811                 for k,v in items:
812                     if v.state() == _BufferBlock.ERROR:
813                         err.append((v.locator(), v.error))
814                 if err:
815                     raise KeepWriteError("Error writing some blocks", err, label="block")
816
817         for k,v in items:
818             # flush again with sync=True to remove committed bufferblocks from
819             # the segments.
820             if v.owner:
821                 if isinstance(v.owner, ArvadosFile):
822                     v.owner.flush(sync=True)
823                 elif isinstance(v.owner, list) and len(v.owner) > 0:
824                     # This bufferblock is referenced by many files as a result
825                     # of repacking small blocks, so don't delete it when flushing
826                     # its owners, just do it after flushing them all.
827                     for owner in v.owner:
828                         owner.flush(sync=True)
829                     self.delete_bufferblock(k)
830
831     def block_prefetch(self, locator):
832         """Initiate a background download of a block.
833
834         This assumes that the underlying KeepClient implements a block cache,
835         so repeated requests for the same block will not result in repeated
836         downloads (unless the block is evicted from the cache.)  This method
837         does not block.
838
839         """
840
841         if not self.prefetch_enabled:
842             return
843
844         if self._keep.get_from_cache(locator) is not None:
845             return
846
847         with self.lock:
848             if locator in self._bufferblocks:
849                 return
850
851         self.start_get_threads()
852         self._prefetch_queue.put(locator)
853
854
855 class ArvadosFile(object):
856     """Represent a file in a Collection.
857
858     ArvadosFile manages the underlying representation of a file in Keep as a
859     sequence of segments spanning a set of blocks, and implements random
860     read/write access.
861
862     This object may be accessed from multiple threads.
863
864     """
865
866     __slots__ = ('parent', 'name', '_writers', '_committed',
867                  '_segments', 'lock', '_current_bblock', 'fuse_entry')
868
869     def __init__(self, parent, name, stream=[], segments=[]):
870         """
871         ArvadosFile constructor.
872
873         :stream:
874           a list of Range objects representing a block stream
875
876         :segments:
877           a list of Range objects representing segments
878         """
879         self.parent = parent
880         self.name = name
881         self._writers = set()
882         self._committed = False
883         self._segments = []
884         self.lock = parent.root_collection().lock
885         for s in segments:
886             self._add_segment(stream, s.locator, s.range_size)
887         self._current_bblock = None
888
889     def writable(self):
890         return self.parent.writable()
891
892     @synchronized
893     def permission_expired(self, as_of_dt=None):
894         """Returns True if any of the segment's locators is expired"""
895         for r in self._segments:
896             if KeepLocator(r.locator).permission_expired(as_of_dt):
897                 return True
898         return False
899
900     @synchronized
901     def has_remote_blocks(self):
902         """Returns True if any of the segment's locators has a +R signature"""
903
904         for s in self._segments:
905             if '+R' in s.locator:
906                 return True
907         return False
908
909     @synchronized
910     def _copy_remote_blocks(self, remote_blocks={}):
911         """Ask Keep to copy remote blocks and point to their local copies.
912
913         This is called from the parent Collection.
914
915         :remote_blocks:
916             Shared cache of remote to local block mappings. This is used to avoid
917             doing extra work when blocks are shared by more than one file in
918             different subdirectories.
919         """
920
921         for s in self._segments:
922             if '+R' in s.locator:
923                 try:
924                     loc = remote_blocks[s.locator]
925                 except KeyError:
926                     loc = self.parent._my_keep().refresh_signature(s.locator)
927                     remote_blocks[s.locator] = loc
928                 s.locator = loc
929                 self.parent.set_committed(False)
930         return remote_blocks
931
932     @synchronized
933     def segments(self):
934         return copy.copy(self._segments)
935
936     @synchronized
937     def clone(self, new_parent, new_name):
938         """Make a copy of this file."""
939         cp = ArvadosFile(new_parent, new_name)
940         cp.replace_contents(self)
941         return cp
942
943     @must_be_writable
944     @synchronized
945     def replace_contents(self, other):
946         """Replace segments of this file with segments from another `ArvadosFile` object."""
947
948         map_loc = {}
949         self._segments = []
950         for other_segment in other.segments():
951             new_loc = other_segment.locator
952             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
953                 if other_segment.locator not in map_loc:
954                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
955                     if bufferblock.state() != _BufferBlock.WRITABLE:
956                         map_loc[other_segment.locator] = bufferblock.locator()
957                     else:
958                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
959                 new_loc = map_loc[other_segment.locator]
960
961             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
962
963         self.set_committed(False)
964
965     def __eq__(self, other):
966         if other is self:
967             return True
968         if not isinstance(other, ArvadosFile):
969             return False
970
971         othersegs = other.segments()
972         with self.lock:
973             if len(self._segments) != len(othersegs):
974                 return False
975             for i in range(0, len(othersegs)):
976                 seg1 = self._segments[i]
977                 seg2 = othersegs[i]
978                 loc1 = seg1.locator
979                 loc2 = seg2.locator
980
981                 if self.parent._my_block_manager().is_bufferblock(loc1):
982                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
983
984                 if other.parent._my_block_manager().is_bufferblock(loc2):
985                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
986
987                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
988                     seg1.range_start != seg2.range_start or
989                     seg1.range_size != seg2.range_size or
990                     seg1.segment_offset != seg2.segment_offset):
991                     return False
992
993         return True
994
995     def __ne__(self, other):
996         return not self.__eq__(other)
997
998     @synchronized
999     def set_segments(self, segs):
1000         self._segments = segs
1001
1002     @synchronized
1003     def set_committed(self, value=True):
1004         """Set committed flag.
1005
1006         If value is True, set committed to be True.
1007
1008         If value is False, set committed to be False for this and all parents.
1009         """
1010         if value == self._committed:
1011             return
1012         self._committed = value
1013         if self._committed is False and self.parent is not None:
1014             self.parent.set_committed(False)
1015
1016     @synchronized
1017     def committed(self):
1018         """Get whether this is committed or not."""
1019         return self._committed
1020
1021     @synchronized
1022     def add_writer(self, writer):
1023         """Add an ArvadosFileWriter reference to the list of writers"""
1024         if isinstance(writer, ArvadosFileWriter):
1025             self._writers.add(writer)
1026
1027     @synchronized
1028     def remove_writer(self, writer, flush):
1029         """
1030         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
1031         and do some block maintenance tasks.
1032         """
1033         self._writers.remove(writer)
1034
1035         if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
1036             # File writer closed, not small enough for repacking
1037             self.flush()
1038         elif self.closed():
1039             # All writers closed and size is adequate for repacking
1040             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
1041
1042     def closed(self):
1043         """
1044         Get whether this file is closed or not. When the writers list is empty, the
1045         file is considered closed.
1046         """
1047         return len(self._writers) == 0
1048
1049     @must_be_writable
1050     @synchronized
1051     def truncate(self, size):
1052         """Shrink or expand the size of the file.
1053
1054         If `size` is less than the size of the file, the file contents after
1055         `size` will be discarded.  If `size` is greater than the current size
1056         of the file, it will be filled with zero bytes.
1057
1058         """
1059         if size < self.size():
1060             new_segs = []
1061             for r in self._segments:
1062                 range_end = r.range_start+r.range_size
1063                 if r.range_start >= size:
1064                     # segment is past the truncate size, all done
1065                     break
1066                 elif size < range_end:
1067                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1068                     nr.segment_offset = r.segment_offset
1069                     new_segs.append(nr)
1070                     break
1071                 else:
1072                     new_segs.append(r)
1073
1074             self._segments = new_segs
1075             self.set_committed(False)
1076         elif size > self.size():
1077             padding = self.parent._my_block_manager().get_padding_block()
1078             diff = size - self.size()
1079             while diff > config.KEEP_BLOCK_SIZE:
1080                 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1081                 diff -= config.KEEP_BLOCK_SIZE
1082             if diff > 0:
1083                 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1084             self.set_committed(False)
1085         else:
1086             # size == self.size()
1087             pass
1088
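    # Worked example of the padding logic above: truncating an empty file to
    # 100 MiB appends one full 64 MiB segment referencing the shared padding
    # block, then a 36 MiB segment referencing the same block, and marks the
    # file uncommitted.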
1089     def readfrom(self, offset, size, num_retries, exact=False):
1090         """Read up to `size` bytes from the file starting at `offset`.
1091
1092         :exact:
1093          If False (default), return less data than requested if the read
1094          crosses a block boundary and the next block isn't cached.  If True,
1095          only return less data than requested when hitting EOF.
1096         """
1097
1098         with self.lock:
1099             if size == 0 or offset >= self.size():
1100                 return b''
1101             readsegs = locators_and_ranges(self._segments, offset, size)
1102             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
1103
1104         locs = set()
1105         data = []
1106         for lr in readsegs:
1107             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1108             if block:
1109                 blockview = memoryview(block)
1110                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
1111                 locs.add(lr.locator)
1112             else:
1113                 break
1114
1115         for lr in prefetch:
1116             if lr.locator not in locs:
1117                 self.parent._my_block_manager().block_prefetch(lr.locator)
1118                 locs.add(lr.locator)
1119
1120         return b''.join(data)
1121
1122     @must_be_writable
1123     @synchronized
1124     def writeto(self, offset, data, num_retries):
1125         """Write `data` to the file starting at `offset`.
1126
1127         This will update existing bytes and/or extend the size of the file as
1128         necessary.
1129
1130         """
1131         if not isinstance(data, bytes) and not isinstance(data, memoryview):
1132             data = data.encode()
1133         if len(data) == 0:
1134             return
1135
1136         if offset > self.size():
1137             self.truncate(offset)
1138
1139         if len(data) > config.KEEP_BLOCK_SIZE:
1140             # Chunk it up into smaller writes
1141             n = 0
1142             dataview = memoryview(data)
1143             while n < len(data):
1144                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1145                 n += config.KEEP_BLOCK_SIZE
1146             return
1147
1148         self.set_committed(False)
1149
1150         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1151             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1152
1153         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1154             self._current_bblock.repack_writes()
1155             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1156                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1157                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1158
1159         self._current_bblock.append(data)
1160
1161         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1162
1163         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1164
1165         return len(data)
1166
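    # Worked example of the chunking above: a single 150 MiB write is split
    # into 64 MiB + 64 MiB + 22 MiB recursive calls; each time the current
    # bufferblock would overflow KEEP_BLOCK_SIZE it is committed asynchronously
    # and a fresh bufferblock is allocated.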
1167     @synchronized
1168     def flush(self, sync=True, num_retries=0):
1169         """Flush the current bufferblock to Keep.
1170
1171         :sync:
1172           If True, commit block synchronously, wait until buffer block has been written.
1173           If False, commit block asynchronously, return immediately after putting block into
1174           the keep put queue.
1175         """
1176         if self.committed():
1177             return
1178
1179         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1180             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1181                 self._current_bblock.repack_writes()
1182             if self._current_bblock.state() != _BufferBlock.DELETED:
1183                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1184
1185         if sync:
1186             to_delete = set()
1187             for s in self._segments:
1188                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1189                 if bb:
1190                     if bb.state() != _BufferBlock.COMMITTED:
1191                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1192                     to_delete.add(s.locator)
1193                     s.locator = bb.locator()
1194             for s in to_delete:
1195                 # Don't delete the bufferblock if it's owned by many files. It'll be
1196                 # deleted after all of its owners are flush()ed.
1197                 if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1198                     self.parent._my_block_manager().delete_bufferblock(s)
1199
1200         self.parent.notify(MOD, self.parent, self.name, (self, self))
1201
1202     @must_be_writable
1203     @synchronized
1204     def add_segment(self, blocks, pos, size):
1205         """Add a segment to the end of the file.
1206
1207         `pos` and `size` reference a section of the stream described by
1208         `blocks` (a list of Range objects)
1209
1210         """
1211         self._add_segment(blocks, pos, size)
1212
1213     def _add_segment(self, blocks, pos, size):
1214         """Internal implementation of add_segment."""
1215         self.set_committed(False)
1216         for lr in locators_and_ranges(blocks, pos, size):
1217             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1218             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1219             self._segments.append(r)
1220
1221     @synchronized
1222     def size(self):
1223         """Get the file size."""
1224         if self._segments:
1225             n = self._segments[-1]
1226             return n.range_start + n.range_size
1227         else:
1228             return 0
1229
1230     @synchronized
1231     def manifest_text(self, stream_name=".", portable_locators=False,
1232                       normalize=False, only_committed=False):
1233         buf = ""
1234         filestream = []
1235         for segment in self._segments:
1236             loc = segment.locator
1237             if self.parent._my_block_manager().is_bufferblock(loc):
1238                 if only_committed:
1239                     continue
1240                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1241             if portable_locators:
1242                 loc = KeepLocator(loc).stripped()
1243             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1244                                  segment.segment_offset, segment.range_size))
1245         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1246         buf += "\n"
1247         return buf
1248
1249     @must_be_writable
1250     @synchronized
1251     def _reparent(self, newparent, newname):
1252         self.set_committed(False)
1253         self.flush(sync=True)
1254         self.parent.remove(self.name)
1255         self.parent = newparent
1256         self.name = newname
1257         self.lock = self.parent.root_collection().lock
1258
1259
1260 class ArvadosFileReader(ArvadosFileReaderBase):
1261     """Wraps ArvadosFile in a file-like object supporting reading only.
1262
1263     Be aware that this class is NOT thread safe as there is no locking around
1264     updating the file pointer.
1265
1266     """
1267
1268     def __init__(self, arvadosfile, mode="r", num_retries=None):
1269         super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1270         self.arvadosfile = arvadosfile
1271
1272     def size(self):
1273         return self.arvadosfile.size()
1274
1275     def stream_name(self):
1276         return self.arvadosfile.parent.stream_name()
1277
1278     def readinto(self, b):
1279         data = self.read(len(b))
1280         b[:len(data)] = data
1281         return len(data)
1282
1283     @_FileLikeObjectBase._before_close
1284     @retry_method
1285     def read(self, size=None, num_retries=None):
1286         """Read up to `size` bytes from the file and return the result.
1287
1288         Starts at the current file position.  If `size` is None, read the
1289         entire remainder of the file.
1290         """
1291         if size is None:
1292             data = []
1293             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1294             while rd:
1295                 data.append(rd)
1296                 self._filepos += len(rd)
1297                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1298             return b''.join(data)
1299         else:
1300             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1301             self._filepos += len(data)
1302             return data
1303
1304     @_FileLikeObjectBase._before_close
1305     @retry_method
1306     def readfrom(self, offset, size, num_retries=None):
1307         """Read up to `size` bytes from the stream, starting at the specified file offset.
1308
1309         This method does not change the file position.
1310         """
1311         return self.arvadosfile.readfrom(offset, size, num_retries)
1312
1313     def flush(self):
1314         pass
1315
1316
1317 class ArvadosFileWriter(ArvadosFileReader):
1318     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1319
1320     Be aware that this class is NOT thread safe as there is no locking around
1321     updating the file pointer.
1322
1323     """
1324
1325     def __init__(self, arvadosfile, mode, num_retries=None):
1326         super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1327         self.arvadosfile.add_writer(self)
1328
1329     def writable(self):
1330         return True
1331
1332     @_FileLikeObjectBase._before_close
1333     @retry_method
1334     def write(self, data, num_retries=None):
1335         if self.mode[0] == "a":
1336             self._filepos = self.size()
1337         self.arvadosfile.writeto(self._filepos, data, num_retries)
1338         self._filepos += len(data)
1339         return len(data)
1340
1341     @_FileLikeObjectBase._before_close
1342     @retry_method
1343     def writelines(self, seq, num_retries=None):
1344         for s in seq:
1345             self.write(s, num_retries=num_retries)
1346
1347     @_FileLikeObjectBase._before_close
1348     def truncate(self, size=None):
1349         if size is None:
1350             size = self._filepos
1351         self.arvadosfile.truncate(size)
1352
1353     @_FileLikeObjectBase._before_close
1354     def flush(self):
1355         self.arvadosfile.flush()
1356
1357     def close(self, flush=True):
1358         if not self.closed:
1359             self.arvadosfile.remove_writer(self, flush)
1360             super(ArvadosFileWriter, self).close()
1361
1362
1363 class WrappableFile(object):
1364     """An interface to an Arvados file that's compatible with io wrappers.
1365
1366     """
1367     def __init__(self, f):
1368         self.f = f
1369         self.closed = False
1370     def close(self):
1371         self.closed = True
1372         return self.f.close()
1373     def flush(self):
1374         return self.f.flush()
1375     def read(self, *args, **kwargs):
1376         return self.f.read(*args, **kwargs)
1377     def readable(self):
1378         return self.f.readable()
1379     def readinto(self, *args, **kwargs):
1380         return self.f.readinto(*args, **kwargs)
1381     def seek(self, *args, **kwargs):
1382         return self.f.seek(*args, **kwargs)
1383     def seekable(self):
1384         return self.f.seekable()
1385     def tell(self):
1386         return self.f.tell()
1387     def writable(self):
1388         return self.f.writable()
1389     def write(self, *args, **kwargs):
1390         return self.f.write(*args, **kwargs)
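# Hedged usage sketch (illustrative only): ArvadosFileReader/ArvadosFileWriter
# objects are normally obtained from a Collection rather than constructed
# directly.  The collection module calls and file name below are assumptions
# made for the example.
#
#     import arvados.collection
#
#     coll = arvados.collection.Collection()
#     with coll.open('hello.txt', 'w') as writer:   # file-like writer
#         writer.write('hello world\n')
#     with coll.open('hello.txt', 'r') as reader:   # file-like reader
#         print(reader.read())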