arvados.git: sdk/python/arvados/arvfile.py
1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 from future.utils import listitems, listvalues
5 standard_library.install_aliases()
6 from builtins import range
7 from builtins import object
8 import bz2
9 import collections
10 import copy
11 import errno
12 import functools
13 import hashlib
14 import logging
15 import os
16 import queue
17 import re
18 import sys
19 import threading
20 import uuid
21 import zlib
22
23 from . import config
24 from .errors import KeepWriteError, AssertionError, ArgumentError
25 from .keep import KeepLocator
26 from ._normalize_stream import normalize_stream
27 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
28 from .retry import retry_method
29
30 MOD = "mod"
31 WRITE = "write"
32
33 _logger = logging.getLogger('arvados.arvfile')
34
35 def split(path):
36     """split(path) -> streamname, filename
37
38     Separate the stream name and file name in a /-separated stream path and
39     return a tuple (stream_name, file_name).  If no stream name is available,
40     assume '.'.
41
42     """
43     try:
44         stream_name, file_name = path.rsplit('/', 1)
45     except ValueError:  # No / in string
46         stream_name, file_name = '.', path
47     return stream_name, file_name
48
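# Illustrative examples of split() (not in the original source), assuming
# '/'-separated stream paths as described in the docstring above:
#
#   split("./subdir/file.txt")  -> ("./subdir", "file.txt")
#   split("file.txt")           -> (".", "file.txt")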
49
50 class UnownedBlockError(Exception):
51     """Raised when there's an writable block without an owner on the BlockManager."""
52     pass
53
54
55 class _FileLikeObjectBase(object):
56     def __init__(self, name, mode):
57         self.name = name
58         self.mode = mode
59         self.closed = False
60
61     @staticmethod
62     def _before_close(orig_func):
63         @functools.wraps(orig_func)
64         def before_close_wrapper(self, *args, **kwargs):
65             if self.closed:
66                 raise ValueError("I/O operation on closed stream file")
67             return orig_func(self, *args, **kwargs)
68         return before_close_wrapper
69
70     def __enter__(self):
71         return self
72
73     def __exit__(self, exc_type, exc_value, traceback):
74         try:
75             self.close()
76         except Exception:
77             if exc_type is None:
78                 raise
79
80     def close(self):
81         self.closed = True
82
83
84 class ArvadosFileReaderBase(_FileLikeObjectBase):
85     def __init__(self, name, mode, num_retries=None):
86         super(ArvadosFileReaderBase, self).__init__(name, mode)
87         self._binary = 'b' in mode
88         if sys.version_info >= (3, 0) and not self._binary:
89             raise NotImplementedError("text mode {!r} is not implemented".format(mode))
90         self._filepos = 0
91         self.num_retries = num_retries
92         self._readline_cache = (None, None)
93
94     def __iter__(self):
95         while True:
96             data = self.readline()
97             if not data:
98                 break
99             yield data
100
101     def decompressed_name(self):
102         return re.sub(r'\.(bz2|gz)$', '', self.name)
103
104     @_FileLikeObjectBase._before_close
105     def seek(self, pos, whence=os.SEEK_SET):
106         if whence == os.SEEK_CUR:
107             pos += self._filepos
108         elif whence == os.SEEK_END:
109             pos += self.size()
110         if pos < 0:
111             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
112         self._filepos = pos
113         return self._filepos
114
115     def tell(self):
116         return self._filepos
117
118     def readable(self):
119         return True
120
121     def writable(self):
122         return False
123
124     def seekable(self):
125         return True
126
127     @_FileLikeObjectBase._before_close
128     @retry_method
129     def readall(self, size=2**20, num_retries=None):
130         while True:
131             data = self.read(size, num_retries=num_retries)
132             if len(data) == 0:
133                 break
134             yield data
135
136     @_FileLikeObjectBase._before_close
137     @retry_method
138     def readline(self, size=float('inf'), num_retries=None):
139         cache_pos, cache_data = self._readline_cache
140         if self.tell() == cache_pos:
141             data = [cache_data]
142             self._filepos += len(cache_data)
143         else:
144             data = [b'']
145         data_size = len(data[-1])
146         while (data_size < size) and (b'\n' not in data[-1]):
147             next_read = self.read(2 ** 20, num_retries=num_retries)
148             if not next_read:
149                 break
150             data.append(next_read)
151             data_size += len(next_read)
152         data = b''.join(data)
153         try:
154             nextline_index = data.index(b'\n') + 1
155         except ValueError:
156             nextline_index = len(data)
157         nextline_index = min(nextline_index, size)
158         self._filepos -= len(data) - nextline_index
159         self._readline_cache = (self.tell(), data[nextline_index:])
160         return data[:nextline_index].decode()
161
162     @_FileLikeObjectBase._before_close
163     @retry_method
164     def decompress(self, decompress, size, num_retries=None):
165         for segment in self.readall(size, num_retries=num_retries):
166             data = decompress(segment)
167             if data:
168                 yield data
169
170     @_FileLikeObjectBase._before_close
171     @retry_method
172     def readall_decompressed(self, size=2**20, num_retries=None):
173         self.seek(0)
174         if self.name.endswith('.bz2'):
175             dc = bz2.BZ2Decompressor()
176             return self.decompress(dc.decompress, size,
177                                    num_retries=num_retries)
178         elif self.name.endswith('.gz'):
179             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
180             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
181                                    size, num_retries=num_retries)
182         else:
183             return self.readall(size, num_retries=num_retries)
184
185     @_FileLikeObjectBase._before_close
186     @retry_method
187     def readlines(self, sizehint=float('inf'), num_retries=None):
188         data = []
189         data_size = 0
190         for s in self.readall(num_retries=num_retries):
191             data.append(s)
192             data_size += len(s)
193             if data_size >= sizehint:
194                 break
195         return b''.join(data).decode().splitlines(True)
196
197     def size(self):
198         raise IOError(errno.ENOSYS, "Not implemented")
199
200     def read(self, size, num_retries=None):
201         raise IOError(errno.ENOSYS, "Not implemented")
202
203     def readfrom(self, start, size, num_retries=None):
204         raise IOError(errno.ENOSYS, "Not implemented")
205
206
207 class StreamFileReader(ArvadosFileReaderBase):
208     class _NameAttribute(str):
209         # The Python file API provides a plain .name attribute.
210         # The older SDK provided a name() method.
211         # This class provides both, for maximum compatibility.
212         def __call__(self):
213             return self
214
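    # For illustration only: because the reader's name is a _NameAttribute,
    # both the modern attribute style and the legacy call style work:
    #
    #   reader.name    -> "file.txt"
    #   reader.name()  -> "file.txt"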
215     def __init__(self, stream, segments, name):
216         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
217         self._stream = stream
218         self.segments = segments
219
220     def stream_name(self):
221         return self._stream.name()
222
223     def size(self):
224         n = self.segments[-1]
225         return n.range_start + n.range_size
226
227     @_FileLikeObjectBase._before_close
228     @retry_method
229     def read(self, size, num_retries=None):
230         """Read up to 'size' bytes from the stream, starting at the current file position"""
231         if size == 0:
232             return b''
233
234         data = b''
235         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
236         if available_chunks:
237             lr = available_chunks[0]
238             data = self._stream.readfrom(lr.locator+lr.segment_offset,
239                                          lr.segment_size,
240                                          num_retries=num_retries)
241
242         self._filepos += len(data)
243         return data
244
245     @_FileLikeObjectBase._before_close
246     @retry_method
247     def readfrom(self, start, size, num_retries=None):
248         """Read up to 'size' bytes from the stream, starting at 'start'"""
249         if size == 0:
250             return b''
251
252         data = []
253         for lr in locators_and_ranges(self.segments, start, size):
254             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
255                                               num_retries=num_retries))
256         return b''.join(data)
257
258     def as_manifest(self):
259         segs = []
260         for r in self.segments:
261             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
262         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
263
264
265 def synchronized(orig_func):
266     @functools.wraps(orig_func)
267     def synchronized_wrapper(self, *args, **kwargs):
268         with self.lock:
269             return orig_func(self, *args, **kwargs)
270     return synchronized_wrapper
271
272
273 class StateChangeError(Exception):
274     def __init__(self, message, state, nextstate):
275         super(StateChangeError, self).__init__(message)
276         self.state = state
277         self.nextstate = nextstate
278
279 class _BufferBlock(object):
280     """A stand-in for a Keep block that is in the process of being written.
281
282     Writers can append to it, get the size, and compute the Keep locator.
283     There are three valid states:
284
285     WRITABLE
286       Can append to block.
287
288     PENDING
289       Block is in the process of being uploaded to Keep, append is an error.
290
291     COMMITTED
292       The block has been written to Keep, its internal buffer has been
293       released, fetching the block will fetch it via keep client (since we
294       discarded the internal copy), and identifiers referring to the BufferBlock
295       can be replaced with the block locator.
296
297     """
298
299     WRITABLE = 0
300     PENDING = 1
301     COMMITTED = 2
302     ERROR = 3
303     DELETED = 4
304
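    # Typical lifecycle, shown here for illustration (STATE_TRANSITIONS below
    # is the authoritative list of allowed transitions):
    #
    #   WRITABLE --commit_bufferblock()--> PENDING --put succeeds--> COMMITTED
    #                                      PENDING --put fails-----> ERROR --retry--> PENDING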
305     def __init__(self, blockid, starting_capacity, owner):
306         """
307         :blockid:
308           the identifier for this block
309
310         :starting_capacity:
311           the initial buffer capacity
312
313         :owner:
314           ArvadosFile that owns this block
315
316         """
317         self.blockid = blockid
318         self.buffer_block = bytearray(starting_capacity)
319         self.buffer_view = memoryview(self.buffer_block)
320         self.write_pointer = 0
321         self._state = _BufferBlock.WRITABLE
322         self._locator = None
323         self.owner = owner
324         self.lock = threading.Lock()
325         self.wait_for_commit = threading.Event()
326         self.error = None
327
328     @synchronized
329     def append(self, data):
330         """Append some data to the buffer.
331
332         Only valid if the block is in WRITABLE state.  Implements an expanding
333         buffer, doubling capacity as needed to accommodate all the data.
334
335         """
336         if self._state == _BufferBlock.WRITABLE:
337             if not isinstance(data, bytes) and not isinstance(data, memoryview):
338                 data = data.encode()
339             while (self.write_pointer+len(data)) > len(self.buffer_block):
340                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
341                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
342                 self.buffer_block = new_buffer_block
343                 self.buffer_view = memoryview(self.buffer_block)
344             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
345             self.write_pointer += len(data)
346             self._locator = None
347         else:
348             raise AssertionError("Buffer block is not writable")
349
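    # Rough illustration of the doubling strategy in append(), assuming the
    # default starting capacity of 2**14 bytes: appending 100 KiB grows the
    # buffer 16 KiB -> 32 KiB -> 64 KiB -> 128 KiB, copying the written prefix
    # at each step, so appends stay amortized O(1) per byte.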
350     STATE_TRANSITIONS = frozenset([
351             (WRITABLE, PENDING),
352             (PENDING, COMMITTED),
353             (PENDING, ERROR),
354             (ERROR, PENDING)])
355
356     @synchronized
357     def set_state(self, nextstate, val=None):
358         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
359             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
360         self._state = nextstate
361
362         if self._state == _BufferBlock.PENDING:
363             self.wait_for_commit.clear()
364
365         if self._state == _BufferBlock.COMMITTED:
366             self._locator = val
367             self.buffer_view = None
368             self.buffer_block = None
369             self.wait_for_commit.set()
370
371         if self._state == _BufferBlock.ERROR:
372             self.error = val
373             self.wait_for_commit.set()
374
375     @synchronized
376     def state(self):
377         return self._state
378
379     def size(self):
380         """The amount of data written to the buffer."""
381         return self.write_pointer
382
383     @synchronized
384     def locator(self):
385         """The Keep locator for this buffer's contents."""
386         if self._locator is None:
387             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
388         return self._locator
389
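    # For illustration, the locator format is "<md5 hex digest>+<size in
    # bytes>"; an empty buffer would yield "d41d8cd98f00b204e9800998ecf8427e+0".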
390     @synchronized
391     def clone(self, new_blockid, owner):
392         if self._state == _BufferBlock.COMMITTED:
393             raise AssertionError("Cannot duplicate committed buffer block")
394         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
395         bufferblock.append(self.buffer_view[0:self.size()])
396         return bufferblock
397
398     @synchronized
399     def clear(self):
400         self._state = _BufferBlock.DELETED
401         self.owner = None
402         self.buffer_block = None
403         self.buffer_view = None
404
405     @synchronized
406     def repack_writes(self):
407         """Optimize buffer block by repacking segments in file sequence.
408
409         When the client makes random writes, they appear in the buffer block in
410         the sequence they were written rather than the sequence they appear in
411         the file.  This makes for inefficient, fragmented manifests.  Attempt
412         to optimize by repacking writes in file sequence.
413
414         """
415         if self._state != _BufferBlock.WRITABLE:
416             raise AssertionError("Cannot repack non-writable block")
417
418         segs = self.owner.segments()
419
420         # Collect the segments that reference the buffer block.
421         bufferblock_segs = [s for s in segs if s.locator == self.blockid]
422
423         # Collect total data referenced by segments (could be smaller than
424         # bufferblock size if a portion of the file was written and
425         # then overwritten).
426         write_total = sum([s.range_size for s in bufferblock_segs])
427
428         if write_total < self.size() or len(bufferblock_segs) > 1:
429             # If there's more than one segment referencing this block, it is
430             # due to out-of-order writes and will produce a fragmented
431             # manifest, so try to optimize by re-packing into a new buffer.
432             contents = self.buffer_view[0:self.write_pointer].tobytes()
433             new_bb = _BufferBlock(None, write_total, None)
434             for t in bufferblock_segs:
435                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
436                 t.segment_offset = new_bb.size() - t.range_size
437
438             self.buffer_block = new_bb.buffer_block
439             self.buffer_view = new_bb.buffer_view
440             self.write_pointer = new_bb.write_pointer
441             self._locator = None
442             new_bb.clear()
443             self.owner.set_segments(segs)
444
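    # Worked example of the repacking above (illustrative values): suppose
    # out-of-order writes left a file's segments referencing this block as
    #
    #   Range(locator=blockid, range_start=1000, range_size=1000, segment_offset=0)
    #   Range(locator=blockid, range_start=0,    range_size=1000, segment_offset=1000)
    #
    # repack_writes() copies the referenced bytes into a new buffer in file
    # order, so each segment_offset lines up with its range_start and
    # normalize_stream() can emit a compact, unfragmented manifest.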
445     def __repr__(self):
446         return "<BufferBlock %s>" % (self.blockid)
447
448
449 class NoopLock(object):
450     def __enter__(self):
451         return self
452
453     def __exit__(self, exc_type, exc_value, traceback):
454         pass
455
456     def acquire(self, blocking=False):
457         pass
458
459     def release(self):
460         pass
461
462
463 def must_be_writable(orig_func):
464     @functools.wraps(orig_func)
465     def must_be_writable_wrapper(self, *args, **kwargs):
466         if not self.writable():
467             raise IOError(errno.EROFS, "Collection is read-only.")
468         return orig_func(self, *args, **kwargs)
469     return must_be_writable_wrapper
470
471
472 class _BlockManager(object):
473     """BlockManager handles buffer blocks.
474
475     Also handles background block uploads, and background block prefetch for a
476     Collection of ArvadosFiles.
477
478     """
479
480     DEFAULT_PUT_THREADS = 2
481     DEFAULT_GET_THREADS = 2
482
483     def __init__(self, keep, copies=None, put_threads=None):
484         """keep: KeepClient object to use"""
485         self._keep = keep
486         self._bufferblocks = collections.OrderedDict()
487         self._put_queue = None
488         self._put_threads = None
489         self._prefetch_queue = None
490         self._prefetch_threads = None
491         self.lock = threading.Lock()
492         self.prefetch_enabled = True
493         if put_threads:
494             self.num_put_threads = put_threads
495         else:
496             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
497         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
498         self.copies = copies
499         self._pending_write_size = 0
500         self.threads_lock = threading.Lock()
501         self.padding_block = None
502
503     @synchronized
504     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
505         """Allocate a new, empty bufferblock in WRITABLE state and return it.
506
507         :blockid:
508           optional block identifier, otherwise one will be automatically assigned
509
510         :starting_capacity:
511           optional capacity, otherwise will use default capacity
512
513         :owner:
514           ArvadosFile that owns this block
515
516         """
517         return self._alloc_bufferblock(blockid, starting_capacity, owner)
518
519     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
520         if blockid is None:
521             blockid = str(uuid.uuid4())
522         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
523         self._bufferblocks[bufferblock.blockid] = bufferblock
524         return bufferblock
525
526     @synchronized
527     def dup_block(self, block, owner):
528         """Create a new bufferblock initialized with the content of an existing bufferblock.
529
530         :block:
531           the buffer block to copy.
532
533         :owner:
534           ArvadosFile that owns the new block
535
536         """
537         new_blockid = str(uuid.uuid4())
538         bufferblock = block.clone(new_blockid, owner)
539         self._bufferblocks[bufferblock.blockid] = bufferblock
540         return bufferblock
541
542     @synchronized
543     def is_bufferblock(self, locator):
544         return locator in self._bufferblocks
545
546     def _commit_bufferblock_worker(self):
547         """Background uploader thread."""
548
549         while True:
550             try:
551                 bufferblock = self._put_queue.get()
552                 if bufferblock is None:
553                     return
554
555                 if self.copies is None:
556                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
557                 else:
558                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
559                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
560             except Exception as e:
561                 bufferblock.set_state(_BufferBlock.ERROR, e)
562             finally:
563                 if self._put_queue is not None:
564                     self._put_queue.task_done()
565
566     def start_put_threads(self):
567         with self.threads_lock:
568             if self._put_threads is None:
569                 # Start uploader threads.
570
571                 # If we don't limit the Queue size, the upload queue can quickly
572                 # grow to take up gigabytes of RAM if the writing process is
573                 # generating data more quickly than it can be sent to the Keep
574                 # servers.
575                 #
576                 # With two upload threads and a queue size of 2, this means up to 4
577                 # blocks pending.  If they are full 64 MiB blocks, that means up to
578                 # 256 MiB of internal buffering, which is the same size as the
579                 # default download block cache in KeepClient.
580                 self._put_queue = queue.Queue(maxsize=2)
581
582                 self._put_threads = []
583                 for i in range(0, self.num_put_threads):
584                     thread = threading.Thread(target=self._commit_bufferblock_worker)
585                     self._put_threads.append(thread)
586                     thread.daemon = True
587                     thread.start()
588
589     def _block_prefetch_worker(self):
590         """The background downloader thread."""
591         while True:
592             try:
593                 b = self._prefetch_queue.get()
594                 if b is None:
595                     return
596                 self._keep.get(b)
597             except Exception:
598                 _logger.exception("Exception doing block prefetch")
599
600     @synchronized
601     def start_get_threads(self):
602         if self._prefetch_threads is None:
603             self._prefetch_queue = queue.Queue()
604             self._prefetch_threads = []
605             for i in range(0, self.num_get_threads):
606                 thread = threading.Thread(target=self._block_prefetch_worker)
607                 self._prefetch_threads.append(thread)
608                 thread.daemon = True
609                 thread.start()
610
611
612     @synchronized
613     def stop_threads(self):
614         """Shut down and wait for background upload and download threads to finish."""
615
616         if self._put_threads is not None:
617             for t in self._put_threads:
618                 self._put_queue.put(None)
619             for t in self._put_threads:
620                 t.join()
621         self._put_threads = None
622         self._put_queue = None
623
624         if self._prefetch_threads is not None:
625             for t in self._prefetch_threads:
626                 self._prefetch_queue.put(None)
627             for t in self._prefetch_threads:
628                 t.join()
629         self._prefetch_threads = None
630         self._prefetch_queue = None
631
632     def __enter__(self):
633         return self
634
635     def __exit__(self, exc_type, exc_value, traceback):
636         self.stop_threads()
637
638     @synchronized
639     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
640         """Packs small blocks together before uploading"""
641
642         self._pending_write_size += closed_file_size
643
644         # Check whether there are enough small blocks to fill up one full block
645         if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
646             return
647
648         # Search for blocks that are ready to be packed together before
649         # being committed to Keep.
650         # A WRITABLE block always has an owner.
651         # A WRITABLE block with its owner.closed() implies that its
652         # size is <= KEEP_BLOCK_SIZE/2.
653         try:
654             small_blocks = [b for b in listvalues(self._bufferblocks)
655                             if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
656         except AttributeError:
657             # Writable blocks without an owner shouldn't exist.
658             raise UnownedBlockError()
659
660         if len(small_blocks) <= 1:
661             # Not enough small blocks for repacking
662             return
663
664         for bb in small_blocks:
665             bb.repack_writes()
666
667         # Update the pending write size count with its true value, just in case
668         # some small file was opened, written and closed several times.
669         self._pending_write_size = sum([b.size() for b in small_blocks])
670
671         if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
672             return
673
674         new_bb = self._alloc_bufferblock()
675         new_bb.owner = []
676         files = []
677         while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
678             bb = small_blocks.pop(0)
679             new_bb.owner.append(bb.owner)
680             self._pending_write_size -= bb.size()
681             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
682             files.append((bb, new_bb.write_pointer - bb.size()))
683
684         self.commit_bufferblock(new_bb, sync=sync)
685
686         for bb, new_bb_segment_offset in files:
687             newsegs = bb.owner.segments()
688             for s in newsegs:
689                 if s.locator == bb.blockid:
690                     s.locator = new_bb.blockid
691                     s.segment_offset = new_bb_segment_offset+s.segment_offset
692             bb.owner.set_segments(newsegs)
693             self._delete_bufferblock(bb.blockid)
694
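    # Illustrative sketch of the packing above (hypothetical sizes): four
    # closed 20 MiB files, each still in its own WRITABLE buffer block, push
    # the pending write size past KEEP_BLOCK_SIZE, so the first three are
    # copied into one new ~60 MiB buffer block and committed together; each
    # file's segments are re-pointed at the new block with segment_offsets of
    # 0, 20 MiB and 40 MiB, the old buffer blocks are deleted, and the fourth
    # file stays buffered for a later pass.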
695     def commit_bufferblock(self, block, sync):
696         """Initiate a background upload of a bufferblock.
697
698         :block:
699           The block object to upload
700
701         :sync:
702           If `sync` is True, upload the block synchronously.
703           If `sync` is False, upload the block asynchronously.  This will
704           return immediately unless the upload queue is at capacity, in
705           which case it will wait on an upload queue slot.
706
707         """
708         try:
709             # Mark the block as PENDING so as to disallow any more appends.
710             block.set_state(_BufferBlock.PENDING)
711         except StateChangeError as e:
712             if e.state == _BufferBlock.PENDING:
713                 if sync:
714                     block.wait_for_commit.wait()
715                 else:
716                     return
717             if block.state() == _BufferBlock.COMMITTED:
718                 return
719             elif block.state() == _BufferBlock.ERROR:
720                 raise block.error
721             else:
722                 raise
723
724         if sync:
725             try:
726                 if self.copies is None:
727                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
728                 else:
729                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
730                 block.set_state(_BufferBlock.COMMITTED, loc)
731             except Exception as e:
732                 block.set_state(_BufferBlock.ERROR, e)
733                 raise
734         else:
735             self.start_put_threads()
736             self._put_queue.put(block)
737
738     @synchronized
739     def get_bufferblock(self, locator):
740         return self._bufferblocks.get(locator)
741
742     @synchronized
743     def get_padding_block(self):
744         """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
745         when using truncate() to extend the size of a file.
746
747         For reference (and possible future optimization), the md5sum of the
748         padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
749
750         """
751
752         if self.padding_block is None:
753             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
754             self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
755             self.commit_bufferblock(self.padding_block, False)
756         return self.padding_block
757
758     @synchronized
759     def delete_bufferblock(self, locator):
760         self._delete_bufferblock(locator)
761
762     def _delete_bufferblock(self, locator):
763         bb = self._bufferblocks[locator]
764         bb.clear()
765         del self._bufferblocks[locator]
766
767     def get_block_contents(self, locator, num_retries, cache_only=False):
768         """Fetch a block.
769
770         First checks to see if the locator is a BufferBlock and returns that;
771         if not, passes the request through to KeepClient.get().
772
773         """
774         with self.lock:
775             if locator in self._bufferblocks:
776                 bufferblock = self._bufferblocks[locator]
777                 if bufferblock.state() != _BufferBlock.COMMITTED:
778                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
779                 else:
780                     locator = bufferblock._locator
781         if cache_only:
782             return self._keep.get_from_cache(locator)
783         else:
784             return self._keep.get(locator, num_retries=num_retries)
785
786     def commit_all(self):
787         """Commit all outstanding buffer blocks.
788
789         This is a synchronous call, and will not return until all buffer blocks
790         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
791
792         """
793         self.repack_small_blocks(force=True, sync=True)
794
795         with self.lock:
796             items = listitems(self._bufferblocks)
797
798         for k,v in items:
799             if v.state() != _BufferBlock.COMMITTED and v.owner:
800                 # Skip blocks with a list of owners: if they're not in COMMITTED
801                 # state, they're already being committed asynchronously.
802                 if isinstance(v.owner, ArvadosFile):
803                     v.owner.flush(sync=False)
804
805         with self.lock:
806             if self._put_queue is not None:
807                 self._put_queue.join()
808
809                 err = []
810                 for k,v in items:
811                     if v.state() == _BufferBlock.ERROR:
812                         err.append((v.locator(), v.error))
813                 if err:
814                     raise KeepWriteError("Error writing some blocks", err, label="block")
815
816         for k,v in items:
817             # flush again with sync=True to remove committed bufferblocks from
818             # the segments.
819             if v.owner:
820                 if isinstance(v.owner, ArvadosFile):
821                     v.owner.flush(sync=True)
822                 elif isinstance(v.owner, list) and len(v.owner) > 0:
823                     # This bufferblock is referenced by many files as a result
824                     # of repacking small blocks, so don't delete it when flushing
825                     # its owners, just do it after flushing them all.
826                     for owner in v.owner:
827                         owner.flush(sync=True)
828                     self.delete_bufferblock(k)
829
830     def block_prefetch(self, locator):
831         """Initiate a background download of a block.
832
833         This assumes that the underlying KeepClient implements a block cache,
834         so repeated requests for the same block will not result in repeated
835         downloads (unless the block is evicted from the cache.)  This method
836         does not block.
837
838         """
839
840         if not self.prefetch_enabled:
841             return
842
843         if self._keep.get_from_cache(locator) is not None:
844             return
845
846         with self.lock:
847             if locator in self._bufferblocks:
848                 return
849
850         self.start_get_threads()
851         self._prefetch_queue.put(locator)
852
853
854 class ArvadosFile(object):
855     """Represent a file in a Collection.
856
857     ArvadosFile manages the underlying representation of a file in Keep as a
858     sequence of segments spanning a set of blocks, and implements random
859     read/write access.
860
861     This object may be accessed from multiple threads.
862
863     """
864
865     def __init__(self, parent, name, stream=[], segments=[]):
866         """
867         ArvadosFile constructor.
868
869         :stream:
870           a list of Range objects representing a block stream
871
872         :segments:
873           a list of Range objects representing segments
874         """
875         self.parent = parent
876         self.name = name
877         self._writers = set()
878         self._committed = False
879         self._segments = []
880         self.lock = parent.root_collection().lock
881         for s in segments:
882             self._add_segment(stream, s.locator, s.range_size)
883         self._current_bblock = None
884
885     def writable(self):
886         return self.parent.writable()
887
888     @synchronized
889     def permission_expired(self, as_of_dt=None):
890         """Returns True if any of the segment's locators is expired"""
891         for r in self._segments:
892             if KeepLocator(r.locator).permission_expired(as_of_dt):
893                 return True
894         return False
895
896     @synchronized
897     def segments(self):
898         return copy.copy(self._segments)
899
900     @synchronized
901     def clone(self, new_parent, new_name):
902         """Make a copy of this file."""
903         cp = ArvadosFile(new_parent, new_name)
904         cp.replace_contents(self)
905         return cp
906
907     @must_be_writable
908     @synchronized
909     def replace_contents(self, other):
910         """Replace segments of this file with segments from another `ArvadosFile` object."""
911
912         map_loc = {}
913         self._segments = []
914         for other_segment in other.segments():
915             new_loc = other_segment.locator
916             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
917                 if other_segment.locator not in map_loc:
918                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
919                     if bufferblock.state() != _BufferBlock.WRITABLE:
920                         map_loc[other_segment.locator] = bufferblock.locator()
921                     else:
922                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
923                 new_loc = map_loc[other_segment.locator]
924
925             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
926
927         self.set_committed(False)
928
929     def __eq__(self, other):
930         if other is self:
931             return True
932         if not isinstance(other, ArvadosFile):
933             return False
934
935         othersegs = other.segments()
936         with self.lock:
937             if len(self._segments) != len(othersegs):
938                 return False
939             for i in range(0, len(othersegs)):
940                 seg1 = self._segments[i]
941                 seg2 = othersegs[i]
942                 loc1 = seg1.locator
943                 loc2 = seg2.locator
944
945                 if self.parent._my_block_manager().is_bufferblock(loc1):
946                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
947
948                 if other.parent._my_block_manager().is_bufferblock(loc2):
949                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
950
951                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
952                     seg1.range_start != seg2.range_start or
953                     seg1.range_size != seg2.range_size or
954                     seg1.segment_offset != seg2.segment_offset):
955                     return False
956
957         return True
958
959     def __ne__(self, other):
960         return not self.__eq__(other)
961
962     @synchronized
963     def set_segments(self, segs):
964         self._segments = segs
965
966     @synchronized
967     def set_committed(self, value=True):
968         """Set committed flag.
969
970         If value is True, set committed to be True.
971
972         If value is False, set committed to be False for this and all parents.
973         """
974         if value == self._committed:
975             return
976         self._committed = value
977         if self._committed is False and self.parent is not None:
978             self.parent.set_committed(False)
979
980     @synchronized
981     def committed(self):
982         """Get whether this is committed or not."""
983         return self._committed
984
985     @synchronized
986     def add_writer(self, writer):
987         """Add an ArvadosFileWriter reference to the list of writers"""
988         if isinstance(writer, ArvadosFileWriter):
989             self._writers.add(writer)
990
991     @synchronized
992     def remove_writer(self, writer, flush):
993         """
994         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
995         and do some block maintenance tasks.
996         """
997         self._writers.remove(writer)
998
999         if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
1000             # Flush requested, or the file is too big for repacking
1001             self.flush()
1002         elif self.closed():
1003             # All writers closed and size is adequate for repacking
1004             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
1005
1006     def closed(self):
1007         """
1008         Get whether this is closed or not. When the writers list is empty, the file
1009         is supposed to be closed.
1010         """
1011         return len(self._writers) == 0
1012
1013     @must_be_writable
1014     @synchronized
1015     def truncate(self, size):
1016         """Shrink or expand the size of the file.
1017
1018         If `size` is less than the size of the file, the file contents after
1019         `size` will be discarded.  If `size` is greater than the current size
1020         of the file, it will be filled with zero bytes.
1021
1022         """
1023         if size < self.size():
1024             new_segs = []
1025             for r in self._segments:
1026                 range_end = r.range_start+r.range_size
1027                 if r.range_start >= size:
1028                     # segment is past the truncate size, all done
1029                     break
1030                 elif size < range_end:
1031                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1032                     nr.segment_offset = r.segment_offset
1033                     new_segs.append(nr)
1034                     break
1035                 else:
1036                     new_segs.append(r)
1037
1038             self._segments = new_segs
1039             self.set_committed(False)
1040         elif size > self.size():
1041             padding = self.parent._my_block_manager().get_padding_block()
1042             diff = size - self.size()
1043             while diff > config.KEEP_BLOCK_SIZE:
1044                 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1045                 diff -= config.KEEP_BLOCK_SIZE
1046             if diff > 0:
1047                 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1048             self.set_committed(False)
1049         else:
1050             # size == self.size()
1051             pass
1052
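    # Illustration of extending a file with truncate(), using hypothetical
    # sizes: truncating a 10-byte file to 100 MiB appends one full 64 MiB
    # segment referencing the shared zero-filled padding block, then a second
    # padding segment covering the remaining ~36 MiB, without materializing
    # any zeros in this file's own buffer blocks.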
1053     def readfrom(self, offset, size, num_retries, exact=False):
1054         """Read up to `size` bytes from the file starting at `offset`.
1055
1056         :exact:
1057          If False (default), return less data than requested if the read
1058          crosses a block boundary and the next block isn't cached.  If True,
1059          only return less data than requested when hitting EOF.
1060         """
1061
1062         with self.lock:
1063             if size == 0 or offset >= self.size():
1064                 return b''
1065             readsegs = locators_and_ranges(self._segments, offset, size)
1066             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
1067
1068         locs = set()
1069         data = []
1070         for lr in readsegs:
1071             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1072             if block:
1073                 blockview = memoryview(block)
1074                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
1075                 locs.add(lr.locator)
1076             else:
1077                 break
1078
1079         for lr in prefetch:
1080             if lr.locator not in locs:
1081                 self.parent._my_block_manager().block_prefetch(lr.locator)
1082                 locs.add(lr.locator)
1083
1084         return b''.join(data)
1085
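    # Note on the read path above, for illustration: readsegs covers only the
    # bytes actually requested, while prefetch asks the block manager to start
    # background downloads for up to 32 locators in the KEEP_BLOCK_SIZE bytes
    # that follow, so sequential readers usually find the next block already
    # in the Keep client's cache.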
1086     @must_be_writable
1087     @synchronized
1088     def writeto(self, offset, data, num_retries):
1089         """Write `data` to the file starting at `offset`.
1090
1091         This will update existing bytes and/or extend the size of the file as
1092         necessary.
1093
1094         """
1095         if not isinstance(data, bytes) and not isinstance(data, memoryview):
1096             data = data.encode()
1097         if len(data) == 0:
1098             return
1099
1100         if offset > self.size():
1101             self.truncate(offset)
1102
1103         if len(data) > config.KEEP_BLOCK_SIZE:
1104             # Chunk it up into smaller writes
1105             n = 0
1106             dataview = memoryview(data)
1107             while n < len(data):
1108                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1109                 n += config.KEEP_BLOCK_SIZE
1110             return
1111
1112         self.set_committed(False)
1113
1114         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1115             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1116
1117         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1118             self._current_bblock.repack_writes()
1119             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1120                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1121                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1122
1123         self._current_bblock.append(data)
1124
1125         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1126
1127         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1128
1129         return len(data)
1130
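    # Sketch of how writeto() above handles a large write (illustrative sizes):
    # a single 200 MiB write is split into four recursive calls of at most
    # KEEP_BLOCK_SIZE (64 MiB) each; every chunk is appended to the current
    # buffer block, and a new buffer block is started (with the old one queued
    # for upload) whenever the next chunk would no longer fit.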
1131     @synchronized
1132     def flush(self, sync=True, num_retries=0):
1133         """Flush the current bufferblock to Keep.
1134
1135         :sync:
1136           If True, commit block synchronously, wait until buffer block has been written.
1137           If False, commit block asynchronously, return immediately after putting block into
1138           the keep put queue.
1139         """
1140         if self.committed():
1141             return
1142
1143         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1144             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1145                 self._current_bblock.repack_writes()
1146             if self._current_bblock.state() != _BufferBlock.DELETED:
1147                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1148
1149         if sync:
1150             to_delete = set()
1151             for s in self._segments:
1152                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1153                 if bb:
1154                     if bb.state() != _BufferBlock.COMMITTED:
1155                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1156                     to_delete.add(s.locator)
1157                     s.locator = bb.locator()
1158             for s in to_delete:
1159                 # Don't delete the bufferblock if it's owned by many files. It'll be
1160                 # deleted after all of its owners are flush()ed.
1161                 if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1162                     self.parent._my_block_manager().delete_bufferblock(s)
1163
1164         self.parent.notify(MOD, self.parent, self.name, (self, self))
1165
1166     @must_be_writable
1167     @synchronized
1168     def add_segment(self, blocks, pos, size):
1169         """Add a segment to the end of the file.
1170
1171         `pos` and `size` reference a section of the stream described by
1172         `blocks` (a list of Range objects).
1173
1174         """
1175         self._add_segment(blocks, pos, size)
1176
1177     def _add_segment(self, blocks, pos, size):
1178         """Internal implementation of add_segment."""
1179         self.set_committed(False)
1180         for lr in locators_and_ranges(blocks, pos, size):
1181             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1182             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1183             self._segments.append(r)
1184
1185     @synchronized
1186     def size(self):
1187         """Get the file size."""
1188         if self._segments:
1189             n = self._segments[-1]
1190             return n.range_start + n.range_size
1191         else:
1192             return 0
1193
1194     @synchronized
1195     def manifest_text(self, stream_name=".", portable_locators=False,
1196                       normalize=False, only_committed=False):
1197         buf = ""
1198         filestream = []
1199         for segment in self._segments:
1200             loc = segment.locator
1201             if self.parent._my_block_manager().is_bufferblock(loc):
1202                 if only_committed:
1203                     continue
1204                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1205             if portable_locators:
1206                 loc = KeepLocator(loc).stripped()
1207             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1208                                  segment.segment_offset, segment.range_size))
1209         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1210         buf += "\n"
1211         return buf
1212
1213     @must_be_writable
1214     @synchronized
1215     def _reparent(self, newparent, newname):
1216         self.set_committed(False)
1217         self.flush(sync=True)
1218         self.parent.remove(self.name)
1219         self.parent = newparent
1220         self.name = newname
1221         self.lock = self.parent.root_collection().lock
1222
1223
1224 class ArvadosFileReader(ArvadosFileReaderBase):
1225     """Wraps ArvadosFile in a file-like object supporting reading only.
1226
1227     Be aware that this class is NOT thread safe as there is no locking around
1228     updating file pointer.
1229
1230     """
1231
1232     def __init__(self, arvadosfile, mode="r", num_retries=None):
1233         super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1234         self.arvadosfile = arvadosfile
1235
1236     def size(self):
1237         return self.arvadosfile.size()
1238
1239     def stream_name(self):
1240         return self.arvadosfile.parent.stream_name()
1241
1242     @_FileLikeObjectBase._before_close
1243     @retry_method
1244     def read(self, size=None, num_retries=None):
1245         """Read up to `size` bytes from the file and return the result.
1246
1247         Starts at the current file position.  If `size` is None, read the
1248         entire remainder of the file.
1249         """
1250         if size is None:
1251             data = []
1252             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1253             while rd:
1254                 data.append(rd)
1255                 self._filepos += len(rd)
1256                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1257             return b''.join(data)
1258         else:
1259             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1260             self._filepos += len(data)
1261             return data
1262
1263     @_FileLikeObjectBase._before_close
1264     @retry_method
1265     def readfrom(self, offset, size, num_retries=None):
1266         """Read up to `size` bytes from the stream, starting at the specified file offset.
1267
1268         This method does not change the file position.
1269         """
1270         return self.arvadosfile.readfrom(offset, size, num_retries)
1271
1272     def flush(self):
1273         pass
1274
1275
1276 class ArvadosFileWriter(ArvadosFileReader):
1277     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1278
1279     Be aware that this class is NOT thread safe as there is no locking around
1280     updating file pointer.
1281
1282     """
1283
1284     def __init__(self, arvadosfile, mode, num_retries=None):
1285         super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1286         self.arvadosfile.add_writer(self)
1287
1288     def writable(self):
1289         return True
1290
1291     @_FileLikeObjectBase._before_close
1292     @retry_method
1293     def write(self, data, num_retries=None):
1294         if self.mode[0] == "a":
1295             self._filepos = self.size()
1296         self.arvadosfile.writeto(self._filepos, data, num_retries)
1297         self._filepos += len(data)
1298         return len(data)
1299
1300     @_FileLikeObjectBase._before_close
1301     @retry_method
1302     def writelines(self, seq, num_retries=None):
1303         for s in seq:
1304             self.write(s, num_retries=num_retries)
1305
1306     @_FileLikeObjectBase._before_close
1307     def truncate(self, size=None):
1308         if size is None:
1309             size = self._filepos
1310         self.arvadosfile.truncate(size)
1311
1312     @_FileLikeObjectBase._before_close
1313     def flush(self):
1314         self.arvadosfile.flush()
1315
1316     def close(self, flush=True):
1317         if not self.closed:
1318             self.arvadosfile.remove_writer(self, flush)
1319             super(ArvadosFileWriter, self).close()
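# Illustrative usage only (not part of this module): readers and writers are
# normally obtained from a Collection rather than constructed directly.
# Assuming a writable collection `c` created with
# arvados.collection.Collection():
#
#   with c.open("example.txt", "wb") as f:   # ArvadosFileWriter
#       f.write(b"hello\n")
#   with c.open("example.txt", "rb") as f:   # ArvadosFileReader
#       data = f.read()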