11308: Fix arvfile append mode: write() changes the file pointer.
[arvados.git] / sdk / python / arvados / arvfile.py
1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 from future.utils import listitems, listvalues
5 standard_library.install_aliases()
6 from builtins import range
7 from builtins import object
8 import bz2
9 import collections
10 import copy
11 import errno
12 import functools
13 import hashlib
14 import logging
15 import os
16 import queue
17 import re
18 import sys
19 import threading
20 import uuid
21 import zlib
22
23 from . import config
24 from .errors import KeepWriteError, AssertionError, ArgumentError
25 from .keep import KeepLocator
26 from ._normalize_stream import normalize_stream
27 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
28 from .retry import retry_method
29
30 MOD = "mod"
31 WRITE = "write"
32
33 _logger = logging.getLogger('arvados.arvfile')
34
35 def split(path):
36     """split(path) -> streamname, filename
37
38     Separate the stream name and file name in a /-separated stream path and
39     return a tuple (stream_name, file_name).  If no stream name is available,
40     assume '.'.
41
42     """
43     try:
44         stream_name, file_name = path.rsplit('/', 1)
45     except ValueError:  # No / in string
46         stream_name, file_name = '.', path
47     return stream_name, file_name
48
49
50 class UnownedBlockError(Exception):
51     """Raised when there's an writable block without an owner on the BlockManager."""
52     pass
53
54
55 class _FileLikeObjectBase(object):
56     def __init__(self, name, mode):
57         self.name = name
58         self.mode = mode
59         self.closed = False
60
61     @staticmethod
62     def _before_close(orig_func):
63         @functools.wraps(orig_func)
64         def before_close_wrapper(self, *args, **kwargs):
65             if self.closed:
66                 raise ValueError("I/O operation on closed stream file")
67             return orig_func(self, *args, **kwargs)
68         return before_close_wrapper
69
70     def __enter__(self):
71         return self
72
73     def __exit__(self, exc_type, exc_value, traceback):
74         try:
75             self.close()
76         except Exception:
77             if exc_type is None:
78                 raise
79
80     def close(self):
81         self.closed = True
82
83
84 class ArvadosFileReaderBase(_FileLikeObjectBase):
85     def __init__(self, name, mode, num_retries=None):
86         super(ArvadosFileReaderBase, self).__init__(name, mode)
87         self._binary = 'b' in mode
88         if sys.version_info >= (3, 0) and not self._binary:
89             raise NotImplementedError("text mode {!r} is not implemented".format(mode))
90         self._filepos = 0
91         self.num_retries = num_retries
92         self._readline_cache = (None, None)
93
94     def __iter__(self):
95         while True:
96             data = self.readline()
97             if not data:
98                 break
99             yield data
100
101     def decompressed_name(self):
102         return re.sub(r'\.(bz2|gz)$', '', self.name)
103
104     @_FileLikeObjectBase._before_close
105     def seek(self, pos, whence=os.SEEK_SET):
106         if whence == os.SEEK_CUR:
107             pos += self._filepos
108         elif whence == os.SEEK_END:
109             pos += self.size()
110         if pos < 0:
111             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
112         self._filepos = pos
113         return self._filepos
114
115     def tell(self):
116         return self._filepos
117
118     def readable(self):
119         return True
120
121     def writable(self):
122         return False
123
124     def seekable(self):
125         return True
126
127     @_FileLikeObjectBase._before_close
128     @retry_method
129     def readall(self, size=2**20, num_retries=None):
130         while True:
131             data = self.read(size, num_retries=num_retries)
132             if len(data) == 0:
133                 break
134             yield data
135
136     @_FileLikeObjectBase._before_close
137     @retry_method
138     def readline(self, size=float('inf'), num_retries=None):
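            # self._readline_cache holds (file position, leftover bytes) from the
            # previous readline() call, so sequential line reads can reuse data
            # already fetched past the last newline returned.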
139         cache_pos, cache_data = self._readline_cache
140         if self.tell() == cache_pos:
141             data = [cache_data]
142             self._filepos += len(cache_data)
143         else:
144             data = [b'']
145         data_size = len(data[-1])
146         while (data_size < size) and (b'\n' not in data[-1]):
147             next_read = self.read(2 ** 20, num_retries=num_retries)
148             if not next_read:
149                 break
150             data.append(next_read)
151             data_size += len(next_read)
152         data = b''.join(data)
153         try:
154             nextline_index = data.index(b'\n') + 1
155         except ValueError:
156             nextline_index = len(data)
157         nextline_index = min(nextline_index, size)
158         self._filepos -= len(data) - nextline_index
159         self._readline_cache = (self.tell(), data[nextline_index:])
160         return data[:nextline_index].decode()
161
162     @_FileLikeObjectBase._before_close
163     @retry_method
164     def decompress(self, decompress, size, num_retries=None):
165         for segment in self.readall(size, num_retries=num_retries):
166             data = decompress(segment)
167             if data:
168                 yield data
169
170     @_FileLikeObjectBase._before_close
171     @retry_method
172     def readall_decompressed(self, size=2**20, num_retries=None):
173         self.seek(0)
174         if self.name.endswith('.bz2'):
175             dc = bz2.BZ2Decompressor()
176             return self.decompress(dc.decompress, size,
177                                    num_retries=num_retries)
178         elif self.name.endswith('.gz'):
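                # wbits=16+MAX_WBITS tells zlib to expect a gzip header and trailer.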
179             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
180             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
181                                    size, num_retries=num_retries)
182         else:
183             return self.readall(size, num_retries=num_retries)
184
185     @_FileLikeObjectBase._before_close
186     @retry_method
187     def readlines(self, sizehint=float('inf'), num_retries=None):
188         data = []
189         data_size = 0
190         for s in self.readall(num_retries=num_retries):
191             data.append(s)
192             data_size += len(s)
193             if data_size >= sizehint:
194                 break
195         return b''.join(data).decode().splitlines(True)
196
197     def size(self):
198         raise IOError(errno.ENOSYS, "Not implemented")
199
200     def read(self, size, num_retries=None):
201         raise IOError(errno.ENOSYS, "Not implemented")
202
203     def readfrom(self, start, size, num_retries=None):
204         raise IOError(errno.ENOSYS, "Not implemented")
205
206
207 class StreamFileReader(ArvadosFileReaderBase):
208     class _NameAttribute(str):
209         # The Python file API provides a plain .name attribute.
210         # Older SDK provided a name() method.
211         # This class provides both, for maximum compatibility.
212         def __call__(self):
213             return self
214
215     def __init__(self, stream, segments, name):
216         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
217         self._stream = stream
218         self.segments = segments
219
220     def stream_name(self):
221         return self._stream.name()
222
223     def size(self):
224         n = self.segments[-1]
225         return n.range_start + n.range_size
226
227     @_FileLikeObjectBase._before_close
228     @retry_method
229     def read(self, size, num_retries=None):
230         """Read up to 'size' bytes from the stream, starting at the current file position"""
231         if size == 0:
232             return b''
233
234         data = b''
235         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
236         if available_chunks:
237             lr = available_chunks[0]
238             data = self._stream.readfrom(lr.locator+lr.segment_offset,
239                                          lr.segment_size,
240                                          num_retries=num_retries)
241
242         self._filepos += len(data)
243         return data
244
245     @_FileLikeObjectBase._before_close
246     @retry_method
247     def readfrom(self, start, size, num_retries=None):
248         """Read up to 'size' bytes from the stream, starting at 'start'"""
249         if size == 0:
250             return b''
251
252         data = []
253         for lr in locators_and_ranges(self.segments, start, size):
254             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
255                                               num_retries=num_retries))
256         return b''.join(data)
257
258     def as_manifest(self):
259         segs = []
260         for r in self.segments:
261             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
262         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
263
264
265 def synchronized(orig_func):
266     @functools.wraps(orig_func)
267     def synchronized_wrapper(self, *args, **kwargs):
268         with self.lock:
269             return orig_func(self, *args, **kwargs)
270     return synchronized_wrapper
271
272
273 class StateChangeError(Exception):
274     def __init__(self, message, state, nextstate):
275         super(StateChangeError, self).__init__(message)
276         self.state = state
277         self.nextstate = nextstate
278
279 class _BufferBlock(object):
280     """A stand-in for a Keep block that is in the process of being written.
281
282     Writers can append to it, get the size, and compute the Keep locator.
283     There are three valid states:
284
285     WRITABLE
286       Can append to block.
287
288     PENDING
289       Block is in the process of being uploaded to Keep, append is an error.
290
291     COMMITTED
292       The block has been written to Keep and its internal buffer has been
293       released; fetching the block will go through the Keep client (since we
294       discarded the internal copy), and identifiers referring to the BufferBlock
295       can be replaced with the block locator.
296
297     """
298
299     WRITABLE = 0
300     PENDING = 1
301     COMMITTED = 2
302     ERROR = 3
303
304     def __init__(self, blockid, starting_capacity, owner):
305         """
306         :blockid:
307           the identifier for this block
308
309         :starting_capacity:
310           the initial buffer capacity
311
312         :owner:
313           ArvadosFile that owns this block
314
315         """
316         self.blockid = blockid
317         self.buffer_block = bytearray(starting_capacity)
318         self.buffer_view = memoryview(self.buffer_block)
319         self.write_pointer = 0
320         self._state = _BufferBlock.WRITABLE
321         self._locator = None
322         self.owner = owner
323         self.lock = threading.Lock()
324         self.wait_for_commit = threading.Event()
325         self.error = None
326
327     @synchronized
328     def append(self, data):
329         """Append some data to the buffer.
330
331         Only valid if the block is in WRITABLE state.  Implements an expanding
332         buffer, doubling capacity as needed to accommodate all the data.
333
334         """
335         if self._state == _BufferBlock.WRITABLE:
336             if not isinstance(data, bytes) and not isinstance(data, memoryview):
337                 data = data.encode()
338             while (self.write_pointer+len(data)) > len(self.buffer_block):
339                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
340                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
341                 self.buffer_block = new_buffer_block
342                 self.buffer_view = memoryview(self.buffer_block)
343             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
344             self.write_pointer += len(data)
345             self._locator = None
346         else:
347             raise AssertionError("Buffer block is not writable")
348
349     STATE_TRANSITIONS = frozenset([
350             (WRITABLE, PENDING),
351             (PENDING, COMMITTED),
352             (PENDING, ERROR),
353             (ERROR, PENDING)])
354
355     @synchronized
356     def set_state(self, nextstate, val=None):
357         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
358             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
359         self._state = nextstate
360
361         if self._state == _BufferBlock.PENDING:
362             self.wait_for_commit.clear()
363
364         if self._state == _BufferBlock.COMMITTED:
365             self._locator = val
366             self.buffer_view = None
367             self.buffer_block = None
368             self.wait_for_commit.set()
369
370         if self._state == _BufferBlock.ERROR:
371             self.error = val
372             self.wait_for_commit.set()
373
374     @synchronized
375     def state(self):
376         return self._state
377
378     def size(self):
379         """The amount of data written to the buffer."""
380         return self.write_pointer
381
382     @synchronized
383     def locator(self):
384         """The Keep locator for this buffer's contents."""
385         if self._locator is None:
386             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
387         return self._locator
388
389     @synchronized
390     def clone(self, new_blockid, owner):
391         if self._state == _BufferBlock.COMMITTED:
392             raise AssertionError("Cannot duplicate committed buffer block")
393         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
394         bufferblock.append(self.buffer_view[0:self.size()])
395         return bufferblock
396
397     @synchronized
398     def clear(self):
399         self.owner = None
400         self.buffer_block = None
401         self.buffer_view = None
402
403
404 class NoopLock(object):
405     def __enter__(self):
406         return self
407
408     def __exit__(self, exc_type, exc_value, traceback):
409         pass
410
411     def acquire(self, blocking=False):
412         pass
413
414     def release(self):
415         pass
416
417
418 def must_be_writable(orig_func):
419     @functools.wraps(orig_func)
420     def must_be_writable_wrapper(self, *args, **kwargs):
421         if not self.writable():
422             raise IOError(errno.EROFS, "Collection is read-only.")
423         return orig_func(self, *args, **kwargs)
424     return must_be_writable_wrapper
425
426
427 class _BlockManager(object):
428     """BlockManager handles buffer blocks.
429
430     Also handles background block uploads, and background block prefetch for a
431     Collection of ArvadosFiles.
432
433     """
434
435     DEFAULT_PUT_THREADS = 2
436     DEFAULT_GET_THREADS = 2
437
438     def __init__(self, keep, copies=None, put_threads=None):
439         """keep: KeepClient object to use"""
440         self._keep = keep
441         self._bufferblocks = collections.OrderedDict()
442         self._put_queue = None
443         self._put_threads = None
444         self._prefetch_queue = None
445         self._prefetch_threads = None
446         self.lock = threading.Lock()
447         self.prefetch_enabled = True
448         if put_threads:
449             self.num_put_threads = put_threads
450         else:
451             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
452         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
453         self.copies = copies
454         self._pending_write_size = 0
455         self.threads_lock = threading.Lock()
456         self.padding_block = None
457
458     @synchronized
459     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
460         """Allocate a new, empty bufferblock in WRITABLE state and return it.
461
462         :blockid:
463           optional block identifier, otherwise one will be automatically assigned
464
465         :starting_capacity:
466           optional capacity, otherwise will use default capacity
467
468         :owner:
469           ArvadosFile that owns this block
470
471         """
472         return self._alloc_bufferblock(blockid, starting_capacity, owner)
473
474     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
475         if blockid is None:
476             blockid = "%s" % uuid.uuid4()
477         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
478         self._bufferblocks[bufferblock.blockid] = bufferblock
479         return bufferblock
480
481     @synchronized
482     def dup_block(self, block, owner):
483         """Create a new bufferblock initialized with the content of an existing bufferblock.
484
485         :block:
486           the buffer block to copy.
487
488         :owner:
489           ArvadosFile that owns the new block
490
491         """
492         new_blockid = "bufferblock%i" % len(self._bufferblocks)
493         bufferblock = block.clone(new_blockid, owner)
494         self._bufferblocks[bufferblock.blockid] = bufferblock
495         return bufferblock
496
497     @synchronized
498     def is_bufferblock(self, locator):
499         return locator in self._bufferblocks
500
501     def _commit_bufferblock_worker(self):
502         """Background uploader thread."""
503
504         while True:
505             try:
506                 bufferblock = self._put_queue.get()
507                 if bufferblock is None:
508                     return
509
510                 if self.copies is None:
511                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
512                 else:
513                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
514                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
515
516             except Exception as e:
517                 bufferblock.set_state(_BufferBlock.ERROR, e)
518             finally:
519                 if self._put_queue is not None:
520                     self._put_queue.task_done()
521
522     def start_put_threads(self):
523         with self.threads_lock:
524             if self._put_threads is None:
525                 # Start uploader threads.
526
527                 # If we don't limit the Queue size, the upload queue can quickly
528                 # grow to take up gigabytes of RAM if the writing process is
529                 # generating data more quickly than it can be sent to the Keep
530                 # servers.
531                 #
532                 # With two upload threads and a queue size of 2, this means up to 4
533                 # blocks pending.  If they are full 64 MiB blocks, that means up to
534                 # 256 MiB of internal buffering, which is the same size as the
535                 # default download block cache in KeepClient.
536                 self._put_queue = queue.Queue(maxsize=2)
537
538                 self._put_threads = []
539                 for i in range(0, self.num_put_threads):
540                     thread = threading.Thread(target=self._commit_bufferblock_worker)
541                     self._put_threads.append(thread)
542                     thread.daemon = True
543                     thread.start()
544
545     def _block_prefetch_worker(self):
546         """The background downloader thread."""
547         while True:
548             try:
549                 b = self._prefetch_queue.get()
550                 if b is None:
551                     return
552                 self._keep.get(b)
553             except Exception:
554                 _logger.exception("Exception doing block prefetch")
555
556     @synchronized
557     def start_get_threads(self):
558         if self._prefetch_threads is None:
559             self._prefetch_queue = queue.Queue()
560             self._prefetch_threads = []
561             for i in range(0, self.num_get_threads):
562                 thread = threading.Thread(target=self._block_prefetch_worker)
563                 self._prefetch_threads.append(thread)
564                 thread.daemon = True
565                 thread.start()
566
567
568     @synchronized
569     def stop_threads(self):
570         """Shut down and wait for background upload and download threads to finish."""
571
572         if self._put_threads is not None:
573             for t in self._put_threads:
574                 self._put_queue.put(None)
575             for t in self._put_threads:
576                 t.join()
577         self._put_threads = None
578         self._put_queue = None
579
580         if self._prefetch_threads is not None:
581             for t in self._prefetch_threads:
582                 self._prefetch_queue.put(None)
583             for t in self._prefetch_threads:
584                 t.join()
585         self._prefetch_threads = None
586         self._prefetch_queue = None
587
588     def __enter__(self):
589         return self
590
591     def __exit__(self, exc_type, exc_value, traceback):
592         self.stop_threads()
593
594     @synchronized
595     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
596         """Packs small blocks together before uploading"""
597         self._pending_write_size += closed_file_size
598
599         # Check whether there's enough pending small-block data to fill up a whole block
600         if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
601
602             # Search blocks ready for getting packed together before being committed to Keep.
603             # A WRITABLE block always has an owner.
604             # A WRITABLE block with its owner.closed() implies that it's
605             # size is <= KEEP_BLOCK_SIZE/2.
606             try:
607                 small_blocks = [b for b in listvalues(self._bufferblocks) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
608             except AttributeError:
609                 # Writable blocks without owner shouldn't exist.
610                 raise UnownedBlockError()
611
612             if len(small_blocks) <= 1:
613                 # Not enough small blocks for repacking
614                 return
615
616             # Update the pending write size count with its true value, just in case
617             # some small file was opened, written and closed several times.
618             self._pending_write_size = sum([b.size() for b in small_blocks])
619             if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
620                 return
621
622             new_bb = self._alloc_bufferblock()
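                # Copy each small closed file's data into the packed block and
                # repoint the file's single segment at its new offset there.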
623             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
624                 bb = small_blocks.pop(0)
625                 arvfile = bb.owner
626                 self._pending_write_size -= bb.size()
627                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
628                 arvfile.set_segments([Range(new_bb.blockid,
629                                             0,
630                                             bb.size(),
631                                             new_bb.write_pointer - bb.size())])
632                 self._delete_bufferblock(bb.blockid)
633             self.commit_bufferblock(new_bb, sync=sync)
634
635     def commit_bufferblock(self, block, sync):
636         """Initiate a background upload of a bufferblock.
637
638         :block:
639           The block object to upload
640
641         :sync:
642           If `sync` is True, upload the block synchronously.
643           If `sync` is False, upload the block asynchronously.  This will
644           return immediately unless the upload queue is at capacity, in
645           which case it will wait on an upload queue slot.
646
647         """
648         try:
649             # Mark the block as PENDING to disallow any more appends.
650             block.set_state(_BufferBlock.PENDING)
651         except StateChangeError as e:
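                # The block was already PENDING or COMMITTED (another caller
                # started the upload first); wait for it, skip it, or surface
                # its error instead of failing.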
652             if e.state == _BufferBlock.PENDING:
653                 if sync:
654                     block.wait_for_commit.wait()
655                 else:
656                     return
657             if block.state() == _BufferBlock.COMMITTED:
658                 return
659             elif block.state() == _BufferBlock.ERROR:
660                 raise block.error
661             else:
662                 raise
663
664         if sync:
665             try:
666                 if self.copies is None:
667                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
668                 else:
669                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
670                 block.set_state(_BufferBlock.COMMITTED, loc)
671             except Exception as e:
672                 block.set_state(_BufferBlock.ERROR, e)
673                 raise
674         else:
675             self.start_put_threads()
676             self._put_queue.put(block)
677
678     @synchronized
679     def get_bufferblock(self, locator):
680         return self._bufferblocks.get(locator)
681
682     @synchronized
683     def get_padding_block(self):
684         """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
685         when using truncate() to extend the size of a file.
686
687         For reference (and possible future optimization), the md5sum of the
688         padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
689
690         """
691
692         if self.padding_block is None:
693             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
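                # A fresh bytearray is already zero-filled, so advancing
                # write_pointer to the full block size yields 64 MiB of zeros
                # without copying any data into the buffer.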
694             self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
695             self.commit_bufferblock(self.padding_block, False)
696         return self.padding_block
697
698     @synchronized
699     def delete_bufferblock(self, locator):
700         self._delete_bufferblock(locator)
701
702     def _delete_bufferblock(self, locator):
703         bb = self._bufferblocks[locator]
704         bb.clear()
705         del self._bufferblocks[locator]
706
707     def get_block_contents(self, locator, num_retries, cache_only=False):
708         """Fetch a block.
709
710         First checks to see if the locator is a BufferBlock and returns that; if
711         not, passes the request through to KeepClient.get().
712
713         """
714         with self.lock:
715             if locator in self._bufferblocks:
716                 bufferblock = self._bufferblocks[locator]
717                 if bufferblock.state() != _BufferBlock.COMMITTED:
718                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
719                 else:
720                     locator = bufferblock._locator
721         if cache_only:
722             return self._keep.get_from_cache(locator)
723         else:
724             return self._keep.get(locator, num_retries=num_retries)
725
726     def commit_all(self):
727         """Commit all outstanding buffer blocks.
728
729         This is a synchronous call, and will not return until all buffer blocks
730         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
731
732         """
733         self.repack_small_blocks(force=True, sync=True)
734
735         with self.lock:
736             items = listitems(self._bufferblocks)
737
738         for k,v in items:
739             if v.state() != _BufferBlock.COMMITTED and v.owner:
740                 v.owner.flush(sync=False)
741
742         with self.lock:
743             if self._put_queue is not None:
744                 self._put_queue.join()
745
746                 err = []
747                 for k,v in items:
748                     if v.state() == _BufferBlock.ERROR:
749                         err.append((v.locator(), v.error))
750                 if err:
751                     raise KeepWriteError("Error writing some blocks", err, label="block")
752
753         for k,v in items:
754             # flush again with sync=True to remove committed bufferblocks from
755             # the segments.
756             if v.owner:
757                 v.owner.flush(sync=True)
758
759     def block_prefetch(self, locator):
760         """Initiate a background download of a block.
761
762         This assumes that the underlying KeepClient implements a block cache,
763         so repeated requests for the same block will not result in repeated
764         downloads (unless the block is evicted from the cache).  This method
765         does not block.
766
767         """
768
769         if not self.prefetch_enabled:
770             return
771
772         if self._keep.get_from_cache(locator) is not None:
773             return
774
775         with self.lock:
776             if locator in self._bufferblocks:
777                 return
778
779         self.start_get_threads()
780         self._prefetch_queue.put(locator)
781
782
783 class ArvadosFile(object):
784     """Represent a file in a Collection.
785
786     ArvadosFile manages the underlying representation of a file in Keep as a
787     sequence of segments spanning a set of blocks, and implements random
788     read/write access.
789
790     This object may be accessed from multiple threads.
791
792     """
793
794     def __init__(self, parent, name, stream=[], segments=[]):
795         """
796         ArvadosFile constructor.
797
798         :stream:
799           a list of Range objects representing a block stream
800
801         :segments:
802           a list of Range objects representing segments
803         """
804         self.parent = parent
805         self.name = name
806         self._writers = set()
807         self._committed = False
808         self._segments = []
809         self.lock = parent.root_collection().lock
810         for s in segments:
811             self._add_segment(stream, s.locator, s.range_size)
812         self._current_bblock = None
813
814     def writable(self):
815         return self.parent.writable()
816
817     @synchronized
818     def permission_expired(self, as_of_dt=None):
819         """Returns True if any of the segment's locators is expired"""
820         for r in self._segments:
821             if KeepLocator(r.locator).permission_expired(as_of_dt):
822                 return True
823         return False
824
825     @synchronized
826     def segments(self):
827         return copy.copy(self._segments)
828
829     @synchronized
830     def clone(self, new_parent, new_name):
831         """Make a copy of this file."""
832         cp = ArvadosFile(new_parent, new_name)
833         cp.replace_contents(self)
834         return cp
835
836     @must_be_writable
837     @synchronized
838     def replace_contents(self, other):
839         """Replace segments of this file with segments from another `ArvadosFile` object."""
840
841         map_loc = {}
842         self._segments = []
843         for other_segment in other.segments():
844             new_loc = other_segment.locator
845             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
846                 if other_segment.locator not in map_loc:
847                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
848                     if bufferblock.state() != _BufferBlock.WRITABLE:
849                         map_loc[other_segment.locator] = bufferblock.locator()
850                     else:
851                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
852                 new_loc = map_loc[other_segment.locator]
853
854             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
855
856         self.set_committed(False)
857
858     def __eq__(self, other):
859         if other is self:
860             return True
861         if not isinstance(other, ArvadosFile):
862             return False
863
864         othersegs = other.segments()
865         with self.lock:
866             if len(self._segments) != len(othersegs):
867                 return False
868             for i in range(0, len(othersegs)):
869                 seg1 = self._segments[i]
870                 seg2 = othersegs[i]
871                 loc1 = seg1.locator
872                 loc2 = seg2.locator
873
874                 if self.parent._my_block_manager().is_bufferblock(loc1):
875                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
876
877                 if other.parent._my_block_manager().is_bufferblock(loc2):
878                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
879
880                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
881                     seg1.range_start != seg2.range_start or
882                     seg1.range_size != seg2.range_size or
883                     seg1.segment_offset != seg2.segment_offset):
884                     return False
885
886         return True
887
888     def __ne__(self, other):
889         return not self.__eq__(other)
890
891     @synchronized
892     def set_segments(self, segs):
893         self._segments = segs
894
895     @synchronized
896     def set_committed(self, value=True):
897         """Set committed flag.
898
899         If value is True, set committed to be True.
900
901         If value is False, set committed to be False for this and all parents.
902         """
903         if value == self._committed:
904             return
905         self._committed = value
906         if self._committed is False and self.parent is not None:
907             self.parent.set_committed(False)
908
909     @synchronized
910     def committed(self):
911         """Get whether this is committed or not."""
912         return self._committed
913
914     @synchronized
915     def add_writer(self, writer):
916         """Add an ArvadosFileWriter reference to the list of writers"""
917         if isinstance(writer, ArvadosFileWriter):
918             self._writers.add(writer)
919
920     @synchronized
921     def remove_writer(self, writer, flush):
922         """
923         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
924         and do some block maintenance tasks.
925         """
926         self._writers.remove(writer)
927
928         if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
929             # Flush requested, or the file is too big to be packed with other small files
930             self.flush()
931         elif self.closed():
932             # All writers closed and size is adequate for repacking
933             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
934
935     def closed(self):
936         """
937         Get whether this is closed or not. When the writers list is empty, the file
938         is considered closed.
939         """
940         return len(self._writers) == 0
941
942     @must_be_writable
943     @synchronized
944     def truncate(self, size):
945         """Shrink or expand the size of the file.
946
947         If `size` is less than the size of the file, the file contents after
948         `size` will be discarded.  If `size` is greater than the current size
949         of the file, it will be filled with zero bytes.
950
951         """
952         if size < self.size():
953             new_segs = []
954             for r in self._segments:
955                 range_end = r.range_start+r.range_size
956                 if r.range_start >= size:
957                     # segment is past the truncate size, all done
958                     break
959                 elif size < range_end:
960                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
961                     nr.segment_offset = r.segment_offset
962                     new_segs.append(nr)
963                     break
964                 else:
965                     new_segs.append(r)
966
967             self._segments = new_segs
968             self.set_committed(False)
969         elif size > self.size():
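                # Grow the file by appending ranges that reference the shared
                # all-zeros padding block instead of buffering new zero data.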
970             padding = self.parent._my_block_manager().get_padding_block()
971             diff = size - self.size()
972             while diff > config.KEEP_BLOCK_SIZE:
973                 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
974                 diff -= config.KEEP_BLOCK_SIZE
975             if diff > 0:
976                 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
977             self.set_committed(False)
978         else:
979             # size == self.size()
980             pass
981
982     def readfrom(self, offset, size, num_retries, exact=False):
983         """Read up to `size` bytes from the file starting at `offset`.
984
985         :exact:
986          If False (default), return less data than requested if the read
987          crosses a block boundary and the next block isn't cached.  If True,
988          only return less data than requested when hitting EOF.
989         """
990
991         with self.lock:
992             if size == 0 or offset >= self.size():
993                 return b''
994             readsegs = locators_and_ranges(self._segments, offset, size)
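                # Also compute the ranges for the next block's worth of data so
                # they can be handed to the background prefetcher below.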
995             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
996
997         locs = set()
998         data = []
999         for lr in readsegs:
1000             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1001             if block:
1002                 blockview = memoryview(block)
1003                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
1004                 locs.add(lr.locator)
1005             else:
1006                 break
1007
1008         for lr in prefetch:
1009             if lr.locator not in locs:
1010                 self.parent._my_block_manager().block_prefetch(lr.locator)
1011                 locs.add(lr.locator)
1012
1013         return b''.join(data)
1014
1015     def _repack_writes(self, num_retries):
1016         """Optimize buffer block by repacking segments in file sequence.
1017
1018         When the client makes random writes, they appear in the buffer block in
1019         the sequence they were written rather than the sequence they appear in
1020         the file.  This makes for inefficient, fragmented manifests.  Attempt
1021         to optimize by repacking writes in file sequence.
1022
1023         """
1024         segs = self._segments
1025
1026         # Collect the segments that reference the buffer block.
1027         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
1028
1029         # Collect total data referenced by segments (could be smaller than
1030         # bufferblock size if a portion of the file was written and
1031         # then overwritten).
1032         write_total = sum([s.range_size for s in bufferblock_segs])
1033
1034         if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
1035             # Data was overwritten (write_total < buffer size) or written out
1036             # of order (more than one segment references this block), which
1037             # would produce a fragmented manifest, so re-pack into a new buffer.
1038             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
1039             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
1040             for t in bufferblock_segs:
1041                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
1042                 t.segment_offset = new_bb.size() - t.range_size
1043
1044             self._current_bblock = new_bb
1045
1046     @must_be_writable
1047     @synchronized
1048     def writeto(self, offset, data, num_retries):
1049         """Write `data` to the file starting at `offset`.
1050
1051         This will update existing bytes and/or extend the size of the file as
1052         necessary.
1053
1054         """
1055         if not isinstance(data, bytes) and not isinstance(data, memoryview):
1056             data = data.encode()
1057         if len(data) == 0:
1058             return
1059
1060         if offset > self.size():
1061             self.truncate(offset)
1062
1063         if len(data) > config.KEEP_BLOCK_SIZE:
1064             # Chunk it up into smaller writes
1065             n = 0
1066             dataview = memoryview(data)
1067             while n < len(data):
1068                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1069                 n += config.KEEP_BLOCK_SIZE
1070             return
1071
1072         self.set_committed(False)
1073
1074         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1075             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1076
1077         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1078             self._repack_writes(num_retries)
1079             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1080                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1081                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1082
1083         self._current_bblock.append(data)
1084
1085         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1086
1087         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1088
1089         return len(data)
1090
1091     @synchronized
1092     def flush(self, sync=True, num_retries=0):
1093         """Flush the current bufferblock to Keep.
1094
1095         :sync:
1096           If True, commit block synchronously, wait until buffer block has been written.
1097           If False, commit block asynchronously, return immediately after putting block into
1098           the keep put queue.
1099         """
1100         if self.committed():
1101             return
1102
1103         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1104             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1105                 self._repack_writes(num_retries)
1106             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1107
1108         if sync:
1109             to_delete = set()
1110             for s in self._segments:
1111                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1112                 if bb:
1113                     if bb.state() != _BufferBlock.COMMITTED:
1114                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1115                     to_delete.add(s.locator)
1116                     s.locator = bb.locator()
1117             for s in to_delete:
1118                 self.parent._my_block_manager().delete_bufferblock(s)
1119
1120         self.parent.notify(MOD, self.parent, self.name, (self, self))
1121
1122     @must_be_writable
1123     @synchronized
1124     def add_segment(self, blocks, pos, size):
1125         """Add a segment to the end of the file.
1126
1127         `pos` and `size` reference a section of the stream described by
1128         `blocks` (a list of Range objects)
1129
1130         """
1131         self._add_segment(blocks, pos, size)
1132
1133     def _add_segment(self, blocks, pos, size):
1134         """Internal implementation of add_segment."""
1135         self.set_committed(False)
1136         for lr in locators_and_ranges(blocks, pos, size):
1137             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1138             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1139             self._segments.append(r)
1140
1141     @synchronized
1142     def size(self):
1143         """Get the file size."""
1144         if self._segments:
1145             n = self._segments[-1]
1146             return n.range_start + n.range_size
1147         else:
1148             return 0
1149
1150     @synchronized
1151     def manifest_text(self, stream_name=".", portable_locators=False,
1152                       normalize=False, only_committed=False):
1153         buf = ""
1154         filestream = []
1155         for segment in self._segments:
1156             loc = segment.locator
1157             if self.parent._my_block_manager().is_bufferblock(loc):
1158                 if only_committed:
1159                     continue
1160                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1161             if portable_locators:
1162                 loc = KeepLocator(loc).stripped()
1163             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1164                                  segment.segment_offset, segment.range_size))
1165         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1166         buf += "\n"
1167         return buf
1168
1169     @must_be_writable
1170     @synchronized
1171     def _reparent(self, newparent, newname):
1172         self.set_committed(False)
1173         self.flush(sync=True)
1174         self.parent.remove(self.name)
1175         self.parent = newparent
1176         self.name = newname
1177         self.lock = self.parent.root_collection().lock
1178
1179
1180 class ArvadosFileReader(ArvadosFileReaderBase):
1181     """Wraps ArvadosFile in a file-like object supporting reading only.
1182
1183     Be aware that this class is NOT thread safe as there is no locking around
1184     updating the file pointer.
1185
1186     """
1187
1188     def __init__(self, arvadosfile, mode="r", num_retries=None):
1189         super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1190         self.arvadosfile = arvadosfile
1191
1192     def size(self):
1193         return self.arvadosfile.size()
1194
1195     def stream_name(self):
1196         return self.arvadosfile.parent.stream_name()
1197
1198     @_FileLikeObjectBase._before_close
1199     @retry_method
1200     def read(self, size=None, num_retries=None):
1201         """Read up to `size` bytes from the file and return the result.
1202
1203         Starts at the current file position.  If `size` is None, read the
1204         entire remainder of the file.
1205         """
1206         if size is None:
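                 # Read to EOF in KEEP_BLOCK_SIZE chunks rather than asking
                 # readfrom() for the whole remainder in a single call.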
1207             data = []
1208             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1209             while rd:
1210                 data.append(rd)
1211                 self._filepos += len(rd)
1212                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1213             return b''.join(data)
1214         else:
1215             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1216             self._filepos += len(data)
1217             return data
1218
1219     @_FileLikeObjectBase._before_close
1220     @retry_method
1221     def readfrom(self, offset, size, num_retries=None):
1222         """Read up to `size` bytes from the stream, starting at the specified file offset.
1223
1224         This method does not change the file position.
1225         """
1226         return self.arvadosfile.readfrom(offset, size, num_retries)
1227
1228     def flush(self):
1229         pass
1230
1231
1232 class ArvadosFileWriter(ArvadosFileReader):
1233     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1234
1235     Be aware that this class is NOT thread safe as there is no locking around
1236     updating the file pointer.
1237
1238     """
1239
1240     def __init__(self, arvadosfile, mode, num_retries=None):
1241         super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1242         self.arvadosfile.add_writer(self)
1243
1244     def writable(self):
1245         return True
1246
1247     @_FileLikeObjectBase._before_close
1248     @retry_method
1249     def write(self, data, num_retries=None):
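             # In append mode every write lands at the current end of file, so
             # move the file pointer to EOF before writing; tell() and read()
             # afterwards reflect the position after the appended data.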
1250         if self.mode[0] == "a":
1251             self._filepos = self.size()
1252         self.arvadosfile.writeto(self._filepos, data, num_retries)
1253         self._filepos += len(data)
1254         return len(data)
1255
1256     @_FileLikeObjectBase._before_close
1257     @retry_method
1258     def writelines(self, seq, num_retries=None):
1259         for s in seq:
1260             self.write(s, num_retries=num_retries)
1261
1262     @_FileLikeObjectBase._before_close
1263     def truncate(self, size=None):
1264         if size is None:
1265             size = self._filepos
1266         self.arvadosfile.truncate(size)
1267
1268     @_FileLikeObjectBase._before_close
1269     def flush(self):
1270         self.arvadosfile.flush()
1271
1272     def close(self, flush=True):
1273         if not self.closed:
1274             self.arvadosfile.remove_writer(self, flush)
1275             super(ArvadosFileWriter, self).close()