1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 standard_library.install_aliases()
5 from builtins import range
6 from builtins import object
7 import bz2
8 import collections
9 import copy
10 import errno
11 import functools
12 import hashlib
13 import logging
14 import os
15 import queue
16 import re
17 import sys
18 import threading
19 import uuid
20 import zlib
21
22 from . import config
23 from .errors import KeepWriteError, AssertionError, ArgumentError
24 from .keep import KeepLocator
25 from ._normalize_stream import normalize_stream
26 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
27 from .retry import retry_method
28
29 MOD = "mod"
30 WRITE = "write"
31
32 _logger = logging.getLogger('arvados.arvfile')
33
34 def split(path):
35     """split(path) -> streamname, filename
36
37     Separate the stream name and file name in a /-separated stream path and
38     return a tuple (stream_name, file_name).  If no stream name is available,
39     assume '.'.
40
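    A short illustrative doctest of the rsplit behavior implemented below
    (the file names are made up):

        >>> split('fastq/sample_R1.fastq.gz')
        ('fastq', 'sample_R1.fastq.gz')
        >>> split('sample_R1.fastq.gz')
        ('.', 'sample_R1.fastq.gz')
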
41     """
42     try:
43         stream_name, file_name = path.rsplit('/', 1)
44     except ValueError:  # No / in string
45         stream_name, file_name = '.', path
46     return stream_name, file_name
47
48
49 class UnownedBlockError(Exception):
50     """Raised when there's a writable block without an owner on the BlockManager."""
51     pass
52
53
54 class _FileLikeObjectBase(object):
55     def __init__(self, name, mode):
56         self.name = name
57         self.mode = mode
58         self.closed = False
59
60     @staticmethod
61     def _before_close(orig_func):
62         @functools.wraps(orig_func)
63         def before_close_wrapper(self, *args, **kwargs):
64             if self.closed:
65                 raise ValueError("I/O operation on closed stream file")
66             return orig_func(self, *args, **kwargs)
67         return before_close_wrapper
68
69     def __enter__(self):
70         return self
71
72     def __exit__(self, exc_type, exc_value, traceback):
73         try:
74             self.close()
75         except Exception:
76             if exc_type is None:
77                 raise
78
79     def close(self):
80         self.closed = True
81
82
83 class ArvadosFileReaderBase(_FileLikeObjectBase):
84     def __init__(self, name, mode, num_retries=None):
85         super(ArvadosFileReaderBase, self).__init__(name, mode)
86         self._binary = 'b' in mode
87         if sys.version_info >= (3, 0) and not self._binary:
88             raise NotImplementedError("text mode {!r} is not implemented".format(mode))
89         self._filepos = 0
90         self.num_retries = num_retries
91         self._readline_cache = (None, None)
92
93     def __iter__(self):
94         while True:
95             data = self.readline()
96             if not data:
97                 break
98             yield data
99
100     def decompressed_name(self):
101         return re.sub(r'\.(bz2|gz)$', '', self.name)
102
103     @_FileLikeObjectBase._before_close
104     def seek(self, pos, whence=os.SEEK_SET):
105         if whence == os.SEEK_CUR:
106             pos += self._filepos
107         elif whence == os.SEEK_END:
108             pos += self.size()
109         self._filepos = min(max(pos, 0), self.size())
110
111     def tell(self):
112         return self._filepos
113
114     @_FileLikeObjectBase._before_close
115     @retry_method
116     def readall(self, size=2**20, num_retries=None):
117         while True:
118             data = self.read(size, num_retries=num_retries)
119             if len(data) == 0:
120                 break
121             yield data
122
123     @_FileLikeObjectBase._before_close
124     @retry_method
125     def readline(self, size=float('inf'), num_retries=None):
126         cache_pos, cache_data = self._readline_cache
127         if self.tell() == cache_pos:
128             data = [cache_data]
129             self._filepos += len(cache_data)
130         else:
131             data = [b'']
132         data_size = len(data[-1])
133         while (data_size < size) and (b'\n' not in data[-1]):
134             next_read = self.read(2 ** 20, num_retries=num_retries)
135             if not next_read:
136                 break
137             data.append(next_read)
138             data_size += len(next_read)
139         data = b''.join(data)
140         try:
141             nextline_index = data.index(b'\n') + 1
142         except ValueError:
143             nextline_index = len(data)
144         nextline_index = min(nextline_index, size)
145         self._filepos -= len(data) - nextline_index
146         self._readline_cache = (self.tell(), data[nextline_index:])
147         return data[:nextline_index].decode()
148
149     @_FileLikeObjectBase._before_close
150     @retry_method
151     def decompress(self, decompress, size, num_retries=None):
152         for segment in self.readall(size, num_retries=num_retries):
153             data = decompress(segment)
154             if data:
155                 yield data
156
157     @_FileLikeObjectBase._before_close
158     @retry_method
159     def readall_decompressed(self, size=2**20, num_retries=None):
160         self.seek(0)
161         if self.name.endswith('.bz2'):
162             dc = bz2.BZ2Decompressor()
163             return self.decompress(dc.decompress, size,
164                                    num_retries=num_retries)
165         elif self.name.endswith('.gz'):
166             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
167             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
168                                    size, num_retries=num_retries)
169         else:
170             return self.readall(size, num_retries=num_retries)
171
172     @_FileLikeObjectBase._before_close
173     @retry_method
174     def readlines(self, sizehint=float('inf'), num_retries=None):
175         data = []
176         data_size = 0
177         for s in self.readall(num_retries=num_retries):
178             data.append(s)
179             data_size += len(s)
180             if data_size >= sizehint:
181                 break
182         return b''.join(data).decode().splitlines(True)
183
184     def size(self):
185         raise NotImplementedError()
186
187     def read(self, size, num_retries=None):
188         raise NotImplementedError()
189
190     def readfrom(self, start, size, num_retries=None):
191         raise NotImplementedError()
192
193
194 class StreamFileReader(ArvadosFileReaderBase):
195     class _NameAttribute(str):
196         # The Python file API provides a plain .name attribute.
197         # Older SDK provided a name() method.
198         # This class provides both, for maximum compatibility.
199         def __call__(self):
200             return self
201
202     def __init__(self, stream, segments, name):
203         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
204         self._stream = stream
205         self.segments = segments
206
207     def stream_name(self):
208         return self._stream.name()
209
210     def size(self):
211         n = self.segments[-1]
212         return n.range_start + n.range_size
213
214     @_FileLikeObjectBase._before_close
215     @retry_method
216     def read(self, size, num_retries=None):
217         """Read up to 'size' bytes from the stream, starting at the current file position"""
218         if size == 0:
219             return b''
220
221         data = b''
222         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
223         if available_chunks:
224             lr = available_chunks[0]
225             data = self._stream.readfrom(lr.locator+lr.segment_offset,
226                                          lr.segment_size,
227                                          num_retries=num_retries)
228
229         self._filepos += len(data)
230         return data
231
232     @_FileLikeObjectBase._before_close
233     @retry_method
234     def readfrom(self, start, size, num_retries=None):
235         """Read up to 'size' bytes from the stream, starting at 'start'"""
236         if size == 0:
237             return b''
238
239         data = []
240         for lr in locators_and_ranges(self.segments, start, size):
241             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
242                                               num_retries=num_retries))
243         return b''.join(data)
244
245     def as_manifest(self):
246         segs = []
247         for r in self.segments:
248             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
249         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
250
251
252 def synchronized(orig_func):
253     @functools.wraps(orig_func)
254     def synchronized_wrapper(self, *args, **kwargs):
255         with self.lock:
256             return orig_func(self, *args, **kwargs)
257     return synchronized_wrapper
258
259
260 class StateChangeError(Exception):
261     def __init__(self, message, state, nextstate):
262         super(StateChangeError, self).__init__(message)
263         self.state = state
264         self.nextstate = nextstate
265
266 class _BufferBlock(object):
267     """A stand-in for a Keep block that is in the process of being written.
268
269     Writers can append to it, get the size, and compute the Keep locator.
270     There are three valid states:
271
272     WRITABLE
273       Can append to block.
274
275     PENDING
276       Block is in the process of being uploaded to Keep, append is an error.
277
278     COMMITTED
279       The block has been written to Keep, its internal buffer has been
280       released, fetching the block will fetch it via keep client (since we
281       discarded the internal copy), and identifiers referring to the BufferBlock
282       can be replaced with the block locator.
283
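    A rough sketch of the intended lifecycle (`keep` and `arvfile` are
    illustrative placeholders for a KeepClient and an owning ArvadosFile):

        bb = _BufferBlock("bufferblock0", starting_capacity=2**14, owner=arvfile)
        bb.append(b"some data")                    # WRITABLE: buffer grows as needed
        bb.set_state(_BufferBlock.PENDING)         # further appends are an error
        loc = keep.put(bb.buffer_view[0:bb.write_pointer].tobytes())
        bb.set_state(_BufferBlock.COMMITTED, loc)  # buffer released, locator retained
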
284     """
285
286     WRITABLE = 0
287     PENDING = 1
288     COMMITTED = 2
289     ERROR = 3
290
291     def __init__(self, blockid, starting_capacity, owner):
292         """
293         :blockid:
294           the identifier for this block
295
296         :starting_capacity:
297           the initial buffer capacity
298
299         :owner:
300           ArvadosFile that owns this block
301
302         """
303         self.blockid = blockid
304         self.buffer_block = bytearray(starting_capacity)
305         self.buffer_view = memoryview(self.buffer_block)
306         self.write_pointer = 0
307         self._state = _BufferBlock.WRITABLE
308         self._locator = None
309         self.owner = owner
310         self.lock = threading.Lock()
311         self.wait_for_commit = threading.Event()
312         self.error = None
313
314     @synchronized
315     def append(self, data):
316         """Append some data to the buffer.
317
318         Only valid if the block is in WRITABLE state.  Implements an expanding
319         buffer, doubling capacity as needed to accommodate all the data.
320
321         """
322         if self._state == _BufferBlock.WRITABLE:
323             if not isinstance(data, bytes) and not isinstance(data, memoryview):
324                 data = data.encode()
325             while (self.write_pointer+len(data)) > len(self.buffer_block):
326                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
327                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
328                 self.buffer_block = new_buffer_block
329                 self.buffer_view = memoryview(self.buffer_block)
330             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
331             self.write_pointer += len(data)
332             self._locator = None
333         else:
334             raise AssertionError("Buffer block is not writable")
335
336     STATE_TRANSITIONS = frozenset([
337             (WRITABLE, PENDING),
338             (PENDING, COMMITTED),
339             (PENDING, ERROR),
340             (ERROR, PENDING)])
341
342     @synchronized
343     def set_state(self, nextstate, val=None):
344         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
345             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
346         self._state = nextstate
347
348         if self._state == _BufferBlock.PENDING:
349             self.wait_for_commit.clear()
350
351         if self._state == _BufferBlock.COMMITTED:
352             self._locator = val
353             self.buffer_view = None
354             self.buffer_block = None
355             self.wait_for_commit.set()
356
357         if self._state == _BufferBlock.ERROR:
358             self.error = val
359             self.wait_for_commit.set()
360
361     @synchronized
362     def state(self):
363         return self._state
364
365     def size(self):
366         """The amount of data written to the buffer."""
367         return self.write_pointer
368
369     @synchronized
370     def locator(self):
371         """The Keep locator for this buffer's contents."""
372         if self._locator is None:
373             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
374         return self._locator
375
376     @synchronized
377     def clone(self, new_blockid, owner):
378         if self._state == _BufferBlock.COMMITTED:
379             raise AssertionError("Cannot duplicate committed buffer block")
380         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
381         bufferblock.append(self.buffer_view[0:self.size()])
382         return bufferblock
383
384     @synchronized
385     def clear(self):
386         self.owner = None
387         self.buffer_block = None
388         self.buffer_view = None
389
390
391 class NoopLock(object):
392     def __enter__(self):
393         return self
394
395     def __exit__(self, exc_type, exc_value, traceback):
396         pass
397
398     def acquire(self, blocking=False):
399         pass
400
401     def release(self):
402         pass
403
404
405 def must_be_writable(orig_func):
406     @functools.wraps(orig_func)
407     def must_be_writable_wrapper(self, *args, **kwargs):
408         if not self.writable():
409             raise IOError(errno.EROFS, "Collection is read-only.")
410         return orig_func(self, *args, **kwargs)
411     return must_be_writable_wrapper
412
413
414 class _BlockManager(object):
415     """BlockManager handles buffer blocks.
416
417     Also handles background block uploads, and background block prefetch for a
418     Collection of ArvadosFiles.
419
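    A minimal usage sketch (`keep_client` stands in for a KeepClient instance;
    in practice the Collection machinery creates and drives this object):

        with _BlockManager(keep_client) as bm:
            bb = bm.alloc_bufferblock()
            bb.append(b"hello")
            bm.commit_bufferblock(bb, sync=True)  # uploads via keep_client.put()
        # leaving the with-block calls stop_threads()
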
420     """
421
422     DEFAULT_PUT_THREADS = 2
423     DEFAULT_GET_THREADS = 2
424
425     def __init__(self, keep, copies=None, put_threads=None):
426         """keep: KeepClient object to use"""
427         self._keep = keep
428         self._bufferblocks = collections.OrderedDict()
429         self._put_queue = None
430         self._put_threads = None
431         self._prefetch_queue = None
432         self._prefetch_threads = None
433         self.lock = threading.Lock()
434         self.prefetch_enabled = True
435         if put_threads:
436             self.num_put_threads = put_threads
437         else:
438             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
439         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
440         self.copies = copies
441         self._pending_write_size = 0
442         self.threads_lock = threading.Lock()
443
444     @synchronized
445     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
446         """Allocate a new, empty bufferblock in WRITABLE state and return it.
447
448         :blockid:
449           optional block identifier, otherwise one will be automatically assigned
450
451         :starting_capacity:
452           optional capacity, otherwise will use default capacity
453
454         :owner:
455           ArvadosFile that owns this block
456
457         """
458         return self._alloc_bufferblock(blockid, starting_capacity, owner)
459
460     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
461         if blockid is None:
462             blockid = "%s" % uuid.uuid4()
463         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
464         self._bufferblocks[bufferblock.blockid] = bufferblock
465         return bufferblock
466
467     @synchronized
468     def dup_block(self, block, owner):
469         """Create a new bufferblock initialized with the content of an existing bufferblock.
470
471         :block:
472           the buffer block to copy.
473
474         :owner:
475           ArvadosFile that owns the new block
476
477         """
478         new_blockid = "bufferblock%i" % len(self._bufferblocks)
479         bufferblock = block.clone(new_blockid, owner)
480         self._bufferblocks[bufferblock.blockid] = bufferblock
481         return bufferblock
482
483     @synchronized
484     def is_bufferblock(self, locator):
485         return locator in self._bufferblocks
486
487     def _commit_bufferblock_worker(self):
488         """Background uploader thread."""
489
490         while True:
491             try:
492                 bufferblock = self._put_queue.get()
493                 if bufferblock is None:
494                     return
495
496                 if self.copies is None:
497                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
498                 else:
499                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
500                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
501
502             except Exception as e:
503                 bufferblock.set_state(_BufferBlock.ERROR, e)
504             finally:
505                 if self._put_queue is not None:
506                     self._put_queue.task_done()
507
508     def start_put_threads(self):
509         with self.threads_lock:
510             if self._put_threads is None:
511                 # Start uploader threads.
512
513                 # If we don't limit the Queue size, the upload queue can quickly
514                 # grow to take up gigabytes of RAM if the writing process is
515                 # generating data more quickly than it can be sent to the Keep
516                 # servers.
517                 #
518                 # With two upload threads and a queue size of 2, this means up to 4
519                 # blocks pending.  If they are full 64 MiB blocks, that means up to
520                 # 256 MiB of internal buffering, which is the same size as the
521                 # default download block cache in KeepClient.
522                 self._put_queue = queue.Queue(maxsize=2)
523
524                 self._put_threads = []
525                 for i in range(0, self.num_put_threads):
526                     thread = threading.Thread(target=self._commit_bufferblock_worker)
527                     self._put_threads.append(thread)
528                     thread.daemon = True
529                     thread.start()
530
531     def _block_prefetch_worker(self):
532         """The background downloader thread."""
533         while True:
534             try:
535                 b = self._prefetch_queue.get()
536                 if b is None:
537                     return
538                 self._keep.get(b)
539             except Exception:
540                 _logger.exception("Exception doing block prefetch")
541
542     @synchronized
543     def start_get_threads(self):
544         if self._prefetch_threads is None:
545             self._prefetch_queue = queue.Queue()
546             self._prefetch_threads = []
547             for i in range(0, self.num_get_threads):
548                 thread = threading.Thread(target=self._block_prefetch_worker)
549                 self._prefetch_threads.append(thread)
550                 thread.daemon = True
551                 thread.start()
552
553
554     @synchronized
555     def stop_threads(self):
556         """Shut down and wait for background upload and download threads to finish."""
557
558         if self._put_threads is not None:
559             for t in self._put_threads:
560                 self._put_queue.put(None)
561             for t in self._put_threads:
562                 t.join()
563         self._put_threads = None
564         self._put_queue = None
565
566         if self._prefetch_threads is not None:
567             for t in self._prefetch_threads:
568                 self._prefetch_queue.put(None)
569             for t in self._prefetch_threads:
570                 t.join()
571         self._prefetch_threads = None
572         self._prefetch_queue = None
573
574     def __enter__(self):
575         return self
576
577     def __exit__(self, exc_type, exc_value, traceback):
578         self.stop_threads()
579
580     @synchronized
581     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
582         """Packs small blocks together before uploading"""
583         self._pending_write_size += closed_file_size
584
585         # Check if there are enough small blocks for filling up one in full
586         if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
587
588             # Search for blocks that are ready to be packed together before being committed to Keep.
589             # A WRITABLE block always has an owner.
590             # A WRITABLE block with its owner.closed() implies that its
591             # size is <= KEEP_BLOCK_SIZE/2.
592             try:
593                 small_blocks = [b for b in list(self._bufferblocks.values()) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
594             except AttributeError:
595                 # Writable blocks without an owner shouldn't exist.
596                 raise UnownedBlockError()
597
598             if len(small_blocks) <= 1:
599                 # Not enough small blocks for repacking
600                 return
601
602             # Update the pending write size count with its true value, just in case
603             # some small file was opened, written and closed several times.
604             self._pending_write_size = sum([b.size() for b in small_blocks])
605             if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
606                 return
607
608             new_bb = self._alloc_bufferblock()
609             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
610                 bb = small_blocks.pop(0)
611                 arvfile = bb.owner
612                 self._pending_write_size -= bb.size()
613                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
614                 arvfile.set_segments([Range(new_bb.blockid,
615                                             0,
616                                             bb.size(),
617                                             new_bb.write_pointer - bb.size())])
618                 self._delete_bufferblock(bb.blockid)
619             self.commit_bufferblock(new_bb, sync=sync)
620
621     def commit_bufferblock(self, block, sync):
622         """Initiate a background upload of a bufferblock.
623
624         :block:
625           The block object to upload
626
627         :sync:
628           If `sync` is True, upload the block synchronously.
629           If `sync` is False, upload the block asynchronously.  This will
630           return immediately unless the upload queue is at capacity, in
631           which case it will wait on an upload queue slot.
632
633         """
634         try:
635             # Mark the block as PENDING so as to disallow any more appends.
636             block.set_state(_BufferBlock.PENDING)
637         except StateChangeError as e:
638             if e.state == _BufferBlock.PENDING:
639                 if sync:
640                     block.wait_for_commit.wait()
641                 else:
642                     return
643             if block.state() == _BufferBlock.COMMITTED:
644                 return
645             elif block.state() == _BufferBlock.ERROR:
646                 raise block.error
647             else:
648                 raise
649
650         if sync:
651             try:
652                 if self.copies is None:
653                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
654                 else:
655                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
656                 block.set_state(_BufferBlock.COMMITTED, loc)
657             except Exception as e:
658                 block.set_state(_BufferBlock.ERROR, e)
659                 raise
660         else:
661             self.start_put_threads()
662             self._put_queue.put(block)
663
664     @synchronized
665     def get_bufferblock(self, locator):
666         return self._bufferblocks.get(locator)
667
668     @synchronized
669     def delete_bufferblock(self, locator):
670         self._delete_bufferblock(locator)
671
672     def _delete_bufferblock(self, locator):
673         bb = self._bufferblocks[locator]
674         bb.clear()
675         del self._bufferblocks[locator]
676
677     def get_block_contents(self, locator, num_retries, cache_only=False):
678         """Fetch a block.
679
680         First checks to see if the locator is a BufferBlock and returns its
681         contents if so; otherwise, passes the request through to KeepClient.get().
682
683         """
684         with self.lock:
685             if locator in self._bufferblocks:
686                 bufferblock = self._bufferblocks[locator]
687                 if bufferblock.state() != _BufferBlock.COMMITTED:
688                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
689                 else:
690                     locator = bufferblock._locator
691         if cache_only:
692             return self._keep.get_from_cache(locator)
693         else:
694             return self._keep.get(locator, num_retries=num_retries)
695
696     def commit_all(self):
697         """Commit all outstanding buffer blocks.
698
699         This is a synchronous call, and will not return until all buffer blocks
700         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
701
702         """
703         self.repack_small_blocks(force=True, sync=True)
704
705         with self.lock:
706             items = list(self._bufferblocks.items())
707
708         for k,v in items:
709             if v.state() != _BufferBlock.COMMITTED and v.owner:
710                 v.owner.flush(sync=False)
711
712         with self.lock:
713             if self._put_queue is not None:
714                 self._put_queue.join()
715
716                 err = []
717                 for k,v in items:
718                     if v.state() == _BufferBlock.ERROR:
719                         err.append((v.locator(), v.error))
720                 if err:
721                     raise KeepWriteError("Error writing some blocks", err, label="block")
722
723         for k,v in items:
724             # flush again with sync=True to remove committed bufferblocks from
725             # the segments.
726             if v.owner:
727                 v.owner.flush(sync=True)
728
729     def block_prefetch(self, locator):
730         """Initiate a background download of a block.
731
732         This assumes that the underlying KeepClient implements a block cache,
733         so repeated requests for the same block will not result in repeated
734         downloads (unless the block is evicted from the cache.)  This method
735         does not block.
736
737         """
738
739         if not self.prefetch_enabled:
740             return
741
742         if self._keep.get_from_cache(locator) is not None:
743             return
744
745         with self.lock:
746             if locator in self._bufferblocks:
747                 return
748
749         self.start_get_threads()
750         self._prefetch_queue.put(locator)
751
752
753 class ArvadosFile(object):
754     """Represent a file in a Collection.
755
756     ArvadosFile manages the underlying representation of a file in Keep as a
757     sequence of segments spanning a set of blocks, and implements random
758     read/write access.
759
760     This object may be accessed from multiple threads.
761
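    The content is tracked as a list of Range segments, each mapping a span of
    the file onto a span of a Keep block (or a not-yet-committed bufferblock).
    For example, a 12-byte file whose first 8 bytes sit at offset 0 of block A
    and whose last 4 bytes sit at offset 16 of block B would be represented
    roughly as (locators abbreviated, purely illustrative):

        [Range("A", range_start=0, range_size=8, segment_offset=0),
         Range("B", range_start=8, range_size=4, segment_offset=16)]
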
762     """
763
764     def __init__(self, parent, name, stream=[], segments=[]):
765         """
766         ArvadosFile constructor.
767
768         :stream:
769           a list of Range objects representing a block stream
770
771         :segments:
772           a list of Range objects representing segments
773         """
774         self.parent = parent
775         self.name = name
776         self._writers = set()
777         self._committed = False
778         self._segments = []
779         self.lock = parent.root_collection().lock
780         for s in segments:
781             self._add_segment(stream, s.locator, s.range_size)
782         self._current_bblock = None
783
784     def writable(self):
785         return self.parent.writable()
786
787     @synchronized
788     def permission_expired(self, as_of_dt=None):
789         """Returns True if any of the segments' locators has expired"""
790         for r in self._segments:
791             if KeepLocator(r.locator).permission_expired(as_of_dt):
792                 return True
793         return False
794
795     @synchronized
796     def segments(self):
797         return copy.copy(self._segments)
798
799     @synchronized
800     def clone(self, new_parent, new_name):
801         """Make a copy of this file."""
802         cp = ArvadosFile(new_parent, new_name)
803         cp.replace_contents(self)
804         return cp
805
806     @must_be_writable
807     @synchronized
808     def replace_contents(self, other):
809         """Replace segments of this file with segments from another `ArvadosFile` object."""
810
811         map_loc = {}
812         self._segments = []
813         for other_segment in other.segments():
814             new_loc = other_segment.locator
815             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
816                 if other_segment.locator not in map_loc:
817                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
818                     if bufferblock.state() != _BufferBlock.WRITABLE:
819                         map_loc[other_segment.locator] = bufferblock.locator()
820                     else:
821                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
822                 new_loc = map_loc[other_segment.locator]
823
824             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
825
826         self.set_committed(False)
827
828     def __eq__(self, other):
829         if other is self:
830             return True
831         if not isinstance(other, ArvadosFile):
832             return False
833
834         othersegs = other.segments()
835         with self.lock:
836             if len(self._segments) != len(othersegs):
837                 return False
838             for i in range(0, len(othersegs)):
839                 seg1 = self._segments[i]
840                 seg2 = othersegs[i]
841                 loc1 = seg1.locator
842                 loc2 = seg2.locator
843
844                 if self.parent._my_block_manager().is_bufferblock(loc1):
845                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
846
847                 if other.parent._my_block_manager().is_bufferblock(loc2):
848                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
849
850                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
851                     seg1.range_start != seg2.range_start or
852                     seg1.range_size != seg2.range_size or
853                     seg1.segment_offset != seg2.segment_offset):
854                     return False
855
856         return True
857
858     def __ne__(self, other):
859         return not self.__eq__(other)
860
861     @synchronized
862     def set_segments(self, segs):
863         self._segments = segs
864
865     @synchronized
866     def set_committed(self, value=True):
867         """Set committed flag.
868
869         If value is True, set committed to be True.
870
871         If value is False, set committed to be False for this and all parents.
872         """
873         if value == self._committed:
874             return
875         self._committed = value
876         if self._committed is False and self.parent is not None:
877             self.parent.set_committed(False)
878
879     @synchronized
880     def committed(self):
881         """Get whether this is committed or not."""
882         return self._committed
883
884     @synchronized
885     def add_writer(self, writer):
886         """Add an ArvadosFileWriter reference to the list of writers"""
887         if isinstance(writer, ArvadosFileWriter):
888             self._writers.add(writer)
889
890     @synchronized
891     def remove_writer(self, writer, flush):
892         """
893         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
894         and do some block maintenance tasks.
895         """
896         self._writers.remove(writer)
897
898         if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
899             # File writer closed, not small enough for repacking
900             self.flush()
901         elif self.closed():
902             # All writers closed and size is adequate for repacking
903             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
904
905     def closed(self):
906         """
907         Get whether this is closed or not. The file is considered closed when the
908         writers list is empty.
909         """
910         return len(self._writers) == 0
911
912     @must_be_writable
913     @synchronized
914     def truncate(self, size):
915         """Shrink the size of the file.
916
917         If `size` is less than the size of the file, the file contents after
918         `size` will be discarded.  If `size` is greater than the current size
919         of the file, an IOError will be raised.
920
921         """
922         if size < self.size():
923             new_segs = []
924             for r in self._segments:
925                 range_end = r.range_start+r.range_size
926                 if r.range_start >= size:
927                     # segment is past the truncate size, all done
928                     break
929                 elif size < range_end:
930                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
931                     nr.segment_offset = r.segment_offset
932                     new_segs.append(nr)
933                     break
934                 else:
935                     new_segs.append(r)
936
937             self._segments = new_segs
938             self.set_committed(False)
939         elif size > self.size():
940             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
941
942     def readfrom(self, offset, size, num_retries, exact=False):
943         """Read up to `size` bytes from the file starting at `offset`.
944
945         :exact:
946          If False (default), return less data than requested if the read
947          crosses a block boundary and the next block isn't cached.  If True,
948          only return less data than requested when hitting EOF.
949         """
950
951         with self.lock:
952             if size == 0 or offset >= self.size():
953                 return b''
954             readsegs = locators_and_ranges(self._segments, offset, size)
955             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
956
957         locs = set()
958         data = []
959         for lr in readsegs:
960             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
961             if block:
962                 blockview = memoryview(block)
963                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
964                 locs.add(lr.locator)
965             else:
966                 break
967
968         for lr in prefetch:
969             if lr.locator not in locs:
970                 self.parent._my_block_manager().block_prefetch(lr.locator)
971                 locs.add(lr.locator)
972
973         return b''.join(data)
974
975     def _repack_writes(self, num_retries):
976         """Test if the buffer block has more data than actual segments.
977
978         This happens when a buffered write over-writes a file range written in
979         a previous buffered write.  Re-pack the buffer block for efficiency
980         and to avoid leaking information.
981
982         """
983         segs = self._segments
984
985         # Sum up the segments to get the total number of bytes in the file that
986         # reference this buffer block.
987         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
988         write_total = sum([s.range_size for s in bufferblock_segs])
989
990         if write_total < self._current_bblock.size():
991             # There is more data in the buffer block than is actually accounted for by segments, so
992             # re-pack into a new buffer by copying over to a new buffer block.
993             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
994             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
995             for t in bufferblock_segs:
996                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
997                 t.segment_offset = new_bb.size() - t.range_size
998
999             self._current_bblock = new_bb
1000
1001     @must_be_writable
1002     @synchronized
1003     def writeto(self, offset, data, num_retries):
1004         """Write `data` to the file starting at `offset`.
1005
1006         This will update existing bytes and/or extend the size of the file as
1007         necessary.
1008
1009         """
1010         if not isinstance(data, bytes) and not isinstance(data, memoryview):
1011             data = data.encode()
1012         if len(data) == 0:
1013             return
1014
1015         if offset > self.size():
1016             raise ArgumentError("Offset is past the end of the file")
1017
1018         if len(data) > config.KEEP_BLOCK_SIZE:
1019             # Chunk it up into smaller writes
1020             n = 0
1021             dataview = memoryview(data)
1022             while n < len(data):
1023                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1024                 n += config.KEEP_BLOCK_SIZE
1025             return
1026
1027         self.set_committed(False)
1028
1029         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1030             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1031
1032         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1033             self._repack_writes(num_retries)
1034             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1035                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1036                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1037
1038         self._current_bblock.append(data)
1039
1040         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1041
1042         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1043
1044         return len(data)
1045
1046     @synchronized
1047     def flush(self, sync=True, num_retries=0):
1048         """Flush the current bufferblock to Keep.
1049
1050         :sync:
1051           If True, commit block synchronously, wait until buffer block has been written.
1052           If False, commit block asynchronously, return immediately after putting block into
1053           the keep put queue.
1054         """
1055         if self.committed():
1056             return
1057
1058         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1059             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1060                 self._repack_writes(num_retries)
1061             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1062
1063         if sync:
1064             to_delete = set()
1065             for s in self._segments:
1066                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1067                 if bb:
1068                     if bb.state() != _BufferBlock.COMMITTED:
1069                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1070                     to_delete.add(s.locator)
1071                     s.locator = bb.locator()
1072             for s in to_delete:
1073                 self.parent._my_block_manager().delete_bufferblock(s)
1074
1075         self.parent.notify(MOD, self.parent, self.name, (self, self))
1076
1077     @must_be_writable
1078     @synchronized
1079     def add_segment(self, blocks, pos, size):
1080         """Add a segment to the end of the file.
1081
1082         `pos` and `size` reference a section of the stream described by
1083         `blocks` (a list of Range objects)
1084
1085         """
1086         self._add_segment(blocks, pos, size)
1087
1088     def _add_segment(self, blocks, pos, size):
1089         """Internal implementation of add_segment."""
1090         self.set_committed(False)
1091         for lr in locators_and_ranges(blocks, pos, size):
1092             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1093             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1094             self._segments.append(r)
1095
1096     @synchronized
1097     def size(self):
1098         """Get the file size."""
1099         if self._segments:
1100             n = self._segments[-1]
1101             return n.range_start + n.range_size
1102         else:
1103             return 0
1104
1105     @synchronized
1106     def manifest_text(self, stream_name=".", portable_locators=False,
1107                       normalize=False, only_committed=False):
1108         buf = ""
1109         filestream = []
1110         for segment in self._segments:
1111             loc = segment.locator
1112             if self.parent._my_block_manager().is_bufferblock(loc):
1113                 if only_committed:
1114                     continue
1115                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1116             if portable_locators:
1117                 loc = KeepLocator(loc).stripped()
1118             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1119                                  segment.segment_offset, segment.range_size))
1120         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
1121         buf += "\n"
1122         return buf
1123
1124     @must_be_writable
1125     @synchronized
1126     def _reparent(self, newparent, newname):
1127         self.set_committed(False)
1128         self.flush(sync=True)
1129         self.parent.remove(self.name)
1130         self.parent = newparent
1131         self.name = newname
1132         self.lock = self.parent.root_collection().lock
1133
1134
1135 class ArvadosFileReader(ArvadosFileReaderBase):
1136     """Wraps ArvadosFile in a file-like object supporting reading only.
1137
1138     Be aware that this class is NOT thread safe as there is no locking around
1139     updating file pointer.
1140
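    Readers are normally obtained from a collection rather than constructed
    directly.  An illustrative sketch, assuming a CollectionReader `c` that
    contains a file named "data.txt":

        with c.open("data.txt", "rb") as f:
            first_kb = f.read(1024)
            f.seek(0)
            for line in f:
                pass  # readline()-based iteration
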
1141     """
1142
1143     def __init__(self, arvadosfile, mode="r", num_retries=None):
1144         super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1145         self.arvadosfile = arvadosfile
1146
1147     def size(self):
1148         return self.arvadosfile.size()
1149
1150     def stream_name(self):
1151         return self.arvadosfile.parent.stream_name()
1152
1153     @_FileLikeObjectBase._before_close
1154     @retry_method
1155     def read(self, size=None, num_retries=None):
1156         """Read up to `size` bytes from the file and return the result.
1157
1158         Starts at the current file position.  If `size` is None, read the
1159         entire remainder of the file.
1160         """
1161         if size is None:
1162             data = []
1163             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1164             while rd:
1165                 data.append(rd)
1166                 self._filepos += len(rd)
1167                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1168             return b''.join(data)
1169         else:
1170             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1171             self._filepos += len(data)
1172             return data
1173
1174     @_FileLikeObjectBase._before_close
1175     @retry_method
1176     def readfrom(self, offset, size, num_retries=None):
1177         """Read up to `size` bytes from the stream, starting at the specified file offset.
1178
1179         This method does not change the file position.
1180         """
1181         return self.arvadosfile.readfrom(offset, size, num_retries)
1182
1183     def flush(self):
1184         pass
1185
1186
1187 class ArvadosFileWriter(ArvadosFileReader):
1188     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1189
1190     Be aware that this class is NOT thread safe as there is no locking around
1191     updating file pointer.
1192
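    An illustrative sketch, assuming a writable Collection `c`.  Writes land at
    the current file position, except in append ("a") mode where they always go
    to the end of the file; data is flushed to Keep when the writer is closed:

        with c.open("output.txt", "wb") as f:
            f.write(b"hello ")
            f.write(b"world\n")
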
1193     """
1194
1195     def __init__(self, arvadosfile, mode, num_retries=None):
1196         super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1197         self.arvadosfile.add_writer(self)
1198
1199     @_FileLikeObjectBase._before_close
1200     @retry_method
1201     def write(self, data, num_retries=None):
1202         if self.mode[0] == "a":
1203             self.arvadosfile.writeto(self.size(), data, num_retries)
1204         else:
1205             self.arvadosfile.writeto(self._filepos, data, num_retries)
1206             self._filepos += len(data)
1207         return len(data)
1208
1209     @_FileLikeObjectBase._before_close
1210     @retry_method
1211     def writelines(self, seq, num_retries=None):
1212         for s in seq:
1213             self.write(s, num_retries=num_retries)
1214
1215     @_FileLikeObjectBase._before_close
1216     def truncate(self, size=None):
1217         if size is None:
1218             size = self._filepos
1219         self.arvadosfile.truncate(size)
1220         if self._filepos > self.size():
1221             self._filepos = self.size()
1222
1223     @_FileLikeObjectBase._before_close
1224     def flush(self):
1225         self.arvadosfile.flush()
1226
1227     def close(self, flush=True):
1228         if not self.closed:
1229             self.arvadosfile.remove_writer(self, flush)
1230             super(ArvadosFileWriter, self).close()