9701: Changes to the Python SDK to allow small file packing in the Collection class:
[arvados.git] / sdk / python / arvados / arvfile.py
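The commit teaches the _BlockManager below to pack small files into shared Keep blocks instead of committing one under-filled block per file. A minimal usage sketch (this assumes the Collection API from arvados.collection, i.e. Collection.open() and Collection.manifest_text(); the file names and payload are illustrative):

    import arvados.collection

    coll = arvados.collection.Collection()
    for i in range(10):
        # Each of these files is far smaller than half a Keep block
        # (config.KEEP_BLOCK_SIZE / 2), so when its writer is closed the
        # underlying buffer block becomes a packing candidate.
        with coll.open('small_file_%d.txt' % i, 'w') as f:
            f.write('some small payload\n')

    # With packing enabled, the resulting manifest should reference a few shared
    # blocks instead of one tiny block per file.
    print coll.manifest_text()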
1 import functools
2 import os
3 import zlib
4 import bz2
5 from . import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13
14 from .errors import KeepWriteError, AssertionError, ArgumentError
15 from .keep import KeepLocator
16 from ._normalize_stream import normalize_stream
17 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
18 from .retry import retry_method
19
20 MOD = "mod"
21 WRITE = "write"
22
23 _logger = logging.getLogger('arvados.arvfile')
24
25 def split(path):
26     """split(path) -> streamname, filename
27
28     Separate the stream name and file name in a /-separated stream path and
29     return a tuple (stream_name, file_name).  If no stream name is available,
30     assume '.'.
31
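    For example:
        split("foo/bar.txt") -> ("foo", "bar.txt")
        split("bar.txt")     -> (".", "bar.txt")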
32     """
33     try:
34         stream_name, file_name = path.rsplit('/', 1)
35     except ValueError:  # No / in string
36         stream_name, file_name = '.', path
37     return stream_name, file_name
38
39 class _FileLikeObjectBase(object):
40     def __init__(self, name, mode):
41         self.name = name
42         self.mode = mode
43         self.closed = False
44
45     @staticmethod
46     def _before_close(orig_func):
47         @functools.wraps(orig_func)
48         def before_close_wrapper(self, *args, **kwargs):
49             if self.closed:
50                 raise ValueError("I/O operation on closed stream file")
51             return orig_func(self, *args, **kwargs)
52         return before_close_wrapper
53
54     def __enter__(self):
55         return self
56
57     def __exit__(self, exc_type, exc_value, traceback):
58         try:
59             self.close()
60         except Exception:
61             if exc_type is None:
62                 raise
63
64     def close(self):
65         self.closed = True
66
67
68 class ArvadosFileReaderBase(_FileLikeObjectBase):
69     def __init__(self, name, mode, num_retries=None):
70         super(ArvadosFileReaderBase, self).__init__(name, mode)
71         self._filepos = 0L
72         self.num_retries = num_retries
73         self._readline_cache = (None, None)
74
75     def __iter__(self):
76         while True:
77             data = self.readline()
78             if not data:
79                 break
80             yield data
81
82     def decompressed_name(self):
83         return re.sub(r'\.(bz2|gz)$', '', self.name)
84
85     @_FileLikeObjectBase._before_close
86     def seek(self, pos, whence=os.SEEK_SET):
87         if whence == os.SEEK_CUR:
88             pos += self._filepos
89         elif whence == os.SEEK_END:
90             pos += self.size()
91         self._filepos = min(max(pos, 0L), self.size())
92
93     def tell(self):
94         return self._filepos
95
96     @_FileLikeObjectBase._before_close
97     @retry_method
98     def readall(self, size=2**20, num_retries=None):
99         while True:
100             data = self.read(size, num_retries=num_retries)
101             if data == '':
102                 break
103             yield data
104
105     @_FileLikeObjectBase._before_close
106     @retry_method
107     def readline(self, size=float('inf'), num_retries=None):
108         cache_pos, cache_data = self._readline_cache
109         if self.tell() == cache_pos:
110             data = [cache_data]
111             self._filepos += len(cache_data)
112         else:
113             data = ['']
114         data_size = len(data[-1])
115         while (data_size < size) and ('\n' not in data[-1]):
116             next_read = self.read(2 ** 20, num_retries=num_retries)
117             if not next_read:
118                 break
119             data.append(next_read)
120             data_size += len(next_read)
121         data = ''.join(data)
122         try:
123             nextline_index = data.index('\n') + 1
124         except ValueError:
125             nextline_index = len(data)
126         nextline_index = min(nextline_index, size)
127         self._filepos -= len(data) - nextline_index
128         self._readline_cache = (self.tell(), data[nextline_index:])
129         return data[:nextline_index]
130
131     @_FileLikeObjectBase._before_close
132     @retry_method
133     def decompress(self, decompress, size, num_retries=None):
134         for segment in self.readall(size, num_retries=num_retries):
135             data = decompress(segment)
136             if data:
137                 yield data
138
139     @_FileLikeObjectBase._before_close
140     @retry_method
141     def readall_decompressed(self, size=2**20, num_retries=None):
142         self.seek(0)
143         if self.name.endswith('.bz2'):
144             dc = bz2.BZ2Decompressor()
145             return self.decompress(dc.decompress, size,
146                                    num_retries=num_retries)
147         elif self.name.endswith('.gz'):
148             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
149             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
150                                    size, num_retries=num_retries)
151         else:
152             return self.readall(size, num_retries=num_retries)
153
154     @_FileLikeObjectBase._before_close
155     @retry_method
156     def readlines(self, sizehint=float('inf'), num_retries=None):
157         data = []
158         data_size = 0
159         for s in self.readall(num_retries=num_retries):
160             data.append(s)
161             data_size += len(s)
162             if data_size >= sizehint:
163                 break
164         return ''.join(data).splitlines(True)
165
166     def size(self):
167         raise NotImplementedError()
168
169     def read(self, size, num_retries=None):
170         raise NotImplementedError()
171
172     def readfrom(self, start, size, num_retries=None):
173         raise NotImplementedError()
174
175
176 class StreamFileReader(ArvadosFileReaderBase):
177     class _NameAttribute(str):
178         # The Python file API provides a plain .name attribute.
179         # Older SDK provided a name() method.
180         # This class provides both, for maximum compatibility.
181         def __call__(self):
182             return self
183
184     def __init__(self, stream, segments, name):
185         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
186         self._stream = stream
187         self.segments = segments
188
189     def stream_name(self):
190         return self._stream.name()
191
192     def size(self):
193         n = self.segments[-1]
194         return n.range_start + n.range_size
195
196     @_FileLikeObjectBase._before_close
197     @retry_method
198     def read(self, size, num_retries=None):
199         """Read up to 'size' bytes from the stream, starting at the current file position"""
200         if size == 0:
201             return ''
202
203         data = ''
204         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
205         if available_chunks:
206             lr = available_chunks[0]
207             data = self._stream.readfrom(lr.locator+lr.segment_offset,
208                                           lr.segment_size,
209                                           num_retries=num_retries)
210
211         self._filepos += len(data)
212         return data
213
214     @_FileLikeObjectBase._before_close
215     @retry_method
216     def readfrom(self, start, size, num_retries=None):
217         """Read up to 'size' bytes from the stream, starting at 'start'"""
218         if size == 0:
219             return ''
220
221         data = []
222         for lr in locators_and_ranges(self.segments, start, size):
223             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
224                                               num_retries=num_retries))
225         return ''.join(data)
226
227     def as_manifest(self):
228         segs = []
229         for r in self.segments:
230             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
231         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
232
233
234 def synchronized(orig_func):
235     @functools.wraps(orig_func)
236     def synchronized_wrapper(self, *args, **kwargs):
237         with self.lock:
238             return orig_func(self, *args, **kwargs)
239     return synchronized_wrapper
240
241
242 class StateChangeError(Exception):
243     def __init__(self, message, state, nextstate):
244         super(StateChangeError, self).__init__(message)
245         self.state = state
246         self.nextstate = nextstate
247
248 class _BufferBlock(object):
249     """A stand-in for a Keep block that is in the process of being written.
250
251     Writers can append to it, get the size, and compute the Keep locator.
252     There are three valid states:
253
254     WRITABLE
255       Can append to block.
256
257     PENDING
258       Block is in the process of being uploaded to Keep, append is an error.
259
260     COMMITTED
261       The block has been written to Keep, its internal buffer has been
262       released, reading the block will fetch it via the Keep client (since we
263       discarded the internal copy), and identifiers referring to the BufferBlock
264       can be replaced with the block locator.
265
266     """
267
268     WRITABLE = 0
269     PENDING = 1
270     COMMITTED = 2
271     ERROR = 3
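    # Typical lifecycle, per STATE_TRANSITIONS below:
    #   WRITABLE --commit_bufferblock()--> PENDING --put succeeded--> COMMITTED
    #                                      PENDING --put failed-----> ERROR (--retry--> PENDING)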
272
273     def __init__(self, blockid, starting_capacity, owner):
274         """
275         :blockid:
276           the identifier for this block
277
278         :starting_capacity:
279           the initial buffer capacity
280
281         :owner:
282           ArvadosFile that owns this block
283
284         """
285         self.blockid = blockid
286         self.buffer_block = bytearray(starting_capacity)
287         self.buffer_view = memoryview(self.buffer_block)
288         self.write_pointer = 0
289         self._state = _BufferBlock.WRITABLE
290         self._locator = None
291         self.owner = owner
292         self.lock = threading.Lock()
293         self.wait_for_commit = threading.Event()
294         self.error = None
295
296     @synchronized
297     def append(self, data):
298         """Append some data to the buffer.
299
300         Only valid if the block is in WRITABLE state.  Implements an expanding
301         buffer, doubling capacity as needed to accommodate all the data.
302
303         """
304         if self._state == _BufferBlock.WRITABLE:
305             while (self.write_pointer+len(data)) > len(self.buffer_block):
306                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
307                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
308                 self.buffer_block = new_buffer_block
309                 self.buffer_view = memoryview(self.buffer_block)
310             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
311             self.write_pointer += len(data)
312             self._locator = None
313         else:
314             raise AssertionError("Buffer block is not writable")
315
316     STATE_TRANSITIONS = frozenset([
317             (WRITABLE, PENDING),
318             (PENDING, COMMITTED),
319             (PENDING, ERROR),
320             (ERROR, PENDING)])
321
322     @synchronized
323     def set_state(self, nextstate, val=None):
324         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
325             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
326         self._state = nextstate
327
328         if self._state == _BufferBlock.PENDING:
329             self.wait_for_commit.clear()
330
331         if self._state == _BufferBlock.COMMITTED:
332             self._locator = val
333             self.buffer_view = None
334             self.buffer_block = None
335             self.wait_for_commit.set()
336
337         if self._state == _BufferBlock.ERROR:
338             self.error = val
339             self.wait_for_commit.set()
340
341     @synchronized
342     def state(self):
343         return self._state
344
345     def size(self):
346         """The amount of data written to the buffer."""
347         return self.write_pointer
348
349     @synchronized
350     def locator(self):
351         """The Keep locator for this buffer's contents."""
352         if self._locator is None:
353             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
354         return self._locator
355
356     @synchronized
357     def clone(self, new_blockid, owner):
358         if self._state == _BufferBlock.COMMITTED:
359             raise AssertionError("Cannot duplicate committed buffer block")
360         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
361         bufferblock.append(self.buffer_view[0:self.size()])
362         return bufferblock
363
364     @synchronized
365     def clear(self):
366         self.owner = None
367         self.buffer_block = None
368         self.buffer_view = None
369
370
371 class NoopLock(object):
372     def __enter__(self):
373         return self
374
375     def __exit__(self, exc_type, exc_value, traceback):
376         pass
377
378     def acquire(self, blocking=False):
379         pass
380
381     def release(self):
382         pass
383
384
385 def must_be_writable(orig_func):
386     @functools.wraps(orig_func)
387     def must_be_writable_wrapper(self, *args, **kwargs):
388         if not self.writable():
389             raise IOError(errno.EROFS, "Collection is read-only.")
390         return orig_func(self, *args, **kwargs)
391     return must_be_writable_wrapper
392
393
394 class _BlockManager(object):
395     """BlockManager handles buffer blocks.
396
397     Also handles background block uploads, and background block prefetch for a
398     Collection of ArvadosFiles.
399
400     """
401
402     DEFAULT_PUT_THREADS = 2
403     DEFAULT_GET_THREADS = 2
404
405     def __init__(self, keep, copies=None):
406         """keep: KeepClient object to use"""
407         self._keep = keep
408         self._bufferblocks = {}
409         self._put_queue = None
410         self._put_threads = None
411         self._prefetch_queue = None
412         self._prefetch_threads = None
413         self.lock = threading.Lock()
414         self.prefetch_enabled = True
415         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
416         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
417         self.copies = copies
418
419     @synchronized
420     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
421         """Allocate a new, empty bufferblock in WRITABLE state and return it.
422
423         :blockid:
424           optional block identifier, otherwise one will be automatically assigned
425
426         :starting_capacity:
427           optional capacity, otherwise will use default capacity
428
429         :owner:
430           ArvadosFile that owns this block
431
432         """
433         if blockid is None:
434             blockid = "bufferblock%i" % len(self._bufferblocks)
435         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
436         self._bufferblocks[bufferblock.blockid] = bufferblock
437         return bufferblock
438
439     @synchronized
440     def dup_block(self, block, owner):
441         """Create a new bufferblock initialized with the content of an existing bufferblock.
442
443         :block:
444           the buffer block to copy.
445
446         :owner:
447           ArvadosFile that owns the new block
448
449         """
450         new_blockid = "bufferblock%i" % len(self._bufferblocks)
451         bufferblock = block.clone(new_blockid, owner)
452         self._bufferblocks[bufferblock.blockid] = bufferblock
453         return bufferblock
454
455     @synchronized
456     def is_bufferblock(self, locator):
457         return locator in self._bufferblocks
458
459     def _commit_bufferblock_worker(self):
460         """Background uploader thread."""
461
462         while True:
463             try:
464                 bufferblock = self._put_queue.get()
465                 if bufferblock is None:
466                     return
467
468                 if self.copies is None:
469                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
470                 else:
471                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
472                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
473
474             except Exception as e:
475                 bufferblock.set_state(_BufferBlock.ERROR, e)
476             finally:
477                 if self._put_queue is not None:
478                     self._put_queue.task_done()
479
480     @synchronized
481     def start_put_threads(self):
482         if self._put_threads is None:
483             # Start uploader threads.
484
485             # If we don't limit the Queue size, the upload queue can quickly
486             # grow to take up gigabytes of RAM if the writing process is
487         # generating data more quickly than it can be sent to the Keep
488             # servers.
489             #
490             # With two upload threads and a queue size of 2, this means up to 4
491             # blocks pending.  If they are full 64 MiB blocks, that means up to
492             # 256 MiB of internal buffering, which is the same size as the
493             # default download block cache in KeepClient.
494             self._put_queue = Queue.Queue(maxsize=2)
495
496             self._put_threads = []
497             for i in xrange(0, self.num_put_threads):
498                 thread = threading.Thread(target=self._commit_bufferblock_worker)
499                 self._put_threads.append(thread)
500                 thread.daemon = True
501                 thread.start()
502
503     def _block_prefetch_worker(self):
504         """The background downloader thread."""
505         while True:
506             try:
507                 b = self._prefetch_queue.get()
508                 if b is None:
509                     return
510                 self._keep.get(b)
511             except Exception:
512                 pass
513
514     @synchronized
515     def start_get_threads(self):
516         if self._prefetch_threads is None:
517             self._prefetch_queue = Queue.Queue()
518             self._prefetch_threads = []
519             for i in xrange(0, self.num_get_threads):
520                 thread = threading.Thread(target=self._block_prefetch_worker)
521                 self._prefetch_threads.append(thread)
522                 thread.daemon = True
523                 thread.start()
524
525
526     @synchronized
527     def stop_threads(self):
528         """Shut down and wait for background upload and download threads to finish."""
529
530         if self._put_threads is not None:
531             for t in self._put_threads:
532                 self._put_queue.put(None)
533             for t in self._put_threads:
534                 t.join()
535         self._put_threads = None
536         self._put_queue = None
537
538         if self._prefetch_threads is not None:
539             for t in self._prefetch_threads:
540                 self._prefetch_queue.put(None)
541             for t in self._prefetch_threads:
542                 t.join()
543         self._prefetch_threads = None
544         self._prefetch_queue = None
545
546     def __enter__(self):
547         return self
548
549     def __exit__(self, exc_type, exc_value, traceback):
550         self.stop_threads()
551
552     def repack_small_blocks(self, force=False):
553         """Packs small blocks together before uploading"""
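        # Overall approach: collect WRITABLE buffer blocks whose owning files are
        # already closed and no larger than half a Keep block, copy their contents
        # into a single new buffer block, rewrite each owner's segments to point
        # at the packed block, and commit the packed block once enough data has
        # accumulated (or unconditionally when force=True).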
554         # Candidate buffer blocks.  This list could be sorted to prioritize
555         # certain kinds of blocks.
556         small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner and b.owner.closed() and b.owner.size() <= (config.KEEP_BLOCK_SIZE / 2)]
557         if len(small_blocks) == 0:
558             return
559
560         # Check whether there are enough small blocks for combining and uploading
561         pending_write_size = sum([b.size() for b in small_blocks])
562         if force or (pending_write_size > (config.KEEP_BLOCK_SIZE / 2)):
563             if len(small_blocks) == 1:
564                 # Only one small block: there is nothing to pack it with, so
565                 # leave it alone to be committed on its own before exiting.
566                 return
567             new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None)
568             self._bufferblocks[new_bb.blockid] = new_bb
569             size = 0
570             while len(small_blocks) > 0 and size <= (config.KEEP_BLOCK_SIZE / 2):
571                 bb = small_blocks.pop(0)
572                 size += bb.size()
573                 new_segs = []
574                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
575                 # FIXME: We shouldn't be accessing _segments directly
576                 bb.owner._segments = [Range(new_bb.blockid, 0, bb.size(), size-bb.size())]
577                 bb.clear()
578                 del self._bufferblocks[bb.blockid]
579             # new_bb's size is greater than half a Keep block, so commit it
580             self.commit_bufferblock(new_bb, sync=True)
581
582     def commit_bufferblock(self, block, sync):
583         """Initiate a background upload of a bufferblock.
584
585         :block:
586           The block object to upload
587
588         :sync:
589           If `sync` is True, upload the block synchronously.
590           If `sync` is False, upload the block asynchronously.  This will
591           return immediately unless the upload queue is at capacity, in
592           which case it will wait on an upload queue slot.
593
594         """
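        # If the block is already PENDING (for example, another writer handed it
        # to the uploader first), either wait for the upload to finish (sync=True)
        # or return immediately; a COMMITTED block needs no further work, and an
        # ERROR block re-raises the original upload exception.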
595         try:
596             # Mark the block as PENDING to disallow any more appends.
597             block.set_state(_BufferBlock.PENDING)
598         except StateChangeError as e:
599             if e.state == _BufferBlock.PENDING:
600                 if sync:
601                     block.wait_for_commit.wait()
602                 else:
603                     return
604             if block.state() == _BufferBlock.COMMITTED:
605                 return
606             elif block.state() == _BufferBlock.ERROR:
607                 raise block.error
608             else:
609                 raise
610
611         if sync:
612             try:
613                 if self.copies is None:
614                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
615                 else:
616                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
617                 block.set_state(_BufferBlock.COMMITTED, loc)
618             except Exception as e:
619                 block.set_state(_BufferBlock.ERROR, e)
620                 raise
621         else:
622             self.start_put_threads()
623             self._put_queue.put(block)
624
625     @synchronized
626     def get_bufferblock(self, locator):
627         return self._bufferblocks.get(locator)
628
629     @synchronized
630     def delete_bufferblock(self, locator):
631         bb = self._bufferblocks[locator]
632         bb.clear()
633         del self._bufferblocks[locator]
634
635     def get_block_contents(self, locator, num_retries, cache_only=False):
636         """Fetch a block.
637
638         First checks to see if the locator is a BufferBlock and, if so, returns
639         its contents; otherwise, passes the request through to KeepClient.get().
640
641         """
642         with self.lock:
643             if locator in self._bufferblocks:
644                 bufferblock = self._bufferblocks[locator]
645                 if bufferblock.state() != _BufferBlock.COMMITTED:
646                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
647                 else:
648                     locator = bufferblock._locator
649         if cache_only:
650             return self._keep.get_from_cache(locator)
651         else:
652             return self._keep.get(locator, num_retries=num_retries)
653
654     def commit_all(self):
655         """Commit all outstanding buffer blocks.
656
657         This is a synchronous call, and will not return until all buffer blocks
658         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
659
660         """
661         with self.lock:
662             self.repack_small_blocks(force=True)
663             items = self._bufferblocks.items()
664
665         for k,v in items:
666             if v.state() != _BufferBlock.COMMITTED and v.owner:
667                 v.owner.flush(sync=False)
668
669         with self.lock:
670             if self._put_queue is not None:
671                 self._put_queue.join()
672
673                 err = []
674                 for k,v in items:
675                     if v.state() == _BufferBlock.ERROR:
676                         err.append((v.locator(), v.error))
677                 if err:
678                     raise KeepWriteError("Error writing some blocks", err, label="block")
679
680         for k,v in items:
681             # flush again with sync=True to remove committed bufferblocks from
682             # the segments.
683             if v.owner:
684                 v.owner.flush(sync=True)
685
686     def block_prefetch(self, locator):
687         """Initiate a background download of a block.
688
689         This assumes that the underlying KeepClient implements a block cache,
690         so repeated requests for the same block will not result in repeated
691         downloads (unless the block is evicted from the cache.)  This method
692         does not block.
693
694         """
695
696         if not self.prefetch_enabled:
697             return
698
699         if self._keep.get_from_cache(locator) is not None:
700             return
701
702         with self.lock:
703             if locator in self._bufferblocks:
704                 return
705
706         self.start_get_threads()
707         self._prefetch_queue.put(locator)
708
709
710 class ArvadosFile(object):
711     """Represent a file in a Collection.
712
713     ArvadosFile manages the underlying representation of a file in Keep as a
714     sequence of segments spanning a set of blocks, and implements random
715     read/write access.
716
717     This object may be accessed from multiple threads.
718
719     """
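    # The file's contents are described by self._segments, an ordered list of
    # Range(locator, range_start, range_size, segment_offset) entries: bytes
    # [range_start, range_start+range_size) of the file live at offset
    # segment_offset within the block identified by locator.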
720
721     def __init__(self, parent, name, stream=[], segments=[]):
722         """
723         ArvadosFile constructor.
724
725         :stream:
726           a list of Range objects representing a block stream
727
728         :segments:
729           a list of Range objects representing segments
730         """
731         self.parent = parent
732         self.name = name
733         self._closed = False
734         self._committed = False
735         self._segments = []
736         self.lock = parent.root_collection().lock
737         for s in segments:
738             self._add_segment(stream, s.locator, s.range_size)
739         self._current_bblock = None
740
741     def writable(self):
742         return self.parent.writable()
743
744     @synchronized
745     def segments(self):
746         return copy.copy(self._segments)
747
748     @synchronized
749     def clone(self, new_parent, new_name):
750         """Make a copy of this file."""
751         cp = ArvadosFile(new_parent, new_name)
752         cp.replace_contents(self)
753         return cp
754
755     @must_be_writable
756     @synchronized
757     def replace_contents(self, other):
758         """Replace segments of this file with segments from another `ArvadosFile` object."""
759
760         map_loc = {}
761         self._segments = []
762         for other_segment in other.segments():
763             new_loc = other_segment.locator
764             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
765                 if other_segment.locator not in map_loc:
766                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
767                     if bufferblock.state() != _BufferBlock.WRITABLE:
768                         map_loc[other_segment.locator] = bufferblock.locator()
769                     else:
770                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
771                 new_loc = map_loc[other_segment.locator]
772
773             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
774
775         self._committed = False
776
777     def __eq__(self, other):
778         if other is self:
779             return True
780         if not isinstance(other, ArvadosFile):
781             return False
782
783         othersegs = other.segments()
784         with self.lock:
785             if len(self._segments) != len(othersegs):
786                 return False
787             for i in xrange(0, len(othersegs)):
788                 seg1 = self._segments[i]
789                 seg2 = othersegs[i]
790                 loc1 = seg1.locator
791                 loc2 = seg2.locator
792
793                 if self.parent._my_block_manager().is_bufferblock(loc1):
794                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
795
796                 if other.parent._my_block_manager().is_bufferblock(loc2):
797                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
798
799                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
800                     seg1.range_start != seg2.range_start or
801                     seg1.range_size != seg2.range_size or
802                     seg1.segment_offset != seg2.segment_offset):
803                     return False
804
805         return True
806
807     def __ne__(self, other):
808         return not self.__eq__(other)
809
810     @synchronized
811     def set_committed(self):
812         """Set the committed flag to True."""
813         self._committed = True
814
815     @synchronized
816     def committed(self):
817         """Get whether this is committed or not."""
818         return self._committed
819
820     @synchronized
821     def set_closed(self):
822         """Set the closed flag to True and ask the block manager to repack small blocks."""
823         self._closed = True
824         self.parent._my_block_manager().repack_small_blocks()
825
826     @synchronized
827     def closed(self):
828         """Get whether this is closed or not."""
829         return self._closed
830
831     @must_be_writable
832     @synchronized
833     def truncate(self, size):
834         """Shrink the size of the file.
835
836         If `size` is less than the size of the file, the file contents after
837         `size` will be discarded.  If `size` is greater than the current size
838         of the file, an IOError will be raised.
839
840         """
841         if size < self.size():
842             new_segs = []
843             for r in self._segments:
844                 range_end = r.range_start+r.range_size
845                 if r.range_start >= size:
846                     # segment is past the truncate size, all done
847                     break
848                 elif size < range_end:
849                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
850                     nr.segment_offset = r.segment_offset
851                     new_segs.append(nr)
852                     break
853                 else:
854                     new_segs.append(r)
855
856             self._segments = new_segs
857             self._committed = False
858         elif size > self.size():
859             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
860
861     def readfrom(self, offset, size, num_retries, exact=False):
862         """Read up to `size` bytes from the file starting at `offset`.
863
864         :exact:
865          If False (default), return less data than requested if the read
866          crosses a block boundary and the next block isn't cached.  If True,
867          only return less data than requested when hitting EOF.
868         """
869
870         with self.lock:
871             if size == 0 or offset >= self.size():
872                 return ''
873             readsegs = locators_and_ranges(self._segments, offset, size)
874             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
875
876         locs = set()
877         data = []
878         for lr in readsegs:
879             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
880             if block:
881                 blockview = memoryview(block)
882                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
883                 locs.add(lr.locator)
884             else:
885                 break
886
887         for lr in prefetch:
888             if lr.locator not in locs:
889                 self.parent._my_block_manager().block_prefetch(lr.locator)
890                 locs.add(lr.locator)
891
892         return ''.join(data)
893
894     def _repack_writes(self, num_retries):
895         """Test if the buffer block has more data than actual segments.
896
897         This happens when a buffered write over-writes a file range written in
898         a previous buffered write.  Re-pack the buffer block for efficiency
899         and to avoid leaking information.
900
901         """
902         segs = self._segments
903
904         # Sum up the segments to get the total bytes of the file referencing
905         # into the buffer block.
906         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
907         write_total = sum([s.range_size for s in bufferblock_segs])
908
909         if write_total < self._current_bblock.size():
910             # There is more data in the buffer block than is actually accounted for by segments, so
911             # re-pack into a new buffer by copying over to a new buffer block.
912             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
913             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
914             for t in bufferblock_segs:
915                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
916                 t.segment_offset = new_bb.size() - t.range_size
917
918             self._current_bblock = new_bb
919
920     @must_be_writable
921     @synchronized
922     def writeto(self, offset, data, num_retries):
923         """Write `data` to the file starting at `offset`.
924
925         This will update existing bytes and/or extend the size of the file as
926         necessary.
927
928         """
929         if len(data) == 0:
930             return
931
932         if offset > self.size():
933             raise ArgumentError("Offset is past the end of the file")
934
935         if len(data) > config.KEEP_BLOCK_SIZE:
936             # Chunk it up into smaller writes
937             n = 0
938             dataview = memoryview(data)
939             while n < len(data):
940                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
941                 n += config.KEEP_BLOCK_SIZE
942             return
943
944         self._committed = False
945
946         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
947             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
948
949         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
950             self._repack_writes(num_retries)
951             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
952                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
953                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
954
955         self._current_bblock.append(data)
956
957         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
958
959         self.parent.notify(WRITE, self.parent, self.name, (self, self))
960
961         return len(data)
962
963     @synchronized
964     def flush(self, sync=True, num_retries=0):
965         """Flush the current bufferblock to Keep.
966
967         :sync:
968           If True, commit block synchronously, wait until buffer block has been written.
969           If False, commit block asynchronously, return immediately after putting block into
970           the keep put queue.
971         """
972         if self.committed():
973             return
974
975         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
976             if self._current_bblock.state() == _BufferBlock.WRITABLE:
977                 self._repack_writes(num_retries)
978             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
979
980         if sync:
981             to_delete = set()
982             for s in self._segments:
983                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
984                 if bb:
985                     if bb.state() != _BufferBlock.COMMITTED:
986                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
987                     to_delete.add(s.locator)
988                     s.locator = bb.locator()
989             for s in to_delete:
990                 self.parent._my_block_manager().delete_bufferblock(s)
991
992         self.parent.notify(MOD, self.parent, self.name, (self, self))
993
994     @must_be_writable
995     @synchronized
996     def add_segment(self, blocks, pos, size):
997         """Add a segment to the end of the file.
998
999         `pos` and `size` reference a section of the stream described by
1000         `blocks` (a list of Range objects)
1001
1002         """
1003         self._add_segment(blocks, pos, size)
1004
1005     def _add_segment(self, blocks, pos, size):
1006         """Internal implementation of add_segment."""
1007         self._committed = False
1008         for lr in locators_and_ranges(blocks, pos, size):
1009             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1010             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1011             self._segments.append(r)
1012
1013     @synchronized
1014     def size(self):
1015         """Get the file size."""
1016         if self._segments:
1017             n = self._segments[-1]
1018             return n.range_start + n.range_size
1019         else:
1020             return 0
1021
1022     @synchronized
1023     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
1024         buf = ""
1025         filestream = []
1026         for segment in self._segments:
1027             loc = segment.locator
1028             if self.parent._my_block_manager().is_bufferblock(loc):
1029                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1030             if portable_locators:
1031                 loc = KeepLocator(loc).stripped()
1032             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1033                                  segment.segment_offset, segment.range_size))
1034         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1035         buf += "\n"
1036         return buf
1037
1038     @must_be_writable
1039     @synchronized
1040     def _reparent(self, newparent, newname):
1041         self._committed = False
1042         self.flush(sync=True)
1043         self.parent.remove(self.name)
1044         self.parent = newparent
1045         self.name = newname
1046         self.lock = self.parent.root_collection().lock
1047
1048
1049 class ArvadosFileReader(ArvadosFileReaderBase):
1050     """Wraps ArvadosFile in a file-like object supporting reading only.
1051
1052     Be aware that this class is NOT thread safe as there is no locking around
1053     updating the file pointer.
1054
1055     """
1056
1057     def __init__(self, arvadosfile, num_retries=None):
1058         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1059         self.arvadosfile = arvadosfile
1060
1061     def size(self):
1062         return self.arvadosfile.size()
1063
1064     def stream_name(self):
1065         return self.arvadosfile.parent.stream_name()
1066
1067     @_FileLikeObjectBase._before_close
1068     @retry_method
1069     def read(self, size=None, num_retries=None):
1070         """Read up to `size` bytes from the file and return the result.
1071
1072         Starts at the current file position.  If `size` is None, read the
1073         entire remainder of the file.
1074         """
1075         if size is None:
1076             data = []
1077             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1078             while rd:
1079                 data.append(rd)
1080                 self._filepos += len(rd)
1081                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1082             return ''.join(data)
1083         else:
1084             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1085             self._filepos += len(data)
1086             return data
1087
1088     @_FileLikeObjectBase._before_close
1089     @retry_method
1090     def readfrom(self, offset, size, num_retries=None):
1091         """Read up to `size` bytes from the stream, starting at the specified file offset.
1092
1093         This method does not change the file position.
1094         """
1095         return self.arvadosfile.readfrom(offset, size, num_retries)
1096
1097     def flush(self):
1098         pass
1099
1100
1101 class ArvadosFileWriter(ArvadosFileReader):
1102     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1103
1104     Be aware that this class is NOT thread safe as there is no locking around
1105     updating the file pointer.
1106
1107     """
1108
1109     def __init__(self, arvadosfile, mode, num_retries=None):
1110         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1111         self.mode = mode
1112
1113     @_FileLikeObjectBase._before_close
1114     @retry_method
1115     def write(self, data, num_retries=None):
1116         if self.mode[0] == "a":
1117             self.arvadosfile.writeto(self.size(), data, num_retries)
1118         else:
1119             self.arvadosfile.writeto(self._filepos, data, num_retries)
1120             self._filepos += len(data)
1121         return len(data)
1122
1123     @_FileLikeObjectBase._before_close
1124     @retry_method
1125     def writelines(self, seq, num_retries=None):
1126         for s in seq:
1127             self.write(s, num_retries=num_retries)
1128
1129     @_FileLikeObjectBase._before_close
1130     def truncate(self, size=None):
1131         if size is None:
1132             size = self._filepos
1133         self.arvadosfile.truncate(size)
1134         if self._filepos > self.size():
1135             self._filepos = self.size()
1136
1137     @_FileLikeObjectBase._before_close
1138     def flush(self):
1139         self.arvadosfile.flush()
1140
1141     def close(self, flush=True):
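        # Closing the writer flushes any buffered data (unless flush=False) and
        # marks the underlying ArvadosFile as closed, which makes its buffer
        # block a candidate for _BlockManager.repack_small_blocks().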
1142         if not self.closed:
1143             if flush:
1144                 self.flush()
1145             self.arvadosfile.set_closed()
1146             super(ArvadosFileWriter, self).close()