[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13 import collections
14 import uuid
15
16 from .errors import KeepWriteError, AssertionError, ArgumentError
17 from .keep import KeepLocator
18 from ._normalize_stream import normalize_stream
19 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
20 from .retry import retry_method
21
22 MOD = "mod"
23 WRITE = "write"
24
25 _logger = logging.getLogger('arvados.arvfile')
26
27 def split(path):
28     """split(path) -> streamname, filename
29
30     Separate the stream name and file name in a /-separated stream path and
31     return a tuple (stream_name, file_name).  If no stream name is available,
32     assume '.'.
33
34     """
35     try:
36         stream_name, file_name = path.rsplit('/', 1)
37     except ValueError:  # No / in string
38         stream_name, file_name = '.', path
39     return stream_name, file_name
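# Illustrative examples (documentation only, not called in this module):
#   split("foo/bar.txt") -> ('foo', 'bar.txt')
#   split("bar.txt")     -> ('.', 'bar.txt')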
40
41 class _FileLikeObjectBase(object):
42     def __init__(self, name, mode):
43         self.name = name
44         self.mode = mode
45         self.closed = False
46
47     @staticmethod
48     def _before_close(orig_func):
49         @functools.wraps(orig_func)
50         def before_close_wrapper(self, *args, **kwargs):
51             if self.closed:
52                 raise ValueError("I/O operation on closed stream file")
53             return orig_func(self, *args, **kwargs)
54         return before_close_wrapper
55
56     def __enter__(self):
57         return self
58
59     def __exit__(self, exc_type, exc_value, traceback):
60         try:
61             self.close()
62         except Exception:
63             if exc_type is None:
64                 raise
65
66     def close(self):
67         self.closed = True
68
69
70 class ArvadosFileReaderBase(_FileLikeObjectBase):
71     def __init__(self, name, mode, num_retries=None):
72         super(ArvadosFileReaderBase, self).__init__(name, mode)
73         self._filepos = 0L
74         self.num_retries = num_retries
75         self._readline_cache = (None, None)
76
77     def __iter__(self):
78         while True:
79             data = self.readline()
80             if not data:
81                 break
82             yield data
83
84     def decompressed_name(self):
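        # e.g. "logfile.txt.gz" -> "logfile.txt", "dump.bz2" -> "dump"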
85         return re.sub(r'\.(bz2|gz)$', '', self.name)
86
87     @_FileLikeObjectBase._before_close
88     def seek(self, pos, whence=os.SEEK_SET):
89         if whence == os.SEEK_CUR:
90             pos += self._filepos
91         elif whence == os.SEEK_END:
92             pos += self.size()
93         self._filepos = min(max(pos, 0L), self.size())
94
95     def tell(self):
96         return self._filepos
97
98     @_FileLikeObjectBase._before_close
99     @retry_method
100     def readall(self, size=2**20, num_retries=None):
101         while True:
102             data = self.read(size, num_retries=num_retries)
103             if data == '':
104                 break
105             yield data
106
107     @_FileLikeObjectBase._before_close
108     @retry_method
109     def readline(self, size=float('inf'), num_retries=None):
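        # The (position, leftover bytes) pair cached by the previous readline()
        # call lets sequential readline() calls reuse data that was already read
        # instead of re-reading the same 1 MiB chunk from the stream each time.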
110         cache_pos, cache_data = self._readline_cache
111         if self.tell() == cache_pos:
112             data = [cache_data]
113             self._filepos += len(cache_data)
114         else:
115             data = ['']
116         data_size = len(data[-1])
117         while (data_size < size) and ('\n' not in data[-1]):
118             next_read = self.read(2 ** 20, num_retries=num_retries)
119             if not next_read:
120                 break
121             data.append(next_read)
122             data_size += len(next_read)
123         data = ''.join(data)
124         try:
125             nextline_index = data.index('\n') + 1
126         except ValueError:
127             nextline_index = len(data)
128         nextline_index = min(nextline_index, size)
129         self._filepos -= len(data) - nextline_index
130         self._readline_cache = (self.tell(), data[nextline_index:])
131         return data[:nextline_index]
132
133     @_FileLikeObjectBase._before_close
134     @retry_method
135     def decompress(self, decompress, size, num_retries=None):
136         for segment in self.readall(size, num_retries=num_retries):
137             data = decompress(segment)
138             if data:
139                 yield data
140
141     @_FileLikeObjectBase._before_close
142     @retry_method
143     def readall_decompressed(self, size=2**20, num_retries=None):
144         self.seek(0)
145         if self.name.endswith('.bz2'):
146             dc = bz2.BZ2Decompressor()
147             return self.decompress(dc.decompress, size,
148                                    num_retries=num_retries)
149         elif self.name.endswith('.gz'):
150             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
151             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
152                                    size, num_retries=num_retries)
153         else:
154             return self.readall(size, num_retries=num_retries)
155
156     @_FileLikeObjectBase._before_close
157     @retry_method
158     def readlines(self, sizehint=float('inf'), num_retries=None):
159         data = []
160         data_size = 0
161         for s in self.readall(num_retries=num_retries):
162             data.append(s)
163             data_size += len(s)
164             if data_size >= sizehint:
165                 break
166         return ''.join(data).splitlines(True)
167
168     def size(self):
169         raise NotImplementedError()
170
171     def read(self, size, num_retries=None):
172         raise NotImplementedError()
173
174     def readfrom(self, start, size, num_retries=None):
175         raise NotImplementedError()
176
177
178 class StreamFileReader(ArvadosFileReaderBase):
179     class _NameAttribute(str):
180         # The Python file API provides a plain .name attribute.
181         # Older SDK provided a name() method.
182         # This class provides both, for maximum compatibility.
183         def __call__(self):
184             return self
185
186     def __init__(self, stream, segments, name):
187         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
188         self._stream = stream
189         self.segments = segments
190
191     def stream_name(self):
192         return self._stream.name()
193
194     def size(self):
195         n = self.segments[-1]
196         return n.range_start + n.range_size
197
198     @_FileLikeObjectBase._before_close
199     @retry_method
200     def read(self, size, num_retries=None):
201         """Read up to 'size' bytes from the stream, starting at the current file position"""
202         if size == 0:
203             return ''
204
205         data = ''
206         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
207         if available_chunks:
208             lr = available_chunks[0]
209             data = self._stream.readfrom(lr.locator+lr.segment_offset,
210                                           lr.segment_size,
211                                           num_retries=num_retries)
212
213         self._filepos += len(data)
214         return data
215
216     @_FileLikeObjectBase._before_close
217     @retry_method
218     def readfrom(self, start, size, num_retries=None):
219         """Read up to 'size' bytes from the stream, starting at 'start'"""
220         if size == 0:
221             return ''
222
223         data = []
224         for lr in locators_and_ranges(self.segments, start, size):
225             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
226                                               num_retries=num_retries))
227         return ''.join(data)
228
229     def as_manifest(self):
230         segs = []
231         for r in self.segments:
232             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
233         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
234
235
236 def synchronized(orig_func):
237     @functools.wraps(orig_func)
238     def synchronized_wrapper(self, *args, **kwargs):
239         with self.lock:
240             return orig_func(self, *args, **kwargs)
241     return synchronized_wrapper
242
243
244 class StateChangeError(Exception):
245     def __init__(self, message, state, nextstate):
246         super(StateChangeError, self).__init__(message)
247         self.state = state
248         self.nextstate = nextstate
249
250 class _BufferBlock(object):
251     """A stand-in for a Keep block that is in the process of being written.
252
253     Writers can append to it, get the size, and compute the Keep locator.
254     There are three valid states:
255
256     WRITABLE
257       Can append to block.
258
259     PENDING
260       Block is in the process of being uploaded to Keep, append is an error.
261
262     COMMITTED
263       The block has been written to Keep, its internal buffer has been
264       released, fetching the block will fetch it via the Keep client (since we
265       discarded the internal copy), and identifiers referring to the BufferBlock
266       can be replaced with the block locator.
267
268     """
269
270     WRITABLE = 0
271     PENDING = 1
272     COMMITTED = 2
273     ERROR = 3
274
275     def __init__(self, blockid, starting_capacity, owner):
276         """
277         :blockid:
278           the identifier for this block
279
280         :starting_capacity:
281           the initial buffer capacity
282
283         :owner:
284           ArvadosFile that owns this block
285
286         """
287         self.blockid = blockid
288         self.buffer_block = bytearray(starting_capacity)
289         self.buffer_view = memoryview(self.buffer_block)
290         self.write_pointer = 0
291         self._state = _BufferBlock.WRITABLE
292         self._locator = None
293         self.owner = owner
294         self.lock = threading.Lock()
295         self.wait_for_commit = threading.Event()
296         self.error = None
297
298     @synchronized
299     def append(self, data):
300         """Append some data to the buffer.
301
302         Only valid if the block is in WRITABLE state.  Implements an expanding
303         buffer, doubling capacity as needed to accommodate all the data.
304
305         """
306         if self._state == _BufferBlock.WRITABLE:
307             while (self.write_pointer+len(data)) > len(self.buffer_block):
308                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
309                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
310                 self.buffer_block = new_buffer_block
311                 self.buffer_view = memoryview(self.buffer_block)
312             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
313             self.write_pointer += len(data)
314             self._locator = None
315         else:
316             raise AssertionError("Buffer block is not writable")
317
318     STATE_TRANSITIONS = frozenset([
319             (WRITABLE, PENDING),
320             (PENDING, COMMITTED),
321             (PENDING, ERROR),
322             (ERROR, PENDING)])
323
324     @synchronized
325     def set_state(self, nextstate, val=None):
326         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
327             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
328         self._state = nextstate
329
330         if self._state == _BufferBlock.PENDING:
331             self.wait_for_commit.clear()
332
333         if self._state == _BufferBlock.COMMITTED:
334             self._locator = val
335             self.buffer_view = None
336             self.buffer_block = None
337             self.wait_for_commit.set()
338
339         if self._state == _BufferBlock.ERROR:
340             self.error = val
341             self.wait_for_commit.set()
342
343     @synchronized
344     def state(self):
345         return self._state
346
347     def size(self):
348         """The amount of data written to the buffer."""
349         return self.write_pointer
350
351     @synchronized
352     def locator(self):
353         """The Keep locator for this buffer's contents."""
354         if self._locator is None:
355             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
356         return self._locator
357
358     @synchronized
359     def clone(self, new_blockid, owner):
360         if self._state == _BufferBlock.COMMITTED:
361             raise AssertionError("Cannot duplicate committed buffer block")
362         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
363         bufferblock.append(self.buffer_view[0:self.size()])
364         return bufferblock
365
366     @synchronized
367     def clear(self):
368         self.owner = None
369         self.buffer_block = None
370         self.buffer_view = None
371
372
373 class NoopLock(object):
374     def __enter__(self):
375         return self
376
377     def __exit__(self, exc_type, exc_value, traceback):
378         pass
379
380     def acquire(self, blocking=False):
381         pass
382
383     def release(self):
384         pass
385
386
387 def must_be_writable(orig_func):
388     @functools.wraps(orig_func)
389     def must_be_writable_wrapper(self, *args, **kwargs):
390         if not self.writable():
391             raise IOError(errno.EROFS, "Collection is read-only.")
392         return orig_func(self, *args, **kwargs)
393     return must_be_writable_wrapper
394
395
396 class _BlockManager(object):
397     """BlockManager handles buffer blocks.
398
399     Also handles background block uploads, and background block prefetch for a
400     Collection of ArvadosFiles.
401
402     """
403
404     DEFAULT_PUT_THREADS = 2
405     DEFAULT_GET_THREADS = 2
406
407     def __init__(self, keep, copies=None, put_threads=None):
408         """keep: KeepClient object to use"""
409         self._keep = keep
410         self._bufferblocks = collections.OrderedDict()
411         self._put_queue = None
412         self._put_threads = None
413         self._prefetch_queue = None
414         self._prefetch_threads = None
415         self.lock = threading.Lock()
416         self.prefetch_enabled = True
417         if put_threads:
418             self.num_put_threads = put_threads
419         else:
420             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
421         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
422         self.copies = copies
423         self._pending_write_size = 0
424         self.threads_lock = threading.Lock()
425
426     @synchronized
427     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
428         """Allocate a new, empty bufferblock in WRITABLE state and return it.
429
430         :blockid:
431           optional block identifier, otherwise one will be automatically assigned
432
433         :starting_capacity:
434           optional capacity, otherwise will use default capacity
435
436         :owner:
437           ArvadosFile that owns this block
438
439         """
440         return self._alloc_bufferblock(blockid, starting_capacity, owner)
441
442     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
443         if blockid is None:
444             blockid = "%s" % uuid.uuid4()
445         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
446         self._bufferblocks[bufferblock.blockid] = bufferblock
447         return bufferblock
448
449     @synchronized
450     def dup_block(self, block, owner):
451         """Create a new bufferblock initialized with the content of an existing bufferblock.
452
453         :block:
454           the buffer block to copy.
455
456         :owner:
457           ArvadosFile that owns the new block
458
459         """
460         new_blockid = "bufferblock%i" % len(self._bufferblocks)
461         bufferblock = block.clone(new_blockid, owner)
462         self._bufferblocks[bufferblock.blockid] = bufferblock
463         return bufferblock
464
465     @synchronized
466     def is_bufferblock(self, locator):
467         return locator in self._bufferblocks
468
469     def _commit_bufferblock_worker(self):
470         """Background uploader thread."""
471
472         while True:
473             try:
474                 bufferblock = self._put_queue.get()
475                 if bufferblock is None:
476                     return
477
478                 if self.copies is None:
479                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
480                 else:
481                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
482                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
483
484             except Exception as e:
485                 bufferblock.set_state(_BufferBlock.ERROR, e)
486             finally:
487                 if self._put_queue is not None:
488                     self._put_queue.task_done()
489
490     def start_put_threads(self):
491         with self.threads_lock:
492             if self._put_threads is None:
493                 # Start uploader threads.
494
495                 # If we don't limit the Queue size, the upload queue can quickly
496                 # grow to take up gigabytes of RAM if the writing process is
497                 # generating data more quickly than it can be sent to the Keep
498                 # servers.
499                 #
500                 # With two upload threads and a queue size of 2, this means up to 4
501                 # blocks pending.  If they are full 64 MiB blocks, that means up to
502                 # 256 MiB of internal buffering, which is the same size as the
503                 # default download block cache in KeepClient.
504                 self._put_queue = Queue.Queue(maxsize=2)
505
506                 self._put_threads = []
507                 for i in xrange(0, self.num_put_threads):
508                     thread = threading.Thread(target=self._commit_bufferblock_worker)
509                     self._put_threads.append(thread)
510                     thread.daemon = True
511                     thread.start()
512
513     def _block_prefetch_worker(self):
514         """The background downloader thread."""
515         while True:
516             try:
517                 b = self._prefetch_queue.get()
518                 if b is None:
519                     return
520                 self._keep.get(b)
521             except Exception:
522                 _logger.exception("Exception doing block prefetch")
523
524     @synchronized
525     def start_get_threads(self):
526         if self._prefetch_threads is None:
527             self._prefetch_queue = Queue.Queue()
528             self._prefetch_threads = []
529             for i in xrange(0, self.num_get_threads):
530                 thread = threading.Thread(target=self._block_prefetch_worker)
531                 self._prefetch_threads.append(thread)
532                 thread.daemon = True
533                 thread.start()
534
535
536     @synchronized
537     def stop_threads(self):
538         """Shut down and wait for background upload and download threads to finish."""
539
540         if self._put_threads is not None:
541             for t in self._put_threads:
542                 self._put_queue.put(None)
543             for t in self._put_threads:
544                 t.join()
545         self._put_threads = None
546         self._put_queue = None
547
548         if self._prefetch_threads is not None:
549             for t in self._prefetch_threads:
550                 self._prefetch_queue.put(None)
551             for t in self._prefetch_threads:
552                 t.join()
553         self._prefetch_threads = None
554         self._prefetch_queue = None
555
556     def __enter__(self):
557         return self
558
559     def __exit__(self, exc_type, exc_value, traceback):
560         self.stop_threads()
561
562     @synchronized
563     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
564         """Packs small blocks together before uploading"""
565         self._pending_write_size += closed_file_size
566
567         # Check if there are enough small blocks to fill up one full block
568         if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
569
570             # Search for blocks that are ready to be packed together before being committed to Keep.
571             # A WRITABLE block always has an owner.
572             # A WRITABLE block whose owner is closed() implies that its
573             # size is <= KEEP_BLOCK_SIZE/2.
574             small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
575
576             if len(small_blocks) <= 1:
577                 # Not enough small blocks for repacking
578                 return
579
580             # Update the pending write size count with its true value, just in case
581             # some small file was opened, written and closed several times.
582             self._pending_write_size = sum([b.size() for b in small_blocks])
583             if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
584                 return
585
586             new_bb = self._alloc_bufferblock()
587             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
588                 bb = small_blocks.pop(0)
589                 arvfile = bb.owner
590                 self._pending_write_size -= bb.size()
591                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
592                 arvfile.set_segments([Range(new_bb.blockid,
593                                             0,
594                                             bb.size(),
595                                             new_bb.write_pointer - bb.size())])
596                 self._delete_bufferblock(bb.blockid)
597             self.commit_bufferblock(new_bb, sync=sync)
598
599     def commit_bufferblock(self, block, sync):
600         """Initiate a background upload of a bufferblock.
601
602         :block:
603           The block object to upload
604
605         :sync:
606           If `sync` is True, upload the block synchronously.
607           If `sync` is False, upload the block asynchronously.  This will
608           return immediately unless the upload queue is at capacity, in
609           which case it will wait on an upload queue slot.
610
611         """
612         try:
613             # Mark the block as PENDING to disallow any further appends.
614             block.set_state(_BufferBlock.PENDING)
615         except StateChangeError as e:
616             if e.state == _BufferBlock.PENDING:
617                 if sync:
618                     block.wait_for_commit.wait()
619                 else:
620                     return
621             if block.state() == _BufferBlock.COMMITTED:
622                 return
623             elif block.state() == _BufferBlock.ERROR:
624                 raise block.error
625             else:
626                 raise
627
628         if sync:
629             try:
630                 if self.copies is None:
631                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
632                 else:
633                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
634                 block.set_state(_BufferBlock.COMMITTED, loc)
635             except Exception as e:
636                 block.set_state(_BufferBlock.ERROR, e)
637                 raise
638         else:
639             self.start_put_threads()
640             self._put_queue.put(block)
641
642     @synchronized
643     def get_bufferblock(self, locator):
644         return self._bufferblocks.get(locator)
645
646     @synchronized
647     def delete_bufferblock(self, locator):
648         self._delete_bufferblock(locator)
649
650     def _delete_bufferblock(self, locator):
651         bb = self._bufferblocks[locator]
652         bb.clear()
653         del self._bufferblocks[locator]
654
655     def get_block_contents(self, locator, num_retries, cache_only=False):
656         """Fetch a block.
657
658         First checks whether the locator refers to a BufferBlock and, if so, returns
659         its contents; otherwise, passes the request through to KeepClient.get().
660
661         """
662         with self.lock:
663             if locator in self._bufferblocks:
664                 bufferblock = self._bufferblocks[locator]
665                 if bufferblock.state() != _BufferBlock.COMMITTED:
666                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
667                 else:
668                     locator = bufferblock._locator
669         if cache_only:
670             return self._keep.get_from_cache(locator)
671         else:
672             return self._keep.get(locator, num_retries=num_retries)
673
674     def commit_all(self):
675         """Commit all outstanding buffer blocks.
676
677         This is a synchronous call, and will not return until all buffer blocks
678         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
679
680         """
681         self.repack_small_blocks(force=True, sync=True)
682
683         with self.lock:
684             items = self._bufferblocks.items()
685
686         for k,v in items:
687             if v.state() != _BufferBlock.COMMITTED and v.owner:
688                 v.owner.flush(sync=False)
689
690         with self.lock:
691             if self._put_queue is not None:
692                 self._put_queue.join()
693
694                 err = []
695                 for k,v in items:
696                     if v.state() == _BufferBlock.ERROR:
697                         err.append((v.locator(), v.error))
698                 if err:
699                     raise KeepWriteError("Error writing some blocks", err, label="block")
700
701         for k,v in items:
702             # flush again with sync=True to remove committed bufferblocks from
703             # the segments.
704             if v.owner:
705                 v.owner.flush(sync=True)
706
707     def block_prefetch(self, locator):
708         """Initiate a background download of a block.
709
710         This assumes that the underlying KeepClient implements a block cache,
711         so repeated requests for the same block will not result in repeated
712         downloads (unless the block is evicted from the cache).  This method
713         does not block.
714
715         """
716
717         if not self.prefetch_enabled:
718             return
719
720         if self._keep.get_from_cache(locator) is not None:
721             return
722
723         with self.lock:
724             if locator in self._bufferblocks:
725                 return
726
727         self.start_get_threads()
728         self._prefetch_queue.put(locator)
729
730
731 class ArvadosFile(object):
732     """Represent a file in a Collection.
733
734     ArvadosFile manages the underlying representation of a file in Keep as a
735     sequence of segments spanning a set of blocks, and implements random
736     read/write access.
737
738     This object may be accessed from multiple threads.
739
740     """
741
742     def __init__(self, parent, name, stream=[], segments=[]):
743         """
744         ArvadosFile constructor.
745
746         :stream:
747           a list of Range objects representing a block stream
748
749         :segments:
750           a list of Range objects representing segments
751         """
752         self.parent = parent
753         self.name = name
754         self._writers = set()
755         self._committed = False
756         self._segments = []
757         self.lock = parent.root_collection().lock
758         for s in segments:
759             self._add_segment(stream, s.locator, s.range_size)
760         self._current_bblock = None
761
762     def writable(self):
763         return self.parent.writable()
764
765     @synchronized
766     def permission_expired(self, as_of_dt=None):
767         """Returns True if any of the segment's locators is expired"""
768         for r in self._segments:
769             if KeepLocator(r.locator).permission_expired(as_of_dt):
770                 return True
771         return False
772
773     @synchronized
774     def segments(self):
775         return copy.copy(self._segments)
776
777     @synchronized
778     def clone(self, new_parent, new_name):
779         """Make a copy of this file."""
780         cp = ArvadosFile(new_parent, new_name)
781         cp.replace_contents(self)
782         return cp
783
784     @must_be_writable
785     @synchronized
786     def replace_contents(self, other):
787         """Replace segments of this file with segments from another `ArvadosFile` object."""
788
789         map_loc = {}
790         self._segments = []
791         for other_segment in other.segments():
792             new_loc = other_segment.locator
793             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
794                 if other_segment.locator not in map_loc:
795                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
796                     if bufferblock.state() != _BufferBlock.WRITABLE:
797                         map_loc[other_segment.locator] = bufferblock.locator()
798                     else:
799                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
800                 new_loc = map_loc[other_segment.locator]
801
802             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
803
804         self.set_committed(False)
805
806     def __eq__(self, other):
807         if other is self:
808             return True
809         if not isinstance(other, ArvadosFile):
810             return False
811
812         othersegs = other.segments()
813         with self.lock:
814             if len(self._segments) != len(othersegs):
815                 return False
816             for i in xrange(0, len(othersegs)):
817                 seg1 = self._segments[i]
818                 seg2 = othersegs[i]
819                 loc1 = seg1.locator
820                 loc2 = seg2.locator
821
822                 if self.parent._my_block_manager().is_bufferblock(loc1):
823                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
824
825                 if other.parent._my_block_manager().is_bufferblock(loc2):
826                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
827
828                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
829                     seg1.range_start != seg2.range_start or
830                     seg1.range_size != seg2.range_size or
831                     seg1.segment_offset != seg2.segment_offset):
832                     return False
833
834         return True
835
836     def __ne__(self, other):
837         return not self.__eq__(other)
838
839     @synchronized
840     def set_segments(self, segs):
841         self._segments = segs
842
843     @synchronized
844     def set_committed(self, value=True):
845         """Set committed flag.
846
847         If value is True, set committed to be True.
848
849         If value is False, set committed to be False for this and all parents.
850         """
851         if value == self._committed:
852             return
853         self._committed = value
854         if self._committed is False and self.parent is not None:
855             self.parent.set_committed(False)
856
857     @synchronized
858     def committed(self):
859         """Get whether this is committed or not."""
860         return self._committed
861
862     @synchronized
863     def add_writer(self, writer):
864         """Add an ArvadosFileWriter reference to the list of writers"""
865         if isinstance(writer, ArvadosFileWriter):
866             self._writers.add(writer)
867
868     @synchronized
869     def remove_writer(self, writer, flush):
870         """
871         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
872         and do some block maintenance tasks.
873         """
874         self._writers.remove(writer)
875
876         if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
877             # File writer closed, not small enough for repacking
878             self.flush()
879         elif self.closed():
880             # All writers closed and size is adequate for repacking
881             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
882
883     def closed(self):
884         """
885         Determine whether this file is closed. The file is considered closed when
886         there are no remaining writers.
887         """
888         return len(self._writers) == 0
889
890     @must_be_writable
891     @synchronized
892     def truncate(self, size):
893         """Shrink the size of the file.
894
895         If `size` is less than the size of the file, the file contents after
896         `size` will be discarded.  If `size` is greater than the current size
897         of the file, an IOError will be raised.
898
899         """
900         if size < self.size():
901             new_segs = []
902             for r in self._segments:
903                 range_end = r.range_start+r.range_size
904                 if r.range_start >= size:
905                     # segment is past the truncate size, all done
906                     break
907                 elif size < range_end:
908                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
909                     nr.segment_offset = r.segment_offset
910                     new_segs.append(nr)
911                     break
912                 else:
913                     new_segs.append(r)
914
915             self._segments = new_segs
916             self.set_committed(False)
917         elif size > self.size():
918             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
919
920     def readfrom(self, offset, size, num_retries, exact=False):
921         """Read up to `size` bytes from the file starting at `offset`.
922
923         :exact:
924          If False (default), return less data than requested if the read
925          crosses a block boundary and the next block isn't cached.  If True,
926          only return less data than requested when hitting EOF.
927         """
928
929         with self.lock:
930             if size == 0 or offset >= self.size():
931                 return ''
932             readsegs = locators_and_ranges(self._segments, offset, size)
933             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
934
935         locs = set()
936         data = []
937         for lr in readsegs:
938             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
939             if block:
940                 blockview = memoryview(block)
941                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
942                 locs.add(lr.locator)
943             else:
944                 break
945
946         for lr in prefetch:
947             if lr.locator not in locs:
948                 self.parent._my_block_manager().block_prefetch(lr.locator)
949                 locs.add(lr.locator)
950
951         return ''.join(data)
952
953     def _repack_writes(self, num_retries):
954         """Test if the buffer block has more data than actual segments.
955
956         This happens when a buffered write over-writes a file range written in
957         a previous buffered write.  Re-pack the buffer block for efficiency
958         and to avoid leaking information.
959
960         """
961         segs = self._segments
962
963         # Sum up the segments to get the total bytes of the file referencing
964         # into the buffer block.
965         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
966         write_total = sum([s.range_size for s in bufferblock_segs])
967
968         if write_total < self._current_bblock.size():
969             # There is more data in the buffer block than is actually accounted for by segments, so
970             # re-pack into a new buffer by copying over to a new buffer block.
971             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
972             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
973             for t in bufferblock_segs:
974                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
975                 t.segment_offset = new_bb.size() - t.range_size
976
977             self._current_bblock = new_bb
978
979     @must_be_writable
980     @synchronized
981     def writeto(self, offset, data, num_retries):
982         """Write `data` to the file starting at `offset`.
983
984         This will update existing bytes and/or extend the size of the file as
985         necessary.
986
987         """
988         if len(data) == 0:
989             return
990
991         if offset > self.size():
992             raise ArgumentError("Offset is past the end of the file")
993
994         if len(data) > config.KEEP_BLOCK_SIZE:
995             # Chunk it up into smaller writes
996             n = 0
997             dataview = memoryview(data)
998             while n < len(data):
999                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1000                 n += config.KEEP_BLOCK_SIZE
1001             return
1002
1003         self.set_committed(False)
1004
1005         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1006             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1007
1008         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1009             self._repack_writes(num_retries)
1010             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1011                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1012                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1013
1014         self._current_bblock.append(data)
1015
1016         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1017
1018         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1019
1020         return len(data)
1021
1022     @synchronized
1023     def flush(self, sync=True, num_retries=0):
1024         """Flush the current bufferblock to Keep.
1025
1026         :sync:
1027           If True, commit block synchronously, wait until buffer block has been written.
1028           If False, commit block asynchronously, return immediately after putting block into
1029           the keep put queue.
1030         """
1031         if self.committed():
1032             return
1033
1034         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1035             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1036                 self._repack_writes(num_retries)
1037             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1038
1039         if sync:
1040             to_delete = set()
1041             for s in self._segments:
1042                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1043                 if bb:
1044                     if bb.state() != _BufferBlock.COMMITTED:
1045                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1046                     to_delete.add(s.locator)
1047                     s.locator = bb.locator()
1048             for s in to_delete:
1049                 self.parent._my_block_manager().delete_bufferblock(s)
1050
1051         self.parent.notify(MOD, self.parent, self.name, (self, self))
1052
1053     @must_be_writable
1054     @synchronized
1055     def add_segment(self, blocks, pos, size):
1056         """Add a segment to the end of the file.
1057
1058         `pos` and `size` reference a section of the stream described by
1059         `blocks` (a list of Range objects).
1060
1061         """
1062         self._add_segment(blocks, pos, size)
1063
1064     def _add_segment(self, blocks, pos, size):
1065         """Internal implementation of add_segment."""
1066         self.set_committed(False)
1067         for lr in locators_and_ranges(blocks, pos, size):
1068             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1069             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1070             self._segments.append(r)
1071
1072     @synchronized
1073     def size(self):
1074         """Get the file size."""
1075         if self._segments:
1076             n = self._segments[-1]
1077             return n.range_start + n.range_size
1078         else:
1079             return 0
1080
1081     @synchronized
1082     def manifest_text(self, stream_name=".", portable_locators=False,
1083                       normalize=False, only_committed=False):
1084         buf = ""
1085         filestream = []
1086         for segment in self._segments:
1087             loc = segment.locator
1088             if self.parent._my_block_manager().is_bufferblock(loc):
1089                 if only_committed:
1090                     continue
1091                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1092             if portable_locators:
1093                 loc = KeepLocator(loc).stripped()
1094             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1095                                  segment.segment_offset, segment.range_size))
1096         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1097         buf += "\n"
1098         return buf
1099
1100     @must_be_writable
1101     @synchronized
1102     def _reparent(self, newparent, newname):
1103         self.set_committed(False)
1104         self.flush(sync=True)
1105         self.parent.remove(self.name)
1106         self.parent = newparent
1107         self.name = newname
1108         self.lock = self.parent.root_collection().lock
1109
1110
1111 class ArvadosFileReader(ArvadosFileReaderBase):
1112     """Wraps ArvadosFile in a file-like object supporting reading only.
1113
1114     Be aware that this class is NOT thread safe as there is no locking around
1115     updating the file pointer.
1116
1117     """
1118
1119     def __init__(self, arvadosfile, num_retries=None):
1120         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1121         self.arvadosfile = arvadosfile
1122
1123     def size(self):
1124         return self.arvadosfile.size()
1125
1126     def stream_name(self):
1127         return self.arvadosfile.parent.stream_name()
1128
1129     @_FileLikeObjectBase._before_close
1130     @retry_method
1131     def read(self, size=None, num_retries=None):
1132         """Read up to `size` bytes from the file and return the result.
1133
1134         Starts at the current file position.  If `size` is None, read the
1135         entire remainder of the file.
1136         """
1137         if size is None:
1138             data = []
1139             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1140             while rd:
1141                 data.append(rd)
1142                 self._filepos += len(rd)
1143                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1144             return ''.join(data)
1145         else:
1146             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1147             self._filepos += len(data)
1148             return data
1149
1150     @_FileLikeObjectBase._before_close
1151     @retry_method
1152     def readfrom(self, offset, size, num_retries=None):
1153         """Read up to `size` bytes from the stream, starting at the specified file offset.
1154
1155         This method does not change the file position.
1156         """
1157         return self.arvadosfile.readfrom(offset, size, num_retries)
1158
1159     def flush(self):
1160         pass
1161
1162
1163 class ArvadosFileWriter(ArvadosFileReader):
1164     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1165
1166     Be aware that this class is NOT thread safe as there is no locking around
1167     updating the file pointer.
1168
1169     """
1170
1171     def __init__(self, arvadosfile, mode, num_retries=None):
1172         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1173         self.mode = mode
1174         self.arvadosfile.add_writer(self)
1175
1176     @_FileLikeObjectBase._before_close
1177     @retry_method
1178     def write(self, data, num_retries=None):
1179         if self.mode[0] == "a":
1180             self.arvadosfile.writeto(self.size(), data, num_retries)
1181         else:
1182             self.arvadosfile.writeto(self._filepos, data, num_retries)
1183             self._filepos += len(data)
1184         return len(data)
1185
1186     @_FileLikeObjectBase._before_close
1187     @retry_method
1188     def writelines(self, seq, num_retries=None):
1189         for s in seq:
1190             self.write(s, num_retries=num_retries)
1191
1192     @_FileLikeObjectBase._before_close
1193     def truncate(self, size=None):
1194         if size is None:
1195             size = self._filepos
1196         self.arvadosfile.truncate(size)
1197         if self._filepos > self.size():
1198             self._filepos = self.size()
1199
1200     @_FileLikeObjectBase._before_close
1201     def flush(self):
1202         self.arvadosfile.flush()
1203
1204     def close(self, flush=True):
1205         if not self.closed:
1206             self.arvadosfile.remove_writer(self, flush)
1207             super(ArvadosFileWriter, self).close()
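
# Illustrative usage sketch: these classes are normally reached through
# arvados.collection.Collection (defined elsewhere in this SDK), which wires
# ArvadosFile objects to a _BlockManager.  A rough sketch of that flow, not
# code exercised by this module:
#
#   import arvados.collection
#   c = arvados.collection.Collection()
#   with c.open("subdir/hello.txt", "w") as f:   # returns an ArvadosFileWriter
#       f.write("hello world\n")
#   with c.open("subdir/hello.txt", "r") as f:   # returns an ArvadosFileReader
#       first5 = f.read(5)                       # -> "hello"
#   manifest = c.manifest_text()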