[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 from . import config
6 import hashlib
7 import threading
8 import traceback
9 import Queue
10 import copy
11 import errno
12 import re
13 import logging
14 import collections
15 import uuid
16
17 from .errors import KeepWriteError, AssertionError, ArgumentError
18 from .keep import KeepLocator
19 from ._normalize_stream import normalize_stream
20 from ._ranges import locators_and_ranges, LocatorAndRange, replace_range, Range
21 from .retry import retry_method
22
23 MOD = "mod"
24 WRITE = "write"
25
26 _logger = logging.getLogger('arvados.arvfile')
27
28 def split(path):
29     """split(path) -> streamname, filename
30
31     Separate the stream name and file name in a /-separated stream path and
32     return a tuple (stream_name, file_name).  If no stream name is available,
33     assume '.'.
34
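    Illustrative doctest-style examples (a sketch; the second case shows the
    '.' stream assumed for a bare file name):

        >>> split('path/to/file.txt')
        ('path/to', 'file.txt')
        >>> split('file.txt')
        ('.', 'file.txt')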
35     """
36     try:
37         stream_name, file_name = path.rsplit('/', 1)
38     except ValueError:  # No / in string
39         stream_name, file_name = '.', path
40     return stream_name, file_name
41
42 class _FileLikeObjectBase(object):
43     def __init__(self, name, mode):
44         self.name = name
45         self.mode = mode
46         self.closed = False
47
48     @staticmethod
49     def _before_close(orig_func):
50         @functools.wraps(orig_func)
51         def before_close_wrapper(self, *args, **kwargs):
52             if self.closed:
53                 raise ValueError("I/O operation on closed stream file")
54             return orig_func(self, *args, **kwargs)
55         return before_close_wrapper
56
57     def __enter__(self):
58         return self
59
60     def __exit__(self, exc_type, exc_value, traceback):
61         try:
62             self.close()
63         except Exception:
64             if exc_type is None:
65                 raise
66
67     def close(self):
68         self.closed = True
69
70
71 class ArvadosFileReaderBase(_FileLikeObjectBase):
72     def __init__(self, name, mode, num_retries=None):
73         super(ArvadosFileReaderBase, self).__init__(name, mode)
74         self._filepos = 0L
75         self.num_retries = num_retries
76         self._readline_cache = (None, None)
77
78     def __iter__(self):
79         while True:
80             data = self.readline()
81             if not data:
82                 break
83             yield data
84
85     def decompressed_name(self):
86         return re.sub(r'\.(bz2|gz)$', '', self.name)
87
88     @_FileLikeObjectBase._before_close
89     def seek(self, pos, whence=os.SEEK_SET):
90         if whence == os.SEEK_CUR:
91             pos += self._filepos
92         elif whence == os.SEEK_END:
93             pos += self.size()
94         self._filepos = min(max(pos, 0L), self.size())
95
96     def tell(self):
97         return self._filepos
98
99     @_FileLikeObjectBase._before_close
100     @retry_method
101     def readall(self, size=2**20, num_retries=None):
102         while True:
103             data = self.read(size, num_retries=num_retries)
104             if data == '':
105                 break
106             yield data
107
108     @_FileLikeObjectBase._before_close
109     @retry_method
110     def readline(self, size=float('inf'), num_retries=None):
111         cache_pos, cache_data = self._readline_cache
112         if self.tell() == cache_pos:
113             data = [cache_data]
114             self._filepos += len(cache_data)
115         else:
116             data = ['']
117         data_size = len(data[-1])
118         while (data_size < size) and ('\n' not in data[-1]):
119             next_read = self.read(2 ** 20, num_retries=num_retries)
120             if not next_read:
121                 break
122             data.append(next_read)
123             data_size += len(next_read)
124         data = ''.join(data)
125         try:
126             nextline_index = data.index('\n') + 1
127         except ValueError:
128             nextline_index = len(data)
129         nextline_index = min(nextline_index, size)
130         self._filepos -= len(data) - nextline_index
131         self._readline_cache = (self.tell(), data[nextline_index:])
132         return data[:nextline_index]
133
134     @_FileLikeObjectBase._before_close
135     @retry_method
136     def decompress(self, decompress, size, num_retries=None):
137         for segment in self.readall(size, num_retries=num_retries):
138             data = decompress(segment)
139             if data:
140                 yield data
141
142     @_FileLikeObjectBase._before_close
143     @retry_method
144     def readall_decompressed(self, size=2**20, num_retries=None):
145         self.seek(0)
146         if self.name.endswith('.bz2'):
147             dc = bz2.BZ2Decompressor()
148             return self.decompress(dc.decompress, size,
149                                    num_retries=num_retries)
150         elif self.name.endswith('.gz'):
151             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
152             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
153                                    size, num_retries=num_retries)
154         else:
155             return self.readall(size, num_retries=num_retries)
156
157     @_FileLikeObjectBase._before_close
158     @retry_method
159     def readlines(self, sizehint=float('inf'), num_retries=None):
160         data = []
161         data_size = 0
162         for s in self.readall(num_retries=num_retries):
163             data.append(s)
164             data_size += len(s)
165             if data_size >= sizehint:
166                 break
167         return ''.join(data).splitlines(True)
168
169     def size(self):
170         raise NotImplementedError()
171
172     def read(self, size, num_retries=None):
173         raise NotImplementedError()
174
175     def readfrom(self, start, size, num_retries=None):
176         raise NotImplementedError()
177
178
179 class StreamFileReader(ArvadosFileReaderBase):
180     class _NameAttribute(str):
181         # The Python file API provides a plain .name attribute.
182         # The older SDK provided a name() method.
183         # This class provides both, for maximum compatibility.
184         def __call__(self):
185             return self
186
187     def __init__(self, stream, segments, name):
188         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
189         self._stream = stream
190         self.segments = segments
191
192     def stream_name(self):
193         return self._stream.name()
194
195     def size(self):
196         n = self.segments[-1]
197         return n.range_start + n.range_size
198
199     @_FileLikeObjectBase._before_close
200     @retry_method
201     def read(self, size, num_retries=None):
202         """Read up to 'size' bytes from the stream, starting at the current file position"""
203         if size == 0:
204             return ''
205
206         data = ''
207         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
208         if available_chunks:
209             lr = available_chunks[0]
210             data = self._stream.readfrom(lr.locator+lr.segment_offset,
211                                           lr.segment_size,
212                                           num_retries=num_retries)
213
214         self._filepos += len(data)
215         return data
216
217     @_FileLikeObjectBase._before_close
218     @retry_method
219     def readfrom(self, start, size, num_retries=None):
220         """Read up to 'size' bytes from the stream, starting at 'start'"""
221         if size == 0:
222             return ''
223
224         data = []
225         for lr in locators_and_ranges(self.segments, start, size):
226             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
227                                               num_retries=num_retries))
228         return ''.join(data)
229
230     def as_manifest(self):
231         segs = []
232         for r in self.segments:
233             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
234         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
235
236
237 def synchronized(orig_func):
238     @functools.wraps(orig_func)
239     def synchronized_wrapper(self, *args, **kwargs):
240         with self.lock:
241             return orig_func(self, *args, **kwargs)
242     return synchronized_wrapper
243
244
245 class StateChangeError(Exception):
246     def __init__(self, message, state, nextstate):
247         super(StateChangeError, self).__init__(message)
248         self.state = state
249         self.nextstate = nextstate
250
251 class _BufferBlock(object):
252     """A stand-in for a Keep block that is in the process of being written.
253
254     Writers can append to it, get the size, and compute the Keep locator.
255     There are three valid states:
256
257     WRITABLE
258       Can append to block.
259
260     PENDING
261       Block is in the process of being uploaded to Keep, append is an error.
262
263     COMMITTED
264       The block has been written to Keep, its internal buffer has been
265       released, fetching the block will fetch it via keep client (since we
266       discarded the internal copy), and identifiers referring to the BufferBlock
267       can be replaced with the block locator.
268
269     """
270
271     WRITABLE = 0
272     PENDING = 1
273     COMMITTED = 2
274     ERROR = 3
275
276     def __init__(self, blockid, starting_capacity, owner):
277         """
278         :blockid:
279           the identifier for this block
280
281         :starting_capacity:
282           the initial buffer capacity
283
284         :owner:
285           ArvadosFile that owns this block
286
287         """
288         self.blockid = blockid
289         self.buffer_block = bytearray(starting_capacity)
290         self.buffer_view = memoryview(self.buffer_block)
291         self.write_pointer = 0
292         self._state = _BufferBlock.WRITABLE
293         self._locator = None
294         self.owner = owner
295         self.lock = threading.Lock()
296         self.wait_for_commit = threading.Event()
297         self.error = None
298
299     @synchronized
300     def append(self, data):
301         """Append some data to the buffer.
302
303         Only valid if the block is in WRITABLE state.  Implements an expanding
304         buffer, doubling capacity as needed to accommodate all the data.
305
306         """
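        # For example (illustrative sizes): appending 5000 bytes to a block
        # whose bytearray currently has 4096 bytes of capacity doubles it to
        # 8192 before the data is copied in; a subsequent 20000-byte append
        # would double it again, to 16384 and then 32768.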
307         if self._state == _BufferBlock.WRITABLE:
308             while (self.write_pointer+len(data)) > len(self.buffer_block):
309                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
310                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
311                 self.buffer_block = new_buffer_block
312                 self.buffer_view = memoryview(self.buffer_block)
313             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
314             self.write_pointer += len(data)
315             self._locator = None
316         else:
317             raise AssertionError("Buffer block is not writable")
318
319     STATE_TRANSITIONS = frozenset([
320             (WRITABLE, PENDING),
321             (PENDING, COMMITTED),
322             (PENDING, ERROR),
323             (ERROR, PENDING)])
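    # Illustrative lifecycle implied by STATE_TRANSITIONS (a sketch, not
    # additional behaviour): WRITABLE -> PENDING -> COMMITTED on success,
    # PENDING -> ERROR on a failed upload, and ERROR -> PENDING when the
    # upload is retried.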
324
325     @synchronized
326     def set_state(self, nextstate, val=None):
327         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
328             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
329         self._state = nextstate
330
331         if self._state == _BufferBlock.PENDING:
332             self.wait_for_commit.clear()
333
334         if self._state == _BufferBlock.COMMITTED:
335             self._locator = val
336             self.buffer_view = None
337             self.buffer_block = None
338             self.wait_for_commit.set()
339
340         if self._state == _BufferBlock.ERROR:
341             self.error = val
342             self.wait_for_commit.set()
343
344     @synchronized
345     def state(self):
346         return self._state
347
348     def size(self):
349         """The amount of data written to the buffer."""
350         return self.write_pointer
351
352     @synchronized
353     def locator(self):
354         """The Keep locator for this buffer's contents."""
355         if self._locator is None:
356             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
357         return self._locator
358
359     @synchronized
360     def clone(self, new_blockid, owner):
361         if self._state == _BufferBlock.COMMITTED:
362             raise AssertionError("Cannot duplicate committed buffer block")
363         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
364         bufferblock.append(self.buffer_view[0:self.size()])
365         return bufferblock
366
367     @synchronized
368     def clear(self):
369         self.owner = None
370         self.buffer_block = None
371         self.buffer_view = None
372
373
374 class NoopLock(object):
375     def __enter__(self):
376         return self
377
378     def __exit__(self, exc_type, exc_value, traceback):
379         pass
380
381     def acquire(self, blocking=False):
382         pass
383
384     def release(self):
385         pass
386
387
388 def must_be_writable(orig_func):
389     @functools.wraps(orig_func)
390     def must_be_writable_wrapper(self, *args, **kwargs):
391         if not self.writable():
392             raise IOError(errno.EROFS, "Collection is read-only.")
393         return orig_func(self, *args, **kwargs)
394     return must_be_writable_wrapper
395
396
397 class _BlockManager(object):
398     """BlockManager handles buffer blocks.
399
400     Also handles background block uploads, and background block prefetch for a
401     Collection of ArvadosFiles.
402
403     """
404
405     DEFAULT_PUT_THREADS = 2
406     DEFAULT_GET_THREADS = 2
407
408     def __init__(self, keep, copies=None):
409         """keep: KeepClient object to use"""
410         self._keep = keep
411         self._bufferblocks = collections.OrderedDict()
412         self._put_queue = None
413         self._put_threads = None
414         self._prefetch_queue = None
415         self._prefetch_threads = None
416         self.lock = threading.Lock()
417         self.prefetch_enabled = True
418         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
419         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
420         self.copies = copies
421         self._pending_write_size = 0
422         self.threads_lock = threading.Lock()
423
424     @synchronized
425     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
426         """Allocate a new, empty bufferblock in WRITABLE state and return it.
427
428         :blockid:
429           optional block identifier, otherwise one will be automatically assigned
430
431         :starting_capacity:
432           optional capacity, otherwise will use default capacity
433
434         :owner:
435           ArvadosFile that owns this block
436
437         """
438         return self._alloc_bufferblock(blockid, starting_capacity, owner)
439
440     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
441         if blockid is None:
442             blockid = "%s" % uuid.uuid4()
443         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
444         self._bufferblocks[bufferblock.blockid] = bufferblock
445         return bufferblock
446
447     @synchronized
448     def dup_block(self, block, owner):
449         """Create a new bufferblock initialized with the content of an existing bufferblock.
450
451         :block:
452           the buffer block to copy.
453
454         :owner:
455           ArvadosFile that owns the new block
456
457         """
458         new_blockid = "bufferblock%i" % len(self._bufferblocks)
459         bufferblock = block.clone(new_blockid, owner)
460         self._bufferblocks[bufferblock.blockid] = bufferblock
461         return bufferblock
462
463     @synchronized
464     def is_bufferblock(self, locator):
465         return locator in self._bufferblocks
466
467     def _commit_bufferblock_worker(self):
468         """Background uploader thread."""
469
470         while True:
471             try:
472                 bufferblock = self._put_queue.get()
473                 if bufferblock is None:
474                     return
475
476                 if self.copies is None:
477                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
478                 else:
479                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
480                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
481
482             except Exception as e:
483                 bufferblock.set_state(_BufferBlock.ERROR, e)
484             finally:
485                 if self._put_queue is not None:
486                     self._put_queue.task_done()
487
488     def start_put_threads(self):
489         with self.threads_lock:
490             if self._put_threads is None:
491                 # Start uploader threads.
492
493                 # If we don't limit the Queue size, the upload queue can quickly
494                 # grow to take up gigabytes of RAM if the writing process is
495                 # generating data more quickly than it can be sent to the Keep
496                 # servers.
497                 #
498                 # With two upload threads and a queue size of 2, this means up to 4
499                 # blocks pending.  If they are full 64 MiB blocks, that means up to
500                 # 256 MiB of internal buffering, which is the same size as the
501                 # default download block cache in KeepClient.
502                 self._put_queue = Queue.Queue(maxsize=2)
503
504                 self._put_threads = []
505                 for i in xrange(0, self.num_put_threads):
506                     thread = threading.Thread(target=self._commit_bufferblock_worker)
507                     self._put_threads.append(thread)
508                     thread.daemon = True
509                     thread.start()
510
511     def _block_prefetch_worker(self):
512         """The background downloader thread."""
513         while True:
514             try:
515                 b = self._prefetch_queue.get()
516                 if b is None:
517                     return
518                 self._keep.get(b)
519             except Exception:
520                 _logger.error(traceback.format_exc())
521
522     @synchronized
523     def start_get_threads(self):
524         if self._prefetch_threads is None:
525             self._prefetch_queue = Queue.Queue()
526             self._prefetch_threads = []
527             for i in xrange(0, self.num_get_threads):
528                 thread = threading.Thread(target=self._block_prefetch_worker)
529                 self._prefetch_threads.append(thread)
530                 thread.daemon = True
531                 thread.start()
532
533
534     @synchronized
535     def stop_threads(self):
536         """Shut down and wait for background upload and download threads to finish."""
537
538         if self._put_threads is not None:
539             for t in self._put_threads:
540                 self._put_queue.put(None)
541             for t in self._put_threads:
542                 t.join()
543         self._put_threads = None
544         self._put_queue = None
545
546         if self._prefetch_threads is not None:
547             for t in self._prefetch_threads:
548                 self._prefetch_queue.put(None)
549             for t in self._prefetch_threads:
550                 t.join()
551         self._prefetch_threads = None
552         self._prefetch_queue = None
553
554     def __enter__(self):
555         return self
556
557     def __exit__(self, exc_type, exc_value, traceback):
558         self.stop_threads()
559
560     @synchronized
561     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
562         """Packs small blocks together before uploading"""
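        # Illustrative scenario (a sketch, with assumed sizes): closing many
        # small files leaves one small WRITABLE bufferblock per file; once
        # their combined size reaches KEEP_BLOCK_SIZE (typically 64 MiB) the
        # contents are copied into a single new bufferblock, each file's
        # segments are rewritten to point at it, and the packed block is
        # committed in one PUT.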
563         self._pending_write_size += closed_file_size
564
565         # Check if there are enough small blocks to fill up one full block
566         if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
567
568             # Search for blocks that are ready to be packed together before being committed to Keep.
569             # A WRITABLE block always has an owner.
570             # A WRITABLE block whose owner is closed() implies that its
571             # size is <= KEEP_BLOCK_SIZE/2.
572             small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
573
574             if len(small_blocks) <= 1:
575                 # Not enough small blocks for repacking
576                 return
577
578             # Update the pending write size count with its true value, just in case
579             # some small file was opened, written and closed several times.
580             self._pending_write_size = sum([b.size() for b in small_blocks])
581             if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
582                 return
583
584             new_bb = self._alloc_bufferblock()
585             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
586                 bb = small_blocks.pop(0)
587                 arvfile = bb.owner
588                 self._pending_write_size -= bb.size()
589                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
590                 arvfile.set_segments([Range(new_bb.blockid,
591                                             0,
592                                             bb.size(),
593                                             new_bb.write_pointer - bb.size())])
594                 self._delete_bufferblock(bb.blockid)
595             self.commit_bufferblock(new_bb, sync=sync)
596
597     def commit_bufferblock(self, block, sync):
598         """Initiate a background upload of a bufferblock.
599
600         :block:
601           The block object to upload
602
603         :sync:
604           If `sync` is True, upload the block synchronously.
605           If `sync` is False, upload the block asynchronously.  This will
606           return immediately unless the upload queue is at capacity, in
607           which case it will wait on an upload queue slot.
608
609         """
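        # Illustrative call patterns (a sketch): ArvadosFile.flush(sync=True)
        # uses commit_bufferblock(block, sync=True) and waits for the PUT to
        # finish, while ArvadosFile.writeto() hands full blocks to
        # commit_bufferblock(block, sync=False) so the upload happens on the
        # background put threads.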
610         try:
611             # Mark the block as PENDING so as to disallow any further appends.
612             block.set_state(_BufferBlock.PENDING)
613         except StateChangeError as e:
614             if e.state == _BufferBlock.PENDING:
615                 if sync:
616                     block.wait_for_commit.wait()
617                 else:
618                     return
619             if block.state() == _BufferBlock.COMMITTED:
620                 return
621             elif block.state() == _BufferBlock.ERROR:
622                 raise block.error
623             else:
624                 raise
625
626         if sync:
627             try:
628                 if self.copies is None:
629                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
630                 else:
631                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
632                 block.set_state(_BufferBlock.COMMITTED, loc)
633             except Exception as e:
634                 block.set_state(_BufferBlock.ERROR, e)
635                 raise
636         else:
637             self.start_put_threads()
638             self._put_queue.put(block)
639
640     @synchronized
641     def get_bufferblock(self, locator):
642         return self._bufferblocks.get(locator)
643
644     @synchronized
645     def delete_bufferblock(self, locator):
646         self._delete_bufferblock(locator)
647
648     def _delete_bufferblock(self, locator):
649         bb = self._bufferblocks[locator]
650         bb.clear()
651         del self._bufferblocks[locator]
652
653     def get_block_contents(self, locator, num_retries, cache_only=False):
654         """Fetch a block.
655
656         First checks to see if the locator is a BufferBlock and returns that if
657         so; otherwise, passes the request through to KeepClient.get().
658
659         """
660         with self.lock:
661             if locator in self._bufferblocks:
662                 bufferblock = self._bufferblocks[locator]
663                 if bufferblock.state() != _BufferBlock.COMMITTED:
664                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
665                 else:
666                     locator = bufferblock._locator
667         if cache_only:
668             return self._keep.get_from_cache(locator)
669         else:
670             return self._keep.get(locator, num_retries=num_retries)
671
672     def commit_all(self):
673         """Commit all outstanding buffer blocks.
674
675         This is a synchronous call, and will not return until all buffer blocks
676         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
677
678         """
679         self.repack_small_blocks(force=True, sync=True)
680
681         with self.lock:
682             items = self._bufferblocks.items()
683
684         for k,v in items:
685             if v.state() != _BufferBlock.COMMITTED and v.owner:
686                 v.owner.flush(sync=False)
687
688         with self.lock:
689             if self._put_queue is not None:
690                 self._put_queue.join()
691
692                 err = []
693                 for k,v in items:
694                     if v.state() == _BufferBlock.ERROR:
695                         err.append((v.locator(), v.error))
696                 if err:
697                     raise KeepWriteError("Error writing some blocks", err, label="block")
698
699         for k,v in items:
700             # flush again with sync=True to remove committed bufferblocks from
701             # the segments.
702             if v.owner:
703                 v.owner.flush(sync=True)
704
705     def block_prefetch(self, locator):
706         """Initiate a background download of a block.
707
708         This assumes that the underlying KeepClient implements a block cache,
709         so repeated requests for the same block will not result in repeated
710         downloads (unless the block is evicted from the cache).  This method
711         does not block.
712
713         """
714
715         if not self.prefetch_enabled:
716             return
717
718         if self._keep.get_from_cache(locator) is not None:
719             return
720
721         with self.lock:
722             if locator in self._bufferblocks:
723                 return
724
725         self.start_get_threads()
726         self._prefetch_queue.put(locator)
727
728
729 class ArvadosFile(object):
730     """Represent a file in a Collection.
731
732     ArvadosFile manages the underlying representation of a file in Keep as a
733     sequence of segments spanning a set of blocks, and implements random
734     read/write access.
735
736     This object may be accessed from multiple threads.
737
738     """
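    # Typical access path (an illustrative sketch; ArvadosFile objects are
    # normally obtained through a Collection rather than constructed
    # directly):
    #
    #   import arvados.collection
    #   c = arvados.collection.Collection()
    #   with c.open('data.txt', 'w') as f:   # ArvadosFileWriter
    #       f.write('hello\n')
    #   with c.open('data.txt', 'r') as f:   # ArvadosFileReader
    #       print f.read()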
739
740     def __init__(self, parent, name, stream=[], segments=[]):
741         """
742         ArvadosFile constructor.
743
744         :stream:
745           a list of Range objects representing a block stream
746
747         :segments:
748           a list of Range objects representing segments
749         """
750         self.parent = parent
751         self.name = name
752         self._writers = set()
753         self._committed = False
754         self._segments = []
755         self.lock = parent.root_collection().lock
756         for s in segments:
757             self._add_segment(stream, s.locator, s.range_size)
758         self._current_bblock = None
759
760     def writable(self):
761         return self.parent.writable()
762
763     @synchronized
764     def segments(self):
765         return copy.copy(self._segments)
766
767     @synchronized
768     def clone(self, new_parent, new_name):
769         """Make a copy of this file."""
770         cp = ArvadosFile(new_parent, new_name)
771         cp.replace_contents(self)
772         return cp
773
774     @must_be_writable
775     @synchronized
776     def replace_contents(self, other):
777         """Replace segments of this file with segments from another `ArvadosFile` object."""
778
779         map_loc = {}
780         self._segments = []
781         for other_segment in other.segments():
782             new_loc = other_segment.locator
783             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
784                 if other_segment.locator not in map_loc:
785                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
786                     if bufferblock.state() != _BufferBlock.WRITABLE:
787                         map_loc[other_segment.locator] = bufferblock.locator()
788                     else:
789                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
790                 new_loc = map_loc[other_segment.locator]
791
792             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
793
794         self._committed = False
795
796     def __eq__(self, other):
797         if other is self:
798             return True
799         if not isinstance(other, ArvadosFile):
800             return False
801
802         othersegs = other.segments()
803         with self.lock:
804             if len(self._segments) != len(othersegs):
805                 return False
806             for i in xrange(0, len(othersegs)):
807                 seg1 = self._segments[i]
808                 seg2 = othersegs[i]
809                 loc1 = seg1.locator
810                 loc2 = seg2.locator
811
812                 if self.parent._my_block_manager().is_bufferblock(loc1):
813                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
814
815                 if other.parent._my_block_manager().is_bufferblock(loc2):
816                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
817
818                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
819                     seg1.range_start != seg2.range_start or
820                     seg1.range_size != seg2.range_size or
821                     seg1.segment_offset != seg2.segment_offset):
822                     return False
823
824         return True
825
826     def __ne__(self, other):
827         return not self.__eq__(other)
828
829     @synchronized
830     def set_segments(self, segs):
831         self._segments = segs
832
833     @synchronized
834     def set_committed(self):
835         """Set committed flag to True"""
836         self._committed = True
837
838     @synchronized
839     def committed(self):
840         """Get whether this is committed or not."""
841         return self._committed
842
843     @synchronized
844     def add_writer(self, writer):
845         """Add an ArvadosFileWriter reference to the list of writers"""
846         if isinstance(writer, ArvadosFileWriter):
847             self._writers.add(writer)
848
849     @synchronized
850     def remove_writer(self, writer, flush):
851         """
852         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
853         and do some block maintenance tasks.
854         """
855         self._writers.remove(writer)
856
857         if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
858             # File writer closed, not small enough for repacking
859             self.flush()
860         elif self.closed():
861             # All writers closed and size is adequate for repacking
862             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
863
864     def closed(self):
865         """
866         Get whether this is closed or not. When the writers list is empty, the file
867         is considered closed.
868         """
869         return len(self._writers) == 0
870
871     @must_be_writable
872     @synchronized
873     def truncate(self, size):
874         """Shrink the size of the file.
875
876         If `size` is less than the size of the file, the file contents after
877         `size` will be discarded.  If `size` is greater than the current size
878         of the file, an IOError will be raised.
879
880         """
881         if size < self.size():
882             new_segs = []
883             for r in self._segments:
884                 range_end = r.range_start+r.range_size
885                 if r.range_start >= size:
886                     # segment is past the truncate size, all done
887                     break
888                 elif size < range_end:
889                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
890                     nr.segment_offset = r.segment_offset
891                     new_segs.append(nr)
892                     break
893                 else:
894                     new_segs.append(r)
895
896             self._segments = new_segs
897             self._committed = False
898         elif size > self.size():
899             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
900
901     def readfrom(self, offset, size, num_retries, exact=False):
902         """Read up to `size` bytes from the file starting at `offset`.
903
904         :exact:
905          If False (default), return less data than requested if the read
906          crosses a block boundary and the next block isn't cached.  If True,
907          only return less data than requested when hitting EOF.
908         """
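        # Illustrative consequence of `exact` (a sketch): a read spanning two
        # blocks may return only the bytes from the first block when
        # exact=False and the second block is not yet in the local cache; with
        # exact=True the call fetches every needed block and only returns
        # short at EOF.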
909
910         with self.lock:
911             if size == 0 or offset >= self.size():
912                 return ''
913             readsegs = locators_and_ranges(self._segments, offset, size)
914             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
915
916         locs = set()
917         data = []
918         for lr in readsegs:
919             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
920             if block:
921                 blockview = memoryview(block)
922                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
923                 locs.add(lr.locator)
924             else:
925                 break
926
927         for lr in prefetch:
928             if lr.locator not in locs:
929                 self.parent._my_block_manager().block_prefetch(lr.locator)
930                 locs.add(lr.locator)
931
932         return ''.join(data)
933
934     def _repack_writes(self, num_retries):
935         """Test if the buffer block has more data than actual segments.
936
937         This happens when a buffered write over-writes a file range written in
938         a previous buffered write.  Re-pack the buffer block for efficiency
939         and to avoid leaking information.
940
941         """
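        # Example of the situation handled here (illustrative sizes): writing
        # 1 MiB at offset 0 and then overwriting the same range leaves 2 MiB
        # in the current bufferblock while the file's segments reference only
        # the second copy; repacking copies that referenced 1 MiB into a fresh
        # bufferblock and discards the stale bytes.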
942         segs = self._segments
943
944         # Sum up the segments to get the total bytes of the file that
945         # reference the buffer block.
946         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
947         write_total = sum([s.range_size for s in bufferblock_segs])
948
949         if write_total < self._current_bblock.size():
950             # There is more data in the buffer block than is actually accounted for by segments, so
951             # re-pack by copying only the referenced data over to a new buffer block.
952             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
953             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
954             for t in bufferblock_segs:
955                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
956                 t.segment_offset = new_bb.size() - t.range_size
957
958             self._current_bblock = new_bb
959
960     @must_be_writable
961     @synchronized
962     def writeto(self, offset, data, num_retries):
963         """Write `data` to the file starting at `offset`.
964
965         This will update existing bytes and/or extend the size of the file as
966         necessary.
967
968         """
969         if len(data) == 0:
970             return
971
972         if offset > self.size():
973             raise ArgumentError("Offset is past the end of the file")
974
975         if len(data) > config.KEEP_BLOCK_SIZE:
976             # Chunk it up into smaller writes
977             n = 0
978             dataview = memoryview(data)
979             while n < len(data):
980                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
981                 n += config.KEEP_BLOCK_SIZE
982             return
983
984         self._committed = False
985
986         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
987             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
988
989         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
990             self._repack_writes(num_retries)
991             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
992                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
993                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
994
995         self._current_bblock.append(data)
996
997         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
998
999         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1000
1001         return len(data)
1002
1003     @synchronized
1004     def flush(self, sync=True, num_retries=0):
1005         """Flush the current bufferblock to Keep.
1006
1007         :sync:
1008           If True, commit block synchronously, wait until buffer block has been written.
1009           If False, commit block asynchronously, return immediately after putting block into
1010           the keep put queue.
1011         """
1012         if self.committed():
1013             return
1014
1015         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1016             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1017                 self._repack_writes(num_retries)
1018             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1019
1020         if sync:
1021             to_delete = set()
1022             for s in self._segments:
1023                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1024                 if bb:
1025                     if bb.state() != _BufferBlock.COMMITTED:
1026                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1027                     to_delete.add(s.locator)
1028                     s.locator = bb.locator()
1029             for s in to_delete:
1030                self.parent._my_block_manager().delete_bufferblock(s)
1031
1032         self.parent.notify(MOD, self.parent, self.name, (self, self))
1033
1034     @must_be_writable
1035     @synchronized
1036     def add_segment(self, blocks, pos, size):
1037         """Add a segment to the end of the file.
1038
1039         `pos` and `size` reference a section of the stream described by
1040         `blocks` (a list of Range objects).
1041
1042         """
1043         self._add_segment(blocks, pos, size)
1044
1045     def _add_segment(self, blocks, pos, size):
1046         """Internal implementation of add_segment."""
1047         self._committed = False
1048         for lr in locators_and_ranges(blocks, pos, size):
1049             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1050             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1051             self._segments.append(r)
1052
1053     @synchronized
1054     def size(self):
1055         """Get the file size."""
1056         if self._segments:
1057             n = self._segments[-1]
1058             return n.range_start + n.range_size
1059         else:
1060             return 0
1061
1062     @synchronized
1063     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
1064         buf = ""
1065         filestream = []
1066         for segment in self._segments:
1067             loc = segment.locator
1068             if loc.startswith("bufferblock"):
1069                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1070             if portable_locators:
1071                 loc = KeepLocator(loc).stripped()
1072             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1073                                  segment.segment_offset, segment.range_size))
1074         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
1075         buf += "\n"
1076         return buf
1077
1078     @must_be_writable
1079     @synchronized
1080     def _reparent(self, newparent, newname):
1081         self._committed = False
1082         self.flush(sync=True)
1083         self.parent.remove(self.name)
1084         self.parent = newparent
1085         self.name = newname
1086         self.lock = self.parent.root_collection().lock
1087
1088
1089 class ArvadosFileReader(ArvadosFileReaderBase):
1090     """Wraps ArvadosFile in a file-like object supporting reading only.
1091
1092     Be aware that this class is NOT thread safe as there is no locking around
1093     updating the file pointer.
1094
1095     """
1096
1097     def __init__(self, arvadosfile, num_retries=None):
1098         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1099         self.arvadosfile = arvadosfile
1100
1101     def size(self):
1102         return self.arvadosfile.size()
1103
1104     def stream_name(self):
1105         return self.arvadosfile.parent.stream_name()
1106
1107     @_FileLikeObjectBase._before_close
1108     @retry_method
1109     def read(self, size=None, num_retries=None):
1110         """Read up to `size` bytes from the file and return the result.
1111
1112         Starts at the current file position.  If `size` is None, read the
1113         entire remainder of the file.
1114         """
1115         if size is None:
1116             data = []
1117             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1118             while rd:
1119                 data.append(rd)
1120                 self._filepos += len(rd)
1121                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1122             return ''.join(data)
1123         else:
1124             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1125             self._filepos += len(data)
1126             return data
1127
1128     @_FileLikeObjectBase._before_close
1129     @retry_method
1130     def readfrom(self, offset, size, num_retries=None):
1131         """Read up to `size` bytes from the stream, starting at the specified file offset.
1132
1133         This method does not change the file position.
1134         """
1135         return self.arvadosfile.readfrom(offset, size, num_retries)
1136
1137     def flush(self):
1138         pass
1139
1140
1141 class ArvadosFileWriter(ArvadosFileReader):
1142     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1143
1144     Be aware that this class is NOT thread safe as there is no locking around
1145     updating the file pointer.
1146
1147     """
1148
1149     def __init__(self, arvadosfile, mode, num_retries=None):
1150         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1151         self.mode = mode
1152         self.arvadosfile.add_writer(self)
1153
1154     @_FileLikeObjectBase._before_close
1155     @retry_method
1156     def write(self, data, num_retries=None):
1157         if self.mode[0] == "a":
1158             self.arvadosfile.writeto(self.size(), data, num_retries)
1159         else:
1160             self.arvadosfile.writeto(self._filepos, data, num_retries)
1161             self._filepos += len(data)
1162         return len(data)
1163
1164     @_FileLikeObjectBase._before_close
1165     @retry_method
1166     def writelines(self, seq, num_retries=None):
1167         for s in seq:
1168             self.write(s, num_retries=num_retries)
1169
1170     @_FileLikeObjectBase._before_close
1171     def truncate(self, size=None):
1172         if size is None:
1173             size = self._filepos
1174         self.arvadosfile.truncate(size)
1175         if self._filepos > self.size():
1176             self._filepos = self.size()
1177
1178     @_FileLikeObjectBase._before_close
1179     def flush(self):
1180         self.arvadosfile.flush()
1181
1182     def close(self, flush=True):
1183         if not self.closed:
1184             self.arvadosfile.remove_writer(self, flush)
1185             super(ArvadosFileWriter, self).close()