1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13 import collections
14 import uuid
15
16 from .errors import KeepWriteError, AssertionError, ArgumentError
17 from .keep import KeepLocator
18 from ._normalize_stream import normalize_stream
19 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
20 from .retry import retry_method
21
22 MOD = "mod"
23 WRITE = "write"
24
25 _logger = logging.getLogger('arvados.arvfile')
26
27 def split(path):
28     """split(path) -> streamname, filename
29
30     Separate the stream name and file name in a /-separated stream path and
31     return a tuple (stream_name, file_name).  If no stream name is available,
32     assume '.'.
33
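       For example, with and without a stream component in the path:

           split('./subdir/file.txt')  # -> ('./subdir', 'file.txt')
           split('file.txt')           # -> ('.', 'file.txt')
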
34     """
35     try:
36         stream_name, file_name = path.rsplit('/', 1)
37     except ValueError:  # No / in string
38         stream_name, file_name = '.', path
39     return stream_name, file_name
40
41 class _FileLikeObjectBase(object):
42     def __init__(self, name, mode):
43         self.name = name
44         self.mode = mode
45         self.closed = False
46
47     @staticmethod
48     def _before_close(orig_func):
49         @functools.wraps(orig_func)
50         def before_close_wrapper(self, *args, **kwargs):
51             if self.closed:
52                 raise ValueError("I/O operation on closed stream file")
53             return orig_func(self, *args, **kwargs)
54         return before_close_wrapper
55
56     def __enter__(self):
57         return self
58
59     def __exit__(self, exc_type, exc_value, traceback):
60         try:
61             self.close()
62         except Exception:
63             if exc_type is None:
64                 raise
65
66     def close(self):
67         self.closed = True
68
69
70 class ArvadosFileReaderBase(_FileLikeObjectBase):
71     def __init__(self, name, mode, num_retries=None):
72         super(ArvadosFileReaderBase, self).__init__(name, mode)
73         self._filepos = 0L
74         self.num_retries = num_retries
75         self._readline_cache = (None, None)
76
77     def __iter__(self):
78         while True:
79             data = self.readline()
80             if not data:
81                 break
82             yield data
83
84     def decompressed_name(self):
85         return re.sub(r'\.(bz2|gz)$', '', self.name)
86
87     @_FileLikeObjectBase._before_close
88     def seek(self, pos, whence=os.SEEK_SET):
89         if whence == os.SEEK_CUR:
90             pos += self._filepos
91         elif whence == os.SEEK_END:
92             pos += self.size()
93         self._filepos = min(max(pos, 0L), self.size())
94
95     def tell(self):
96         return self._filepos
97
98     @_FileLikeObjectBase._before_close
99     @retry_method
100     def readall(self, size=2**20, num_retries=None):
101         while True:
102             data = self.read(size, num_retries=num_retries)
103             if data == '':
104                 break
105             yield data
106
107     @_FileLikeObjectBase._before_close
108     @retry_method
109     def readline(self, size=float('inf'), num_retries=None):
110         cache_pos, cache_data = self._readline_cache
111         if self.tell() == cache_pos:
112             data = [cache_data]
113             self._filepos += len(cache_data)
114         else:
115             data = ['']
116         data_size = len(data[-1])
117         while (data_size < size) and ('\n' not in data[-1]):
118             next_read = self.read(2 ** 20, num_retries=num_retries)
119             if not next_read:
120                 break
121             data.append(next_read)
122             data_size += len(next_read)
123         data = ''.join(data)
124         try:
125             nextline_index = data.index('\n') + 1
126         except ValueError:
127             nextline_index = len(data)
128         nextline_index = min(nextline_index, size)
129         self._filepos -= len(data) - nextline_index
130         self._readline_cache = (self.tell(), data[nextline_index:])
131         return data[:nextline_index]
132
133     @_FileLikeObjectBase._before_close
134     @retry_method
135     def decompress(self, decompress, size, num_retries=None):
136         for segment in self.readall(size, num_retries=num_retries):
137             data = decompress(segment)
138             if data:
139                 yield data
140
141     @_FileLikeObjectBase._before_close
142     @retry_method
143     def readall_decompressed(self, size=2**20, num_retries=None):
144         self.seek(0)
145         if self.name.endswith('.bz2'):
146             dc = bz2.BZ2Decompressor()
147             return self.decompress(dc.decompress, size,
148                                    num_retries=num_retries)
149         elif self.name.endswith('.gz'):
150             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
151             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
152                                    size, num_retries=num_retries)
153         else:
154             return self.readall(size, num_retries=num_retries)
155
156     @_FileLikeObjectBase._before_close
157     @retry_method
158     def readlines(self, sizehint=float('inf'), num_retries=None):
159         data = []
160         data_size = 0
161         for s in self.readall(num_retries=num_retries):
162             data.append(s)
163             data_size += len(s)
164             if data_size >= sizehint:
165                 break
166         return ''.join(data).splitlines(True)
167
168     def size(self):
169         raise NotImplementedError()
170
171     def read(self, size, num_retries=None):
172         raise NotImplementedError()
173
174     def readfrom(self, start, size, num_retries=None):
175         raise NotImplementedError()
176
177
178 class StreamFileReader(ArvadosFileReaderBase):
179     class _NameAttribute(str):
180         # The Python file API provides a plain .name attribute.
181         # The older SDK provided a name() method.
182         # This class provides both, for maximum compatibility.
183         def __call__(self):
184             return self
185
186     def __init__(self, stream, segments, name):
187         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
188         self._stream = stream
189         self.segments = segments
190
191     def stream_name(self):
192         return self._stream.name()
193
194     def size(self):
195         n = self.segments[-1]
196         return n.range_start + n.range_size
197
198     @_FileLikeObjectBase._before_close
199     @retry_method
200     def read(self, size, num_retries=None):
201         """Read up to 'size' bytes from the stream, starting at the current file position"""
202         if size == 0:
203             return ''
204
205         data = ''
206         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
207         if available_chunks:
208             lr = available_chunks[0]
209             data = self._stream.readfrom(lr.locator+lr.segment_offset,
210                                           lr.segment_size,
211                                           num_retries=num_retries)
212
213         self._filepos += len(data)
214         return data
215
216     @_FileLikeObjectBase._before_close
217     @retry_method
218     def readfrom(self, start, size, num_retries=None):
219         """Read up to 'size' bytes from the stream, starting at 'start'"""
220         if size == 0:
221             return ''
222
223         data = []
224         for lr in locators_and_ranges(self.segments, start, size):
225             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
226                                               num_retries=num_retries))
227         return ''.join(data)
228
229     def as_manifest(self):
230         segs = []
231         for r in self.segments:
232             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
233         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
234
235
236 def synchronized(orig_func):
237     @functools.wraps(orig_func)
238     def synchronized_wrapper(self, *args, **kwargs):
239         with self.lock:
240             return orig_func(self, *args, **kwargs)
241     return synchronized_wrapper
242
243
244 class StateChangeError(Exception):
245     def __init__(self, message, state, nextstate):
246         super(StateChangeError, self).__init__(message)
247         self.state = state
248         self.nextstate = nextstate
249
250 class _BufferBlock(object):
251     """A stand-in for a Keep block that is in the process of being written.
252
253     Writers can append to it, get the size, and compute the Keep locator.
254     There are three valid states:
255
256     WRITABLE
257       Can append to block.
258
259     PENDING
260       Block is in the process of being uploaded to Keep, append is an error.
261
262     COMMITTED
263       The block has been written to Keep, its internal buffer has been
264       released, fetching the block will fetch it via keep client (since we
265       discarded the internal copy), and identifiers referring to the BufferBlock
266       can be replaced with the block locator.
267
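        A rough lifecycle sketch (`loc` below stands for the locator string
        returned by the Keep client once the block has been uploaded):

            bb = _BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
            bb.append("some data")                     # WRITABLE: buffer grows as needed
            bb.set_state(_BufferBlock.PENDING)         # upload started; appends now fail
            bb.set_state(_BufferBlock.COMMITTED, loc)  # buffer released, locator recorded
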
268     """
269
270     WRITABLE = 0
271     PENDING = 1
272     COMMITTED = 2
273     ERROR = 3
274
275     def __init__(self, blockid, starting_capacity, owner):
276         """
277         :blockid:
278           the identifier for this block
279
280         :starting_capacity:
281           the initial buffer capacity
282
283         :owner:
284           ArvadosFile that owns this block
285
286         """
287         self.blockid = blockid
288         self.buffer_block = bytearray(starting_capacity)
289         self.buffer_view = memoryview(self.buffer_block)
290         self.write_pointer = 0
291         self._state = _BufferBlock.WRITABLE
292         self._locator = None
293         self.owner = owner
294         self.lock = threading.Lock()
295         self.wait_for_commit = threading.Event()
296         self.error = None
297
298     @synchronized
299     def append(self, data):
300         """Append some data to the buffer.
301
302         Only valid if the block is in WRITABLE state.  Implements an expanding
303         buffer, doubling capacity as needed to accommodate all the data.
304
305         """
306         if self._state == _BufferBlock.WRITABLE:
307             while (self.write_pointer+len(data)) > len(self.buffer_block):
308                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
309                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
310                 self.buffer_block = new_buffer_block
311                 self.buffer_view = memoryview(self.buffer_block)
312             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
313             self.write_pointer += len(data)
314             self._locator = None
315         else:
316             raise AssertionError("Buffer block is not writable")
317
318     STATE_TRANSITIONS = frozenset([
319             (WRITABLE, PENDING),
320             (PENDING, COMMITTED),
321             (PENDING, ERROR),
322             (ERROR, PENDING)])
323
324     @synchronized
325     def set_state(self, nextstate, val=None):
326         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
327             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
328         self._state = nextstate
329
330         if self._state == _BufferBlock.PENDING:
331             self.wait_for_commit.clear()
332
333         if self._state == _BufferBlock.COMMITTED:
334             self._locator = val
335             self.buffer_view = None
336             self.buffer_block = None
337             self.wait_for_commit.set()
338
339         if self._state == _BufferBlock.ERROR:
340             self.error = val
341             self.wait_for_commit.set()
342
343     @synchronized
344     def state(self):
345         return self._state
346
347     def size(self):
348         """The amount of data written to the buffer."""
349         return self.write_pointer
350
351     @synchronized
352     def locator(self):
353         """The Keep locator for this buffer's contents."""
354         if self._locator is None:
355             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
356         return self._locator
357
358     @synchronized
359     def clone(self, new_blockid, owner):
360         if self._state == _BufferBlock.COMMITTED:
361             raise AssertionError("Cannot duplicate committed buffer block")
362         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
363         bufferblock.append(self.buffer_view[0:self.size()])
364         return bufferblock
365
366     @synchronized
367     def clear(self):
368         self.owner = None
369         self.buffer_block = None
370         self.buffer_view = None
371
372
373 class NoopLock(object):
374     def __enter__(self):
375         return self
376
377     def __exit__(self, exc_type, exc_value, traceback):
378         pass
379
380     def acquire(self, blocking=False):
381         pass
382
383     def release(self):
384         pass
385
386
387 def must_be_writable(orig_func):
388     @functools.wraps(orig_func)
389     def must_be_writable_wrapper(self, *args, **kwargs):
390         if not self.writable():
391             raise IOError(errno.EROFS, "Collection is read-only.")
392         return orig_func(self, *args, **kwargs)
393     return must_be_writable_wrapper
394
395
396 class _BlockManager(object):
397     """BlockManager handles buffer blocks.
398
399     Also handles background block uploads, and background block prefetch for a
400     Collection of ArvadosFiles.
401
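        A minimal usage sketch (normally a Collection drives this object;
        `keep_client` stands in for a KeepClient instance):

            bm = _BlockManager(keep_client)
            bb = bm.alloc_bufferblock()
            bb.append("hello")
            bm.commit_bufferblock(bb, sync=True)  # upload now and record the locator
            bm.stop_threads()                     # shut down any background workers
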
402     """
403
404     DEFAULT_PUT_THREADS = 2
405     DEFAULT_GET_THREADS = 2
406
407     def __init__(self, keep, copies=None):
408         """keep: KeepClient object to use"""
409         self._keep = keep
410         self._bufferblocks = collections.OrderedDict()
411         self._put_queue = None
412         self._put_threads = None
413         self._prefetch_queue = None
414         self._prefetch_threads = None
415         self.lock = threading.Lock()
416         self.prefetch_enabled = True
417         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
418         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
419         self.copies = copies
420         self._pending_write_size = 0
421         self.threads_lock = threading.Lock()
422
423     @synchronized
424     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
425         """Allocate a new, empty bufferblock in WRITABLE state and return it.
426
427         :blockid:
428           optional block identifier, otherwise one will be automatically assigned
429
430         :starting_capacity:
431           optional capacity, otherwise will use default capacity
432
433         :owner:
434           ArvadosFile that owns this block
435
436         """
437         return self._alloc_bufferblock(blockid, starting_capacity, owner)
438
439     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
440         if blockid is None:
441             blockid = "%s" % uuid.uuid4()
442         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
443         self._bufferblocks[bufferblock.blockid] = bufferblock
444         return bufferblock
445
446     @synchronized
447     def dup_block(self, block, owner):
448         """Create a new bufferblock initialized with the content of an existing bufferblock.
449
450         :block:
451           the buffer block to copy.
452
453         :owner:
454           ArvadosFile that owns the new block
455
456         """
457         new_blockid = "bufferblock%i" % len(self._bufferblocks)
458         bufferblock = block.clone(new_blockid, owner)
459         self._bufferblocks[bufferblock.blockid] = bufferblock
460         return bufferblock
461
462     @synchronized
463     def is_bufferblock(self, locator):
464         return locator in self._bufferblocks
465
466     def _commit_bufferblock_worker(self):
467         """Background uploader thread."""
468
469         while True:
470             try:
471                 bufferblock = self._put_queue.get()
472                 if bufferblock is None:
473                     return
474
475                 if self.copies is None:
476                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
477                 else:
478                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
479                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
480
481             except Exception as e:
482                 bufferblock.set_state(_BufferBlock.ERROR, e)
483             finally:
484                 if self._put_queue is not None:
485                     self._put_queue.task_done()
486
487     def start_put_threads(self):
488         with self.threads_lock:
489             if self._put_threads is None:
490                 # Start uploader threads.
491
492                 # If we don't limit the Queue size, the upload queue can quickly
493                 # grow to take up gigabytes of RAM if the writing process is
494                 # generating data more quickly than it can be sent to the Keep
495                 # servers.
496                 #
497                 # With two upload threads and a queue size of 2, this means up to 4
498                 # blocks pending.  If they are full 64 MiB blocks, that means up to
499                 # 256 MiB of internal buffering, which is the same size as the
500                 # default download block cache in KeepClient.
501                 self._put_queue = Queue.Queue(maxsize=2)
502
503                 self._put_threads = []
504                 for i in xrange(0, self.num_put_threads):
505                     thread = threading.Thread(target=self._commit_bufferblock_worker)
506                     self._put_threads.append(thread)
507                     thread.daemon = True
508                     thread.start()
509
510     def _block_prefetch_worker(self):
511         """The background downloader thread."""
512         while True:
513             try:
514                 b = self._prefetch_queue.get()
515                 if b is None:
516                     return
517                 self._keep.get(b)
518             except Exception:
519                 _logger.exception("Exception doing block prefetch")
520
521     @synchronized
522     def start_get_threads(self):
523         if self._prefetch_threads is None:
524             self._prefetch_queue = Queue.Queue()
525             self._prefetch_threads = []
526             for i in xrange(0, self.num_get_threads):
527                 thread = threading.Thread(target=self._block_prefetch_worker)
528                 self._prefetch_threads.append(thread)
529                 thread.daemon = True
530                 thread.start()
531
532
533     @synchronized
534     def stop_threads(self):
535         """Shut down and wait for background upload and download threads to finish."""
536
537         if self._put_threads is not None:
538             for t in self._put_threads:
539                 self._put_queue.put(None)
540             for t in self._put_threads:
541                 t.join()
542         self._put_threads = None
543         self._put_queue = None
544
545         if self._prefetch_threads is not None:
546             for t in self._prefetch_threads:
547                 self._prefetch_queue.put(None)
548             for t in self._prefetch_threads:
549                 t.join()
550         self._prefetch_threads = None
551         self._prefetch_queue = None
552
553     def __enter__(self):
554         return self
555
556     def __exit__(self, exc_type, exc_value, traceback):
557         self.stop_threads()
558
559     @synchronized
560     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
561         """Packs small blocks together before uploading"""
562         self._pending_write_size += closed_file_size
563
564         # Check whether there are enough small blocks to fill up one full block
565         if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
566
567             # Search for blocks that are ready to be packed together before being committed to Keep.
568             # A WRITABLE block always has an owner.
569             # A WRITABLE block whose owner is closed() implies that its
570             # size is <= KEEP_BLOCK_SIZE/2.
571             small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
572
573             if len(small_blocks) <= 1:
574                 # Not enough small blocks for repacking
575                 return
576
577             # Update the pending write size count with its true value, just in case
578             # some small file was opened, written and closed several times.
579             self._pending_write_size = sum([b.size() for b in small_blocks])
580             if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
581                 return
582
583             new_bb = self._alloc_bufferblock()
584             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
585                 bb = small_blocks.pop(0)
586                 arvfile = bb.owner
587                 self._pending_write_size -= bb.size()
588                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
589                 arvfile.set_segments([Range(new_bb.blockid,
590                                             0,
591                                             bb.size(),
592                                             new_bb.write_pointer - bb.size())])
593                 self._delete_bufferblock(bb.blockid)
594             self.commit_bufferblock(new_bb, sync=sync)
595
596     def commit_bufferblock(self, block, sync):
597         """Initiate a background upload of a bufferblock.
598
599         :block:
600           The block object to upload
601
602         :sync:
603           If `sync` is True, upload the block synchronously.
604           If `sync` is False, upload the block asynchronously.  This will
605           return immediately unless the upload queue is at capacity, in
606           which case it will wait on an upload queue slot.
607
608         """
609         try:
610             # Mark the block as PENDING so as to disallow any more appends.
611             block.set_state(_BufferBlock.PENDING)
612         except StateChangeError as e:
613             if e.state == _BufferBlock.PENDING:
614                 if sync:
615                     block.wait_for_commit.wait()
616                 else:
617                     return
618             if block.state() == _BufferBlock.COMMITTED:
619                 return
620             elif block.state() == _BufferBlock.ERROR:
621                 raise block.error
622             else:
623                 raise
624
625         if sync:
626             try:
627                 if self.copies is None:
628                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
629                 else:
630                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
631                 block.set_state(_BufferBlock.COMMITTED, loc)
632             except Exception as e:
633                 block.set_state(_BufferBlock.ERROR, e)
634                 raise
635         else:
636             self.start_put_threads()
637             self._put_queue.put(block)
638
639     @synchronized
640     def get_bufferblock(self, locator):
641         return self._bufferblocks.get(locator)
642
643     @synchronized
644     def delete_bufferblock(self, locator):
645         self._delete_bufferblock(locator)
646
647     def _delete_bufferblock(self, locator):
648         bb = self._bufferblocks[locator]
649         bb.clear()
650         del self._bufferblocks[locator]
651
652     def get_block_contents(self, locator, num_retries, cache_only=False):
653         """Fetch a block.
654
655         First checks to see if the locator is a BufferBlock and returns that; if
656         not, passes the request through to KeepClient.get().
657
658         """
659         with self.lock:
660             if locator in self._bufferblocks:
661                 bufferblock = self._bufferblocks[locator]
662                 if bufferblock.state() != _BufferBlock.COMMITTED:
663                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
664                 else:
665                     locator = bufferblock._locator
666         if cache_only:
667             return self._keep.get_from_cache(locator)
668         else:
669             return self._keep.get(locator, num_retries=num_retries)
670
671     def commit_all(self):
672         """Commit all outstanding buffer blocks.
673
674         This is a synchronous call, and will not return until all buffer blocks
675         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
676
677         """
678         self.repack_small_blocks(force=True, sync=True)
679
680         with self.lock:
681             items = self._bufferblocks.items()
682
683         for k,v in items:
684             if v.state() != _BufferBlock.COMMITTED and v.owner:
685                 v.owner.flush(sync=False)
686
687         with self.lock:
688             if self._put_queue is not None:
689                 self._put_queue.join()
690
691                 err = []
692                 for k,v in items:
693                     if v.state() == _BufferBlock.ERROR:
694                         err.append((v.locator(), v.error))
695                 if err:
696                     raise KeepWriteError("Error writing some blocks", err, label="block")
697
698         for k,v in items:
699             # flush again with sync=True to remove committed bufferblocks from
700             # the segments.
701             if v.owner:
702                 v.owner.flush(sync=True)
703
704     def block_prefetch(self, locator):
705         """Initiate a background download of a block.
706
707         This assumes that the underlying KeepClient implements a block cache,
708         so repeated requests for the same block will not result in repeated
709         downloads (unless the block is evicted from the cache).  This method
710         does not block.
711
712         """
713
714         if not self.prefetch_enabled:
715             return
716
717         if self._keep.get_from_cache(locator) is not None:
718             return
719
720         with self.lock:
721             if locator in self._bufferblocks:
722                 return
723
724         self.start_get_threads()
725         self._prefetch_queue.put(locator)
726
727
728 class ArvadosFile(object):
729     """Represent a file in a Collection.
730
731     ArvadosFile manages the underlying representation of a file in Keep as a
732     sequence of segments spanning a set of blocks, and implements random
733     read/write access.
734
735     This object may be accessed from multiple threads.
736
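        For illustration, a 5-byte file whose first three bytes come from the
        start of one Keep block and whose last two bytes start at offset 10 of
        another could be described by segments such as (locators shortened):

            [Range('aaaa...+16', range_start=0, range_size=3, segment_offset=0),
             Range('bbbb...+32', range_start=3, range_size=2, segment_offset=10)]
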
737     """
738
739     def __init__(self, parent, name, stream=[], segments=[]):
740         """
741         ArvadosFile constructor.
742
743         :stream:
744           a list of Range objects representing a block stream
745
746         :segments:
747           a list of Range objects representing segments
748         """
749         self.parent = parent
750         self.name = name
751         self._writers = set()
752         self._committed = False
753         self._segments = []
754         self.lock = parent.root_collection().lock
755         for s in segments:
756             self._add_segment(stream, s.locator, s.range_size)
757         self._current_bblock = None
758
759     def writable(self):
760         return self.parent.writable()
761
762     @synchronized
763     def segments(self):
764         return copy.copy(self._segments)
765
766     @synchronized
767     def clone(self, new_parent, new_name):
768         """Make a copy of this file."""
769         cp = ArvadosFile(new_parent, new_name)
770         cp.replace_contents(self)
771         return cp
772
773     @must_be_writable
774     @synchronized
775     def replace_contents(self, other):
776         """Replace segments of this file with segments from another `ArvadosFile` object."""
777
778         map_loc = {}
779         self._segments = []
780         for other_segment in other.segments():
781             new_loc = other_segment.locator
782             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
783                 if other_segment.locator not in map_loc:
784                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
785                     if bufferblock.state() != _BufferBlock.WRITABLE:
786                         map_loc[other_segment.locator] = bufferblock.locator()
787                     else:
788                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
789                 new_loc = map_loc[other_segment.locator]
790
791             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
792
793         self._committed = False
794
795     def __eq__(self, other):
796         if other is self:
797             return True
798         if not isinstance(other, ArvadosFile):
799             return False
800
801         othersegs = other.segments()
802         with self.lock:
803             if len(self._segments) != len(othersegs):
804                 return False
805             for i in xrange(0, len(othersegs)):
806                 seg1 = self._segments[i]
807                 seg2 = othersegs[i]
808                 loc1 = seg1.locator
809                 loc2 = seg2.locator
810
811                 if self.parent._my_block_manager().is_bufferblock(loc1):
812                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
813
814                 if other.parent._my_block_manager().is_bufferblock(loc2):
815                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
816
817                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
818                     seg1.range_start != seg2.range_start or
819                     seg1.range_size != seg2.range_size or
820                     seg1.segment_offset != seg2.segment_offset):
821                     return False
822
823         return True
824
825     def __ne__(self, other):
826         return not self.__eq__(other)
827
828     @synchronized
829     def set_segments(self, segs):
830         self._segments = segs
831
832     @synchronized
833     def set_committed(self):
834         """Set committed flag to True"""
835         self._committed = True
836
837     @synchronized
838     def committed(self):
839         """Get whether this is committed or not."""
840         return self._committed
841
842     @synchronized
843     def add_writer(self, writer):
844         """Add an ArvadosFileWriter reference to the list of writers"""
845         if isinstance(writer, ArvadosFileWriter):
846             self._writers.add(writer)
847
848     @synchronized
849     def remove_writer(self, writer, flush):
850         """
851         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
852         and do some block maintenance tasks.
853         """
854         self._writers.remove(writer)
855
856         if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
857             # Flush requested, or the file is too big to be repacked; write it out now
858             self.flush()
859         elif self.closed():
860             # All writers closed and size is adequate for repacking
861             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
862
863     def closed(self):
864         """
865         Get whether this is closed or not. When the writers list is empty, the file
866         is considered to be closed.
867         """
868         return len(self._writers) == 0
869
870     @must_be_writable
871     @synchronized
872     def truncate(self, size):
873         """Shrink the size of the file.
874
875         If `size` is less than the size of the file, the file contents after
876         `size` will be discarded.  If `size` is greater than the current size
877         of the file, an IOError will be raised.
878
879         """
880         if size < self.size():
881             new_segs = []
882             for r in self._segments:
883                 range_end = r.range_start+r.range_size
884                 if r.range_start >= size:
885                     # segment is past the truncate size, all done
886                     break
887                 elif size < range_end:
888                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
889                     nr.segment_offset = r.segment_offset
890                     new_segs.append(nr)
891                     break
892                 else:
893                     new_segs.append(r)
894
895             self._segments = new_segs
896             self._committed = False
897         elif size > self.size():
898             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
899
900     def readfrom(self, offset, size, num_retries, exact=False):
901         """Read up to `size` bytes from the file starting at `offset`.
902
903         :exact:
904          If False (default), return less data than requested if the read
905          crosses a block boundary and the next block isn't cached.  If True,
906          only return less data than requested when hitting EOF.
907         """
908
909         with self.lock:
910             if size == 0 or offset >= self.size():
911                 return ''
912             readsegs = locators_and_ranges(self._segments, offset, size)
913             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
914
915         locs = set()
916         data = []
917         for lr in readsegs:
918             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
919             if block:
920                 blockview = memoryview(block)
921                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
922                 locs.add(lr.locator)
923             else:
924                 break
925
926         for lr in prefetch:
927             if lr.locator not in locs:
928                 self.parent._my_block_manager().block_prefetch(lr.locator)
929                 locs.add(lr.locator)
930
931         return ''.join(data)
932
933     def _repack_writes(self, num_retries):
934         """Test if the buffer block has more data than is referenced by the file's segments.
935
936         This happens when a buffered write over-writes a file range written in
937         a previous buffered write.  Re-pack the buffer block for efficiency
938         and to avoid leaking information.
939
940         """
941         segs = self._segments
942
943         # Sum up the segments to get the total number of bytes in the file that
944         # reference the buffer block.
945         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
946         write_total = sum([s.range_size for s in bufferblock_segs])
947
948         if write_total < self._current_bblock.size():
949             # There is more data in the buffer block than is actually accounted for by segments, so
950             # re-pack the referenced data into a new buffer block.
951             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
952             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
953             for t in bufferblock_segs:
954                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
955                 t.segment_offset = new_bb.size() - t.range_size
956
957             self._current_bblock = new_bb
958
959     @must_be_writable
960     @synchronized
961     def writeto(self, offset, data, num_retries):
962         """Write `data` to the file starting at `offset`.
963
964         This will update existing bytes and/or extend the size of the file as
965         necessary.
966
967         """
968         if len(data) == 0:
969             return
970
971         if offset > self.size():
972             raise ArgumentError("Offset is past the end of the file")
973
974         if len(data) > config.KEEP_BLOCK_SIZE:
975             # Chunk it up into smaller writes
976             n = 0
977             dataview = memoryview(data)
978             while n < len(data):
979                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
980                 n += config.KEEP_BLOCK_SIZE
981             return
982
983         self._committed = False
984
985         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
986             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
987
988         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
989             self._repack_writes(num_retries)
990             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
991                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
992                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
993
994         self._current_bblock.append(data)
995
996         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
997
998         self.parent.notify(WRITE, self.parent, self.name, (self, self))
999
1000         return len(data)
1001
1002     @synchronized
1003     def flush(self, sync=True, num_retries=0):
1004         """Flush the current bufferblock to Keep.
1005
1006         :sync:
1007           If True, commit block synchronously, wait until buffer block has been written.
1008           If False, commit block asynchronously, return immediately after putting block into
1009           the keep put queue.
1010         """
1011         if self.committed():
1012             return
1013
1014         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1015             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1016                 self._repack_writes(num_retries)
1017             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1018
1019         if sync:
1020             to_delete = set()
1021             for s in self._segments:
1022                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1023                 if bb:
1024                     if bb.state() != _BufferBlock.COMMITTED:
1025                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1026                     to_delete.add(s.locator)
1027                     s.locator = bb.locator()
1028             for s in to_delete:
1029                self.parent._my_block_manager().delete_bufferblock(s)
1030
1031         self.parent.notify(MOD, self.parent, self.name, (self, self))
1032
1033     @must_be_writable
1034     @synchronized
1035     def add_segment(self, blocks, pos, size):
1036         """Add a segment to the end of the file.
1037
1038         `pos` and `size` reference a section of the stream described by
1039         `blocks` (a list of Range objects).
1040
1041         """
1042         self._add_segment(blocks, pos, size)
1043
1044     def _add_segment(self, blocks, pos, size):
1045         """Internal implementation of add_segment."""
1046         self._committed = False
1047         for lr in locators_and_ranges(blocks, pos, size):
1048             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1049             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1050             self._segments.append(r)
1051
1052     @synchronized
1053     def size(self):
1054         """Get the file size."""
1055         if self._segments:
1056             n = self._segments[-1]
1057             return n.range_start + n.range_size
1058         else:
1059             return 0
1060
1061     @synchronized
1062     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
1063         buf = ""
1064         filestream = []
1065         for segment in self._segments:
1066             loc = segment.locator
1067             if self.parent._my_block_manager().is_bufferblock(loc):
1068                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1069             if portable_locators:
1070                 loc = KeepLocator(loc).stripped()
1071             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1072                                  segment.segment_offset, segment.range_size))
1073         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
1074         buf += "\n"
1075         return buf
1076
1077     @must_be_writable
1078     @synchronized
1079     def _reparent(self, newparent, newname):
1080         self._committed = False
1081         self.flush(sync=True)
1082         self.parent.remove(self.name)
1083         self.parent = newparent
1084         self.name = newname
1085         self.lock = self.parent.root_collection().lock
1086
1087
1088 class ArvadosFileReader(ArvadosFileReaderBase):
1089     """Wraps ArvadosFile in a file-like object supporting reading only.
1090
1091     Be aware that this class is NOT thread safe as there is no locking around
1092     updating the file pointer.
1093
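         Readers are normally obtained from a Collection rather than constructed
         directly; for example (assuming `c` is an arvados.collection.Collection
         containing "foo.txt"):

             with c.open("foo.txt", "r") as reader:
                 content = reader.read()
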
1094     """
1095
1096     def __init__(self, arvadosfile, num_retries=None):
1097         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1098         self.arvadosfile = arvadosfile
1099
1100     def size(self):
1101         return self.arvadosfile.size()
1102
1103     def stream_name(self):
1104         return self.arvadosfile.parent.stream_name()
1105
1106     @_FileLikeObjectBase._before_close
1107     @retry_method
1108     def read(self, size=None, num_retries=None):
1109         """Read up to `size` bytes from the file and return the result.
1110
1111         Starts at the current file position.  If `size` is None, read the
1112         entire remainder of the file.
1113         """
1114         if size is None:
1115             data = []
1116             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1117             while rd:
1118                 data.append(rd)
1119                 self._filepos += len(rd)
1120                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1121             return ''.join(data)
1122         else:
1123             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1124             self._filepos += len(data)
1125             return data
1126
1127     @_FileLikeObjectBase._before_close
1128     @retry_method
1129     def readfrom(self, offset, size, num_retries=None):
1130         """Read up to `size` bytes from the stream, starting at the specified file offset.
1131
1132         This method does not change the file position.
1133         """
1134         return self.arvadosfile.readfrom(offset, size, num_retries)
1135
1136     def flush(self):
1137         pass
1138
1139
1140 class ArvadosFileWriter(ArvadosFileReader):
1141     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1142
1143     Be aware that this class is NOT thread safe as there is no locking around
1144     updating the file pointer.
1145
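         Writers are normally obtained from a writable Collection; for example
         (assuming `c` is a writable arvados.collection.Collection):

             with c.open("foo.txt", "w") as writer:
                 writer.write("hello world\n")
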
1146     """
1147
1148     def __init__(self, arvadosfile, mode, num_retries=None):
1149         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1150         self.mode = mode
1151         self.arvadosfile.add_writer(self)
1152
1153     @_FileLikeObjectBase._before_close
1154     @retry_method
1155     def write(self, data, num_retries=None):
1156         if self.mode[0] == "a":
1157             self.arvadosfile.writeto(self.size(), data, num_retries)
1158         else:
1159             self.arvadosfile.writeto(self._filepos, data, num_retries)
1160             self._filepos += len(data)
1161         return len(data)
1162
1163     @_FileLikeObjectBase._before_close
1164     @retry_method
1165     def writelines(self, seq, num_retries=None):
1166         for s in seq:
1167             self.write(s, num_retries=num_retries)
1168
1169     @_FileLikeObjectBase._before_close
1170     def truncate(self, size=None):
1171         if size is None:
1172             size = self._filepos
1173         self.arvadosfile.truncate(size)
1174         if self._filepos > self.size():
1175             self._filepos = self.size()
1176
1177     @_FileLikeObjectBase._before_close
1178     def flush(self):
1179         self.arvadosfile.flush()
1180
1181     def close(self, flush=True):
1182         if not self.closed:
1183             self.arvadosfile.remove_writer(self, flush)
1184             super(ArvadosFileWriter, self).close()