[arvados.git] / sdk / python / arvados / arvfile.py
1 from __future__ import absolute_import
2 import functools
3 import os
4 import zlib
5 import bz2
6 from . import config
7 import hashlib
8 import threading
9 import Queue
10 import copy
11 import errno
12 import re
13 import logging
14 import collections
15 import uuid
16
17 from .errors import KeepWriteError, AssertionError, ArgumentError
18 from .keep import KeepLocator
19 from ._normalize_stream import normalize_stream
20 from ._ranges import locators_and_ranges, LocatorAndRange, replace_range, Range
21 from .retry import retry_method
22
23 MOD = "mod"
24 WRITE = "write"
25
26 _logger = logging.getLogger('arvados.arvfile')
27
28 def split(path):
29     """split(path) -> streamname, filename
30
31     Separate the stream name and file name in a /-separated stream path and
32     return a tuple (stream_name, file_name).  If no stream name is available,
33     assume '.'.
34
35     """
36     try:
37         stream_name, file_name = path.rsplit('/', 1)
38     except ValueError:  # No / in string
39         stream_name, file_name = '.', path
40     return stream_name, file_name
41
42
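# A minimal usage sketch of split(), purely illustrative:
#
#   split("output/logs/stderr.txt")   # -> ('output/logs', 'stderr.txt')
#   split("stderr.txt")               # -> ('.', 'stderr.txt')
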
43 class UnownedBlockError(Exception):
44     """Raised when there's a writable block without an owner on the BlockManager."""
45     pass
46
47
48 class _FileLikeObjectBase(object):
49     def __init__(self, name, mode):
50         self.name = name
51         self.mode = mode
52         self.closed = False
53
54     @staticmethod
55     def _before_close(orig_func):
56         @functools.wraps(orig_func)
57         def before_close_wrapper(self, *args, **kwargs):
58             if self.closed:
59                 raise ValueError("I/O operation on closed stream file")
60             return orig_func(self, *args, **kwargs)
61         return before_close_wrapper
62
63     def __enter__(self):
64         return self
65
66     def __exit__(self, exc_type, exc_value, traceback):
67         try:
68             self.close()
69         except Exception:
70             if exc_type is None:
71                 raise
72
73     def close(self):
74         self.closed = True
75
76
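# _FileLikeObjectBase supplies the shared file-object plumbing used by every
# reader/writer below: .name, .mode, .closed, context-manager support, and the
# _before_close decorator that turns calls on a closed file into ValueError.
# A sketch of the resulting behaviour, where `reader` stands for any concrete
# subclass instance:
#
#   with reader as f:      # __enter__ returns the object itself
#       f.read(1024)
#   f.read(1024)           # raises ValueError: I/O operation on closed stream file
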
77 class ArvadosFileReaderBase(_FileLikeObjectBase):
78     def __init__(self, name, mode, num_retries=None):
79         super(ArvadosFileReaderBase, self).__init__(name, mode)
80         self._filepos = 0
81         self.num_retries = num_retries
82         self._readline_cache = (None, None)
83
84     def __iter__(self):
85         while True:
86             data = self.readline()
87             if not data:
88                 break
89             yield data
90
91     def decompressed_name(self):
92         return re.sub(r'\.(bz2|gz)$', '', self.name)
93
94     @_FileLikeObjectBase._before_close
95     def seek(self, pos, whence=os.SEEK_SET):
96         if whence == os.SEEK_CUR:
97             pos += self._filepos
98         elif whence == os.SEEK_END:
99             pos += self.size()
100         self._filepos = min(max(pos, 0), self.size())
101
102     def tell(self):
103         return self._filepos
104
105     @_FileLikeObjectBase._before_close
106     @retry_method
107     def readall(self, size=2**20, num_retries=None):
108         while True:
109             data = self.read(size, num_retries=num_retries)
110             if data == '':
111                 break
112             yield data
113
114     @_FileLikeObjectBase._before_close
115     @retry_method
116     def readline(self, size=float('inf'), num_retries=None):
117         cache_pos, cache_data = self._readline_cache
118         if self.tell() == cache_pos:
119             data = [cache_data]
120             self._filepos += len(cache_data)
121         else:
122             data = ['']
123         data_size = len(data[-1])
124         while (data_size < size) and ('\n' not in data[-1]):
125             next_read = self.read(2 ** 20, num_retries=num_retries)
126             if not next_read:
127                 break
128             data.append(next_read)
129             data_size += len(next_read)
130         data = ''.join(data)
131         try:
132             nextline_index = data.index('\n') + 1
133         except ValueError:
134             nextline_index = len(data)
135         nextline_index = min(nextline_index, size)
136         self._filepos -= len(data) - nextline_index
137         self._readline_cache = (self.tell(), data[nextline_index:])
138         return data[:nextline_index]
139
140     @_FileLikeObjectBase._before_close
141     @retry_method
142     def decompress(self, decompress, size, num_retries=None):
143         for segment in self.readall(size, num_retries=num_retries):
144             data = decompress(segment)
145             if data:
146                 yield data
147
148     @_FileLikeObjectBase._before_close
149     @retry_method
150     def readall_decompressed(self, size=2**20, num_retries=None):
151         self.seek(0)
152         if self.name.endswith('.bz2'):
153             dc = bz2.BZ2Decompressor()
154             return self.decompress(dc.decompress, size,
155                                    num_retries=num_retries)
156         elif self.name.endswith('.gz'):
157             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
158             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
159                                    size, num_retries=num_retries)
160         else:
161             return self.readall(size, num_retries=num_retries)
162
163     @_FileLikeObjectBase._before_close
164     @retry_method
165     def readlines(self, sizehint=float('inf'), num_retries=None):
166         data = []
167         data_size = 0
168         for s in self.readall(num_retries=num_retries):
169             data.append(s)
170             data_size += len(s)
171             if data_size >= sizehint:
172                 break
173         return ''.join(data).splitlines(True)
174
175     def size(self):
176         raise NotImplementedError()
177
178     def read(self, size, num_retries=None):
179         raise NotImplementedError()
180
181     def readfrom(self, start, size, num_retries=None):
182         raise NotImplementedError()
183
184
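# A hedged sketch of the iteration helpers above, where `reader` is any
# concrete subclass (StreamFileReader or ArvadosFileReader) and
# handle()/digest are placeholders.  readall() is a generator of raw chunks,
# readline()/__iter__ yield newline-terminated lines, and
# readall_decompressed() unpacks names ending in .gz or .bz2.
#
#   for line in reader:                  # uses readline() under the hood
#       handle(line)
#
#   reader.seek(0)
#   for chunk in reader.readall(2**20):  # 1 MiB chunks
#       digest.update(chunk)
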
185 class StreamFileReader(ArvadosFileReaderBase):
186     class _NameAttribute(str):
187         # The Python file API provides a plain .name attribute.
188         # Older SDK provided a name() method.
189         # This class provides both, for maximum compatibility.
190         def __call__(self):
191             return self
192
193     def __init__(self, stream, segments, name):
194         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
195         self._stream = stream
196         self.segments = segments
197
198     def stream_name(self):
199         return self._stream.name()
200
201     def size(self):
202         n = self.segments[-1]
203         return n.range_start + n.range_size
204
205     @_FileLikeObjectBase._before_close
206     @retry_method
207     def read(self, size, num_retries=None):
208         """Read up to 'size' bytes from the stream, starting at the current file position"""
209         if size == 0:
210             return ''
211
212         data = ''
213         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
214         if available_chunks:
215             lr = available_chunks[0]
216             data = self._stream.readfrom(lr.locator+lr.segment_offset,
217                                          lr.segment_size,
218                                          num_retries=num_retries)
219
220         self._filepos += len(data)
221         return data
222
223     @_FileLikeObjectBase._before_close
224     @retry_method
225     def readfrom(self, start, size, num_retries=None):
226         """Read up to 'size' bytes from the stream, starting at 'start'"""
227         if size == 0:
228             return ''
229
230         data = []
231         for lr in locators_and_ranges(self.segments, start, size):
232             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
233                                               num_retries=num_retries))
234         return ''.join(data)
235
236     def as_manifest(self):
237         segs = []
238         for r in self.segments:
239             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
240         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
241
242
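# A hedged sketch of legacy StreamFileReader use; `f` stands for a
# StreamFileReader obtained from an old-SDK StreamReader (for example via its
# files() mapping):
#
#   header = f.readfrom(0, 1024)          # does not move the file position
#   chunk = f.read(4096)                  # reads from the current position
#   print(f.stream_name(), f.size())
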
243 def synchronized(orig_func):
244     @functools.wraps(orig_func)
245     def synchronized_wrapper(self, *args, **kwargs):
246         with self.lock:
247             return orig_func(self, *args, **kwargs)
248     return synchronized_wrapper
249
250
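# synchronized() assumes the instance has a `lock` attribute (a threading.Lock
# or RLock) and simply serializes the wrapped method on it.  A minimal sketch:
#
#   class Counter(object):
#       def __init__(self):
#           self.lock = threading.Lock()
#           self.n = 0
#
#       @synchronized
#       def bump(self):
#           self.n += 1
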
251 class StateChangeError(Exception):
252     def __init__(self, message, state, nextstate):
253         super(StateChangeError, self).__init__(message)
254         self.state = state
255         self.nextstate = nextstate
256
257 class _BufferBlock(object):
258     """A stand-in for a Keep block that is in the process of being written.
259
260     Writers can append to it, get the size, and compute the Keep locator.
261     There are three valid states:
262
263     WRITABLE
264       Can append to block.
265
266     PENDING
267       Block is in the process of being uploaded to Keep, append is an error.
268
269     COMMITTED
270       The block has been written to Keep, its internal buffer has been
271       released, fetching the block will fetch it via keep client (since we
272       discarded the internal copy), and identifiers referring to the BufferBlock
273       can be replaced with the block locator.
274
275     """
276
277     WRITABLE = 0
278     PENDING = 1
279     COMMITTED = 2
280     ERROR = 3
281
282     def __init__(self, blockid, starting_capacity, owner):
283         """
284         :blockid:
285           the identifier for this block
286
287         :starting_capacity:
288           the initial buffer capacity
289
290         :owner:
291           ArvadosFile that owns this block
292
293         """
294         self.blockid = blockid
295         self.buffer_block = bytearray(starting_capacity)
296         self.buffer_view = memoryview(self.buffer_block)
297         self.write_pointer = 0
298         self._state = _BufferBlock.WRITABLE
299         self._locator = None
300         self.owner = owner
301         self.lock = threading.Lock()
302         self.wait_for_commit = threading.Event()
303         self.error = None
304
305     @synchronized
306     def append(self, data):
307         """Append some data to the buffer.
308
309         Only valid if the block is in WRITABLE state.  Implements an expanding
310         buffer, doubling capacity as needed to accommodate all the data.
311
312         """
313         if self._state == _BufferBlock.WRITABLE:
314             while (self.write_pointer+len(data)) > len(self.buffer_block):
315                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
316                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
317                 self.buffer_block = new_buffer_block
318                 self.buffer_view = memoryview(self.buffer_block)
319             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
320             self.write_pointer += len(data)
321             self._locator = None
322         else:
323             raise AssertionError("Buffer block is not writable")
324
325     STATE_TRANSITIONS = frozenset([
326             (WRITABLE, PENDING),
327             (PENDING, COMMITTED),
328             (PENDING, ERROR),
329             (ERROR, PENDING)])
330
331     @synchronized
332     def set_state(self, nextstate, val=None):
333         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
334             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
335         self._state = nextstate
336
337         if self._state == _BufferBlock.PENDING:
338             self.wait_for_commit.clear()
339
340         if self._state == _BufferBlock.COMMITTED:
341             self._locator = val
342             self.buffer_view = None
343             self.buffer_block = None
344             self.wait_for_commit.set()
345
346         if self._state == _BufferBlock.ERROR:
347             self.error = val
348             self.wait_for_commit.set()
349
350     @synchronized
351     def state(self):
352         return self._state
353
354     def size(self):
355         """The amount of data written to the buffer."""
356         return self.write_pointer
357
358     @synchronized
359     def locator(self):
360         """The Keep locator for this buffer's contents."""
361         if self._locator is None:
362             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
363         return self._locator
364
365     @synchronized
366     def clone(self, new_blockid, owner):
367         if self._state == _BufferBlock.COMMITTED:
368             raise AssertionError("Cannot duplicate committed buffer block")
369         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
370         bufferblock.append(self.buffer_view[0:self.size()])
371         return bufferblock
372
373     @synchronized
374     def clear(self):
375         self.owner = None
376         self.buffer_block = None
377         self.buffer_view = None
378
379
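# A hedged sketch of the _BufferBlock life cycle described in the class
# docstring.  The locator is the usual Keep "<md5>+<size>" form computed over
# the bytes written so far; in normal operation _BlockManager, not application
# code, drives the state transitions.
#
#   bb = _BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
#   bb.append("hello")
#   bb.locator()     # "%s+%i" % (hashlib.md5("hello").hexdigest(), 5)
#                    # == "5d41402abc4b2a76b9719d911017c592+5"
#   bb.set_state(_BufferBlock.PENDING)
#   bb.set_state(_BufferBlock.COMMITTED, "5d41402abc4b2a76b9719d911017c592+5")
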
380 class NoopLock(object):
381     def __enter__(self):
382         return self
383
384     def __exit__(self, exc_type, exc_value, traceback):
385         pass
386
387     def acquire(self, blocking=False):
388         pass
389
390     def release(self):
391         pass
392
393
394 def must_be_writable(orig_func):
395     @functools.wraps(orig_func)
396     def must_be_writable_wrapper(self, *args, **kwargs):
397         if not self.writable():
398             raise IOError(errno.EROFS, "Collection is read-only.")
399         return orig_func(self, *args, **kwargs)
400     return must_be_writable_wrapper
401
402
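# must_be_writable() expects the decorated object to implement writable();
# when it returns False the wrapped call fails with IOError(EROFS) instead of
# silently modifying a read-only collection.  A minimal sketch:
#
#   class ReadOnlyThing(object):
#       def writable(self):
#           return False
#
#       @must_be_writable
#       def truncate(self, size):
#           pass
#
#   ReadOnlyThing().truncate(0)   # IOError: [Errno 30] Collection is read-only.
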
403 class _BlockManager(object):
404     """BlockManager handles buffer blocks.
405
406     Also handles background block uploads, and background block prefetch for a
407     Collection of ArvadosFiles.
408
409     """
410
411     DEFAULT_PUT_THREADS = 2
412     DEFAULT_GET_THREADS = 2
413
414     def __init__(self, keep, copies=None, put_threads=None):
415         """keep: KeepClient object to use"""
416         self._keep = keep
417         self._bufferblocks = collections.OrderedDict()
418         self._put_queue = None
419         self._put_threads = None
420         self._prefetch_queue = None
421         self._prefetch_threads = None
422         self.lock = threading.Lock()
423         self.prefetch_enabled = True
424         if put_threads:
425             self.num_put_threads = put_threads
426         else:
427             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
428         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
429         self.copies = copies
430         self._pending_write_size = 0
431         self.threads_lock = threading.Lock()
432
433     @synchronized
434     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
435         """Allocate a new, empty bufferblock in WRITABLE state and return it.
436
437         :blockid:
438           optional block identifier, otherwise one will be automatically assigned
439
440         :starting_capacity:
441           optional capacity, otherwise will use default capacity
442
443         :owner:
444           ArvadosFile that owns this block
445
446         """
447         return self._alloc_bufferblock(blockid, starting_capacity, owner)
448
449     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
450         if blockid is None:
451             blockid = "%s" % uuid.uuid4()
452         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
453         self._bufferblocks[bufferblock.blockid] = bufferblock
454         return bufferblock
455
456     @synchronized
457     def dup_block(self, block, owner):
458         """Create a new bufferblock initialized with the content of an existing bufferblock.
459
460         :block:
461           the buffer block to copy.
462
463         :owner:
464           ArvadosFile that owns the new block
465
466         """
467         new_blockid = "bufferblock%i" % len(self._bufferblocks)
468         bufferblock = block.clone(new_blockid, owner)
469         self._bufferblocks[bufferblock.blockid] = bufferblock
470         return bufferblock
471
472     @synchronized
473     def is_bufferblock(self, locator):
474         return locator in self._bufferblocks
475
476     def _commit_bufferblock_worker(self):
477         """Background uploader thread."""
478
479         while True:
480             try:
481                 bufferblock = self._put_queue.get()
482                 if bufferblock is None:
483                     return
484
485                 if self.copies is None:
486                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
487                 else:
488                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
489                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
490
491             except Exception as e:
492                 bufferblock.set_state(_BufferBlock.ERROR, e)
493             finally:
494                 if self._put_queue is not None:
495                     self._put_queue.task_done()
496
497     def start_put_threads(self):
498         with self.threads_lock:
499             if self._put_threads is None:
500                 # Start uploader threads.
501
502                 # If we don't limit the Queue size, the upload queue can quickly
503                 # grow to take up gigabytes of RAM if the writing process is
504                 # generating data more quickly than it can be sent to the Keep
505                 # servers.
506                 #
507                 # With two upload threads and a queue size of 2, this means up to 4
508                 # blocks pending.  If they are full 64 MiB blocks, that means up to
509                 # 256 MiB of internal buffering, which is the same size as the
510                 # default download block cache in KeepClient.
511                 self._put_queue = Queue.Queue(maxsize=2)
512
513                 self._put_threads = []
514                 for i in xrange(0, self.num_put_threads):
515                     thread = threading.Thread(target=self._commit_bufferblock_worker)
516                     self._put_threads.append(thread)
517                     thread.daemon = True
518                     thread.start()
519
520     def _block_prefetch_worker(self):
521         """The background downloader thread."""
522         while True:
523             try:
524                 b = self._prefetch_queue.get()
525                 if b is None:
526                     return
527                 self._keep.get(b)
528             except Exception:
529                 _logger.exception("Exception doing block prefetch")
530
531     @synchronized
532     def start_get_threads(self):
533         if self._prefetch_threads is None:
534             self._prefetch_queue = Queue.Queue()
535             self._prefetch_threads = []
536             for i in xrange(0, self.num_get_threads):
537                 thread = threading.Thread(target=self._block_prefetch_worker)
538                 self._prefetch_threads.append(thread)
539                 thread.daemon = True
540                 thread.start()
541
542
543     @synchronized
544     def stop_threads(self):
545         """Shut down and wait for background upload and download threads to finish."""
546
547         if self._put_threads is not None:
548             for t in self._put_threads:
549                 self._put_queue.put(None)
550             for t in self._put_threads:
551                 t.join()
552         self._put_threads = None
553         self._put_queue = None
554
555         if self._prefetch_threads is not None:
556             for t in self._prefetch_threads:
557                 self._prefetch_queue.put(None)
558             for t in self._prefetch_threads:
559                 t.join()
560         self._prefetch_threads = None
561         self._prefetch_queue = None
562
563     def __enter__(self):
564         return self
565
566     def __exit__(self, exc_type, exc_value, traceback):
567         self.stop_threads()
568
569     @synchronized
570     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
571         """Packs small blocks together before uploading"""
572         self._pending_write_size += closed_file_size
573
574         # Check if there are enough small blocks for filling up one in full
575         if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
576
577             # Search for blocks that are ready to be packed together before being committed to Keep.
578             # A WRITABLE block always has an owner.
579             # A WRITABLE block whose owner.closed() is True implies that its
580             # size is <= KEEP_BLOCK_SIZE/2.
581             try:
582                 small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
583             except AttributeError:
584                 # Writable blocks without owner shouldn't exist.
585                 raise UnownedBlockError()
586
587             if len(small_blocks) <= 1:
588                 # Not enough small blocks for repacking
589                 return
590
591             # Update the pending write size count with its true value, just in case
592             # some small file was opened, written and closed several times.
593             self._pending_write_size = sum([b.size() for b in small_blocks])
594             if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
595                 return
596
597             new_bb = self._alloc_bufferblock()
598             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
599                 bb = small_blocks.pop(0)
600                 arvfile = bb.owner
601                 self._pending_write_size -= bb.size()
602                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
603                 arvfile.set_segments([Range(new_bb.blockid,
604                                             0,
605                                             bb.size(),
606                                             new_bb.write_pointer - bb.size())])
607                 self._delete_bufferblock(bb.blockid)
608             self.commit_bufferblock(new_bb, sync=sync)
609
610     def commit_bufferblock(self, block, sync):
611         """Initiate a background upload of a bufferblock.
612
613         :block:
614           The block object to upload
615
616         :sync:
617           If `sync` is True, upload the block synchronously.
618           If `sync` is False, upload the block asynchronously.  This will
619           return immediately unless the upload queue is at capacity, in
620           which case it will wait on an upload queue slot.
621
622         """
623         try:
624             # Mark the block as PENDING so as to disallow any more appends.
625             block.set_state(_BufferBlock.PENDING)
626         except StateChangeError as e:
627             if e.state == _BufferBlock.PENDING:
628                 if sync:
629                     block.wait_for_commit.wait()
630                 else:
631                     return
632             if block.state() == _BufferBlock.COMMITTED:
633                 return
634             elif block.state() == _BufferBlock.ERROR:
635                 raise block.error
636             else:
637                 raise
638
639         if sync:
640             try:
641                 if self.copies is None:
642                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
643                 else:
644                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
645                 block.set_state(_BufferBlock.COMMITTED, loc)
646             except Exception as e:
647                 block.set_state(_BufferBlock.ERROR, e)
648                 raise
649         else:
650             self.start_put_threads()
651             self._put_queue.put(block)
652
653     @synchronized
654     def get_bufferblock(self, locator):
655         return self._bufferblocks.get(locator)
656
657     @synchronized
658     def delete_bufferblock(self, locator):
659         self._delete_bufferblock(locator)
660
661     def _delete_bufferblock(self, locator):
662         bb = self._bufferblocks[locator]
663         bb.clear()
664         del self._bufferblocks[locator]
665
666     def get_block_contents(self, locator, num_retries, cache_only=False):
667         """Fetch a block.
668
669         First checks to see if the locator is a BufferBlock and returns its
670         contents if so; otherwise passes the request through to KeepClient.get().
671
672         """
673         with self.lock:
674             if locator in self._bufferblocks:
675                 bufferblock = self._bufferblocks[locator]
676                 if bufferblock.state() != _BufferBlock.COMMITTED:
677                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
678                 else:
679                     locator = bufferblock._locator
680         if cache_only:
681             return self._keep.get_from_cache(locator)
682         else:
683             return self._keep.get(locator, num_retries=num_retries)
684
685     def commit_all(self):
686         """Commit all outstanding buffer blocks.
687
688         This is a synchronous call, and will not return until all buffer blocks
689         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
690
691         """
692         self.repack_small_blocks(force=True, sync=True)
693
694         with self.lock:
695             items = self._bufferblocks.items()
696
697         for k,v in items:
698             if v.state() != _BufferBlock.COMMITTED and v.owner:
699                 v.owner.flush(sync=False)
700
701         with self.lock:
702             if self._put_queue is not None:
703                 self._put_queue.join()
704
705                 err = []
706                 for k,v in items:
707                     if v.state() == _BufferBlock.ERROR:
708                         err.append((v.locator(), v.error))
709                 if err:
710                     raise KeepWriteError("Error writing some blocks", err, label="block")
711
712         for k,v in items:
713             # flush again with sync=True to remove committed bufferblocks from
714             # the segments.
715             if v.owner:
716                 v.owner.flush(sync=True)
717
718     def block_prefetch(self, locator):
719         """Initiate a background download of a block.
720
721         This assumes that the underlying KeepClient implements a block cache,
722         so repeated requests for the same block will not result in repeated
723         downloads (unless the block is evicted from the cache).  This method
724         does not block.
725
726         """
727
728         if not self.prefetch_enabled:
729             return
730
731         if self._keep.get_from_cache(locator) is not None:
732             return
733
734         with self.lock:
735             if locator in self._bufferblocks:
736                 return
737
738         self.start_get_threads()
739         self._prefetch_queue.put(locator)
740
741
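# A hedged sketch of how collection code drives _BlockManager (assuming `keep`
# is an arvados.keep.KeepClient and `arv_file` is the ArvadosFile that owns the
# block).  commit_bufferblock(sync=False) marks the block PENDING and hands it
# to the bounded upload queue; commit_all() blocks until everything is in Keep;
# the context manager shuts the worker threads down.
#
#   with _BlockManager(keep, copies=2, put_threads=2) as bm:
#       bb = bm.alloc_bufferblock(owner=arv_file)
#       bb.append("x" * 1024)
#       bm.commit_bufferblock(bb, sync=False)   # queued for a background PUT
#       bm.commit_all()                         # raises KeepWriteError on failure
#   # __exit__ -> stop_threads(): queues drained, worker threads joined
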
742 class ArvadosFile(object):
743     """Represent a file in a Collection.
744
745     ArvadosFile manages the underlying representation of a file in Keep as a
746     sequence of segments spanning a set of blocks, and implements random
747     read/write access.
748
749     This object may be accessed from multiple threads.
750
751     """
752
753     def __init__(self, parent, name, stream=[], segments=[]):
754         """
755         ArvadosFile constructor.
756
757         :stream:
758           a list of Range objects representing a block stream
759
760         :segments:
761           a list of Range objects representing segments
762         """
763         self.parent = parent
764         self.name = name
765         self._writers = set()
766         self._committed = False
767         self._segments = []
768         self.lock = parent.root_collection().lock
769         for s in segments:
770             self._add_segment(stream, s.locator, s.range_size)
771         self._current_bblock = None
772
773     def writable(self):
774         return self.parent.writable()
775
776     @synchronized
777     def permission_expired(self, as_of_dt=None):
778         """Returns True if any of the segments' locators is expired"""
779         for r in self._segments:
780             if KeepLocator(r.locator).permission_expired(as_of_dt):
781                 return True
782         return False
783
784     @synchronized
785     def segments(self):
786         return copy.copy(self._segments)
787
788     @synchronized
789     def clone(self, new_parent, new_name):
790         """Make a copy of this file."""
791         cp = ArvadosFile(new_parent, new_name)
792         cp.replace_contents(self)
793         return cp
794
795     @must_be_writable
796     @synchronized
797     def replace_contents(self, other):
798         """Replace segments of this file with segments from another `ArvadosFile` object."""
799
800         map_loc = {}
801         self._segments = []
802         for other_segment in other.segments():
803             new_loc = other_segment.locator
804             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
805                 if other_segment.locator not in map_loc:
806                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
807                     if bufferblock.state() != _BufferBlock.WRITABLE:
808                         map_loc[other_segment.locator] = bufferblock.locator()
809                     else:
810                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
811                 new_loc = map_loc[other_segment.locator]
812
813             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
814
815         self.set_committed(False)
816
817     def __eq__(self, other):
818         if other is self:
819             return True
820         if not isinstance(other, ArvadosFile):
821             return False
822
823         othersegs = other.segments()
824         with self.lock:
825             if len(self._segments) != len(othersegs):
826                 return False
827             for i in xrange(0, len(othersegs)):
828                 seg1 = self._segments[i]
829                 seg2 = othersegs[i]
830                 loc1 = seg1.locator
831                 loc2 = seg2.locator
832
833                 if self.parent._my_block_manager().is_bufferblock(loc1):
834                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
835
836                 if other.parent._my_block_manager().is_bufferblock(loc2):
837                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
838
839                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
840                     seg1.range_start != seg2.range_start or
841                     seg1.range_size != seg2.range_size or
842                     seg1.segment_offset != seg2.segment_offset):
843                     return False
844
845         return True
846
847     def __ne__(self, other):
848         return not self.__eq__(other)
849
850     @synchronized
851     def set_segments(self, segs):
852         self._segments = segs
853
854     @synchronized
855     def set_committed(self, value=True):
856         """Set committed flag.
857
858         If value is True, set committed to be True.
859
860         If value is False, set committed to be False for this and all parents.
861         """
862         if value == self._committed:
863             return
864         self._committed = value
865         if self._committed is False and self.parent is not None:
866             self.parent.set_committed(False)
867
868     @synchronized
869     def committed(self):
870         """Get whether this is committed or not."""
871         return self._committed
872
873     @synchronized
874     def add_writer(self, writer):
875         """Add an ArvadosFileWriter reference to the list of writers"""
876         if isinstance(writer, ArvadosFileWriter):
877             self._writers.add(writer)
878
879     @synchronized
880     def remove_writer(self, writer, flush):
881         """
882         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
883         and do some block maintenance tasks.
884         """
885         self._writers.remove(writer)
886
887         if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
888             # File writer closed; flush was requested or the file is too large for repacking
889             self.flush()
890         elif self.closed():
891             # All writers closed and size is adequate for repacking
892             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
893
894     def closed(self):
895         """
896         Get whether this is closed or not. When the writers list is empty, the
897         file is considered closed.
898         """
899         return len(self._writers) == 0
900
901     @must_be_writable
902     @synchronized
903     def truncate(self, size):
904         """Shrink the size of the file.
905
906         If `size` is less than the size of the file, the file contents after
907         `size` will be discarded.  If `size` is greater than the current size
908         of the file, an IOError will be raised.
909
910         """
911         if size < self.size():
912             new_segs = []
913             for r in self._segments:
914                 range_end = r.range_start+r.range_size
915                 if r.range_start >= size:
916                     # segment is past the truncate size, all done
917                     break
918                 elif size < range_end:
919                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
920                     nr.segment_offset = r.segment_offset
921                     new_segs.append(nr)
922                     break
923                 else:
924                     new_segs.append(r)
925
926             self._segments = new_segs
927             self.set_committed(False)
928         elif size > self.size():
929             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
930
931     def readfrom(self, offset, size, num_retries, exact=False):
932         """Read up to `size` bytes from the file starting at `offset`.
933
934         :exact:
935          If False (default), return less data than requested if the read
936          crosses a block boundary and the next block isn't cached.  If True,
937          only return less data than requested when hitting EOF.
938         """
939
940         with self.lock:
941             if size == 0 or offset >= self.size():
942                 return ''
943             readsegs = locators_and_ranges(self._segments, offset, size)
944             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
945
946         locs = set()
947         data = []
948         for lr in readsegs:
949             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
950             if block:
951                 blockview = memoryview(block)
952                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
953                 locs.add(lr.locator)
954             else:
955                 break
956
957         for lr in prefetch:
958             if lr.locator not in locs:
959                 self.parent._my_block_manager().block_prefetch(lr.locator)
960                 locs.add(lr.locator)
961
962         return ''.join(data)
963
964     def _repack_writes(self, num_retries):
965         """Test if the buffer block has more data than actual segments.
966
967         This happens when a buffered write over-writes a file range written in
968         a previous buffered write.  Re-pack the buffer block for efficiency
969         and to avoid leaking information.
970
971         """
972         segs = self._segments
973
974         # Sum up the segments to get the total bytes of the file referencing
975         # into the buffer block.
976         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
977         write_total = sum([s.range_size for s in bufferblock_segs])
978
979         if write_total < self._current_bblock.size():
980             # There is more data in the buffer block than is actually accounted for by segments, so
981             # re-pack into a new buffer by copying over to a new buffer block.
982             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
983             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
984             for t in bufferblock_segs:
985                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
986                 t.segment_offset = new_bb.size() - t.range_size
987
988             self._current_bblock = new_bb
989
990     @must_be_writable
991     @synchronized
992     def writeto(self, offset, data, num_retries):
993         """Write `data` to the file starting at `offset`.
994
995         This will update existing bytes and/or extend the size of the file as
996         necessary.
997
998         """
999         if len(data) == 0:
1000             return
1001
1002         if offset > self.size():
1003             raise ArgumentError("Offset is past the end of the file")
1004
1005         if len(data) > config.KEEP_BLOCK_SIZE:
1006             # Chunk it up into smaller writes
1007             n = 0
1008             dataview = memoryview(data)
1009             while n < len(data):
1010                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1011                 n += config.KEEP_BLOCK_SIZE
1012             return
1013
1014         self.set_committed(False)
1015
1016         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1017             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1018
1019         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1020             self._repack_writes(num_retries)
1021             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1022                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1023                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1024
1025         self._current_bblock.append(data)
1026
1027         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1028
1029         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1030
1031         return len(data)
1032
1033     @synchronized
1034     def flush(self, sync=True, num_retries=0):
1035         """Flush the current bufferblock to Keep.
1036
1037         :sync:
1038           If True, commit block synchronously, wait until buffer block has been written.
1039           If False, commit block asynchronously, return immediately after putting block into
1040           the keep put queue.
1041         """
1042         if self.committed():
1043             return
1044
1045         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1046             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1047                 self._repack_writes(num_retries)
1048             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1049
1050         if sync:
1051             to_delete = set()
1052             for s in self._segments:
1053                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1054                 if bb:
1055                     if bb.state() != _BufferBlock.COMMITTED:
1056                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1057                     to_delete.add(s.locator)
1058                     s.locator = bb.locator()
1059             for s in to_delete:
1060                self.parent._my_block_manager().delete_bufferblock(s)
1061
1062         self.parent.notify(MOD, self.parent, self.name, (self, self))
1063
1064     @must_be_writable
1065     @synchronized
1066     def add_segment(self, blocks, pos, size):
1067         """Add a segment to the end of the file.
1068
1069         `pos` and `size` reference a section of the stream described by
1070         `blocks` (a list of Range objects)
1071
1072         """
1073         self._add_segment(blocks, pos, size)
1074
1075     def _add_segment(self, blocks, pos, size):
1076         """Internal implementation of add_segment."""
1077         self.set_committed(False)
1078         for lr in locators_and_ranges(blocks, pos, size):
1079             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1080             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1081             self._segments.append(r)
1082
1083     @synchronized
1084     def size(self):
1085         """Get the file size."""
1086         if self._segments:
1087             n = self._segments[-1]
1088             return n.range_start + n.range_size
1089         else:
1090             return 0
1091
1092     @synchronized
1093     def manifest_text(self, stream_name=".", portable_locators=False,
1094                       normalize=False, only_committed=False):
1095         buf = ""
1096         filestream = []
1097         for segment in self._segments:
1098             loc = segment.locator
1099             if self.parent._my_block_manager().is_bufferblock(loc):
1100                 if only_committed:
1101                     continue
1102                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1103             if portable_locators:
1104                 loc = KeepLocator(loc).stripped()
1105             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1106                                  segment.segment_offset, segment.range_size))
1107         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
1108         buf += "\n"
1109         return buf
1110
1111     @must_be_writable
1112     @synchronized
1113     def _reparent(self, newparent, newname):
1114         self.set_committed(False)
1115         self.flush(sync=True)
1116         self.parent.remove(self.name)
1117         self.parent = newparent
1118         self.name = newname
1119         self.lock = self.parent.root_collection().lock
1120
1121
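# ArvadosFile instances are normally created and wired up (parent, lock, block
# manager) by arvados.collection.Collection rather than constructed directly.
# A hedged sketch of the calls defined above, assuming that API:
#
#   from arvados.collection import Collection
#
#   c = Collection()
#   with c.open("subdir/data.bin", "w") as f:   # ArvadosFileWriter over an ArvadosFile
#       f.write("some bytes")
#   af = c.find("subdir/data.bin")              # the underlying ArvadosFile
#   af.readfrom(0, 4, num_retries=2)            # -> "some"
#   af.truncate(4)                              # shrink; growing raises IOError
#   af.flush(sync=True)                         # commit its buffer blocks to Keep
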
1122 class ArvadosFileReader(ArvadosFileReaderBase):
1123     """Wraps ArvadosFile in a file-like object supporting reading only.
1124
1125     Be aware that this class is NOT thread safe as there is no locking around
1126     updating the file pointer.
1127
1128     """
1129
1130     def __init__(self, arvadosfile, num_retries=None):
1131         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1132         self.arvadosfile = arvadosfile
1133
1134     def size(self):
1135         return self.arvadosfile.size()
1136
1137     def stream_name(self):
1138         return self.arvadosfile.parent.stream_name()
1139
1140     @_FileLikeObjectBase._before_close
1141     @retry_method
1142     def read(self, size=None, num_retries=None):
1143         """Read up to `size` bytes from the file and return the result.
1144
1145         Starts at the current file position.  If `size` is None, read the
1146         entire remainder of the file.
1147         """
1148         if size is None:
1149             data = []
1150             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1151             while rd:
1152                 data.append(rd)
1153                 self._filepos += len(rd)
1154                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1155             return ''.join(data)
1156         else:
1157             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1158             self._filepos += len(data)
1159             return data
1160
1161     @_FileLikeObjectBase._before_close
1162     @retry_method
1163     def readfrom(self, offset, size, num_retries=None):
1164         """Read up to `size` bytes from the stream, starting at the specified file offset.
1165
1166         This method does not change the file position.
1167         """
1168         return self.arvadosfile.readfrom(offset, size, num_retries)
1169
1170     def flush(self):
1171         pass
1172
1173
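# Typical read-side use, hedged; `c` is an arvados.collection.Collection (or
# CollectionReader) as in the sketch above, whose open() in read mode returns
# an ArvadosFileReader.
#
#   with c.open("subdir/data.bin", "r") as f:
#       head = f.read(64)        # up to 64 bytes from the current position
#       f.seek(0)
#       everything = f.read()    # size=None reads the remainder of the file
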
1174 class ArvadosFileWriter(ArvadosFileReader):
1175     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1176
1177     Be aware that this class is NOT thread safe as there is no locking around
1178     updating the file pointer.
1179
1180     """
1181
1182     def __init__(self, arvadosfile, mode, num_retries=None):
1183         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1184         self.mode = mode
1185         self.arvadosfile.add_writer(self)
1186
1187     @_FileLikeObjectBase._before_close
1188     @retry_method
1189     def write(self, data, num_retries=None):
1190         if self.mode[0] == "a":
1191             self.arvadosfile.writeto(self.size(), data, num_retries)
1192         else:
1193             self.arvadosfile.writeto(self._filepos, data, num_retries)
1194             self._filepos += len(data)
1195         return len(data)
1196
1197     @_FileLikeObjectBase._before_close
1198     @retry_method
1199     def writelines(self, seq, num_retries=None):
1200         for s in seq:
1201             self.write(s, num_retries=num_retries)
1202
1203     @_FileLikeObjectBase._before_close
1204     def truncate(self, size=None):
1205         if size is None:
1206             size = self._filepos
1207         self.arvadosfile.truncate(size)
1208         if self._filepos > self.size():
1209             self._filepos = self.size()
1210
1211     @_FileLikeObjectBase._before_close
1212     def flush(self):
1213         self.arvadosfile.flush()
1214
1215     def close(self, flush=True):
1216         if not self.closed:
1217             self.arvadosfile.remove_writer(self, flush)
1218             super(ArvadosFileWriter, self).close()
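
# ArvadosFileWriter adds the write-side calls; in append mode ("a") every
# write() lands at end of file regardless of the current position.  A hedged
# sketch, with `c` an arvados.collection.Collection as above:
#
#   with c.open("logs/run.log", "a") as f:      # ArvadosFileWriter, append mode
#       f.write("line 1\n")
#       f.writelines(["line 2\n", "line 3\n"])
#   # close(flush=True) -> remove_writer(), which flushes or queues the small
#   # block for repacking with other closed small files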