1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13 import collections
14 import uuid
15
16 from .errors import KeepWriteError, AssertionError, ArgumentError
17 from .keep import KeepLocator
18 from ._normalize_stream import normalize_stream
19 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
20 from .retry import retry_method
21
22 MOD = "mod"
23 WRITE = "write"
24
25 _logger = logging.getLogger('arvados.arvfile')
26
27 def split(path):
28     """split(path) -> streamname, filename
29
30     Separate the stream name and file name in a /-separated stream path and
31     return a tuple (stream_name, file_name).  If no stream name is available,
32     assume '.'.
33
34     """
35     try:
36         stream_name, file_name = path.rsplit('/', 1)
37     except ValueError:  # No / in string
38         stream_name, file_name = '.', path
39     return stream_name, file_name
40
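# Illustrative examples of split() (comments only; not executed at import time):
#
#   >>> split("foo/bar/baz.txt")
#   ('foo/bar', 'baz.txt')
#   >>> split("baz.txt")
#   ('.', 'baz.txt')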
41
42 class UnownedBlockError(Exception):
43     """Raised when there is a writable block without an owner on the BlockManager."""
44     pass
45
46
47 class _FileLikeObjectBase(object):
48     def __init__(self, name, mode):
49         self.name = name
50         self.mode = mode
51         self.closed = False
52
53     @staticmethod
54     def _before_close(orig_func):
55         @functools.wraps(orig_func)
56         def before_close_wrapper(self, *args, **kwargs):
57             if self.closed:
58                 raise ValueError("I/O operation on closed stream file")
59             return orig_func(self, *args, **kwargs)
60         return before_close_wrapper
61
62     def __enter__(self):
63         return self
64
65     def __exit__(self, exc_type, exc_value, traceback):
66         try:
67             self.close()
68         except Exception:
69             if exc_type is None:
70                 raise
71
72     def close(self):
73         self.closed = True
74
75
76 class ArvadosFileReaderBase(_FileLikeObjectBase):
77     def __init__(self, name, mode, num_retries=None):
78         super(ArvadosFileReaderBase, self).__init__(name, mode)
79         self._filepos = 0L
80         self.num_retries = num_retries
81         self._readline_cache = (None, None)
82
83     def __iter__(self):
84         while True:
85             data = self.readline()
86             if not data:
87                 break
88             yield data
89
90     def decompressed_name(self):
91         return re.sub(r'\.(bz2|gz)$', '', self.name)
92
93     @_FileLikeObjectBase._before_close
94     def seek(self, pos, whence=os.SEEK_SET):
95         if whence == os.SEEK_CUR:
96             pos += self._filepos
97         elif whence == os.SEEK_END:
98             pos += self.size()
99         if pos < 0L:
100             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
101         self._filepos = pos
102
103     def tell(self):
104         return self._filepos
105
106     @_FileLikeObjectBase._before_close
107     @retry_method
108     def readall(self, size=2**20, num_retries=None):
109         while True:
110             data = self.read(size, num_retries=num_retries)
111             if data == '':
112                 break
113             yield data
114
115     @_FileLikeObjectBase._before_close
116     @retry_method
117     def readline(self, size=float('inf'), num_retries=None):
118         cache_pos, cache_data = self._readline_cache
119         if self.tell() == cache_pos:
120             data = [cache_data]
121             self._filepos += len(cache_data)
122         else:
123             data = ['']
124         data_size = len(data[-1])
125         while (data_size < size) and ('\n' not in data[-1]):
126             next_read = self.read(2 ** 20, num_retries=num_retries)
127             if not next_read:
128                 break
129             data.append(next_read)
130             data_size += len(next_read)
131         data = ''.join(data)
132         try:
133             nextline_index = data.index('\n') + 1
134         except ValueError:
135             nextline_index = len(data)
136         nextline_index = min(nextline_index, size)
137         self._filepos -= len(data) - nextline_index
138         self._readline_cache = (self.tell(), data[nextline_index:])
139         return data[:nextline_index]
140
141     @_FileLikeObjectBase._before_close
142     @retry_method
143     def decompress(self, decompress, size, num_retries=None):
144         for segment in self.readall(size, num_retries=num_retries):
145             data = decompress(segment)
146             if data:
147                 yield data
148
149     @_FileLikeObjectBase._before_close
150     @retry_method
151     def readall_decompressed(self, size=2**20, num_retries=None):
152         self.seek(0)
153         if self.name.endswith('.bz2'):
154             dc = bz2.BZ2Decompressor()
155             return self.decompress(dc.decompress, size,
156                                    num_retries=num_retries)
157         elif self.name.endswith('.gz'):
158             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
159             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
160                                    size, num_retries=num_retries)
161         else:
162             return self.readall(size, num_retries=num_retries)
163
164     @_FileLikeObjectBase._before_close
165     @retry_method
166     def readlines(self, sizehint=float('inf'), num_retries=None):
167         data = []
168         data_size = 0
169         for s in self.readall(num_retries=num_retries):
170             data.append(s)
171             data_size += len(s)
172             if data_size >= sizehint:
173                 break
174         return ''.join(data).splitlines(True)
175
176     def size(self):
177         raise NotImplementedError()
178
179     def read(self, size, num_retries=None):
180         raise NotImplementedError()
181
182     def readfrom(self, start, size, num_retries=None):
183         raise NotImplementedError()
184
185
186 class StreamFileReader(ArvadosFileReaderBase):
187     class _NameAttribute(str):
188         # The Python file API provides a plain .name attribute.
189         # Older SDK provided a name() method.
190         # This class provides both, for maximum compatibility.
191         def __call__(self):
192             return self
193
194     def __init__(self, stream, segments, name):
195         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
196         self._stream = stream
197         self.segments = segments
198
199     def stream_name(self):
200         return self._stream.name()
201
202     def size(self):
203         n = self.segments[-1]
204         return n.range_start + n.range_size
205
206     @_FileLikeObjectBase._before_close
207     @retry_method
208     def read(self, size, num_retries=None):
209         """Read up to 'size' bytes from the stream, starting at the current file position"""
210         if size == 0:
211             return ''
212
213         data = ''
214         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
215         if available_chunks:
216             lr = available_chunks[0]
217             data = self._stream.readfrom(lr.locator+lr.segment_offset,
218                                           lr.segment_size,
219                                           num_retries=num_retries)
220
221         self._filepos += len(data)
222         return data
223
224     @_FileLikeObjectBase._before_close
225     @retry_method
226     def readfrom(self, start, size, num_retries=None):
227         """Read up to 'size' bytes from the stream, starting at 'start'"""
228         if size == 0:
229             return ''
230
231         data = []
232         for lr in locators_and_ranges(self.segments, start, size):
233             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
234                                               num_retries=num_retries))
235         return ''.join(data)
236
237     def as_manifest(self):
238         segs = []
239         for r in self.segments:
240             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
241         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
242
243
244 def synchronized(orig_func):
245     @functools.wraps(orig_func)
246     def synchronized_wrapper(self, *args, **kwargs):
247         with self.lock:
248             return orig_func(self, *args, **kwargs)
249     return synchronized_wrapper
250
251
252 class StateChangeError(Exception):
253     def __init__(self, message, state, nextstate):
254         super(StateChangeError, self).__init__(message)
255         self.state = state
256         self.nextstate = nextstate
257
258 class _BufferBlock(object):
259     """A stand-in for a Keep block that is in the process of being written.
260
261     Writers can append to it, get the size, and compute the Keep locator.
262     There are three valid states:
263
264     WRITABLE
265       Can append to block.
266
267     PENDING
268       Block is in the process of being uploaded to Keep, append is an error.
269
270     COMMITTED
271       The block has been written to Keep and its internal buffer has been
272       released; fetching the block will fetch it via the Keep client (since we
273       discarded the internal copy), and identifiers referring to the BufferBlock
274       can be replaced with the block locator.
275
276     """
277
278     WRITABLE = 0
279     PENDING = 1
280     COMMITTED = 2
281     ERROR = 3
282
283     def __init__(self, blockid, starting_capacity, owner):
284         """
285         :blockid:
286           the identifier for this block
287
288         :starting_capacity:
289           the initial buffer capacity
290
291         :owner:
292           ArvadosFile that owns this block
293
294         """
295         self.blockid = blockid
296         self.buffer_block = bytearray(starting_capacity)
297         self.buffer_view = memoryview(self.buffer_block)
298         self.write_pointer = 0
299         self._state = _BufferBlock.WRITABLE
300         self._locator = None
301         self.owner = owner
302         self.lock = threading.Lock()
303         self.wait_for_commit = threading.Event()
304         self.error = None
305
306     @synchronized
307     def append(self, data):
308         """Append some data to the buffer.
309
310         Only valid if the block is in WRITABLE state.  Implements an expanding
311         buffer, doubling capacity as needed to accommodate all the data.
312
313         """
314         if self._state == _BufferBlock.WRITABLE:
315             while (self.write_pointer+len(data)) > len(self.buffer_block):
316                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
317                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
318                 self.buffer_block = new_buffer_block
319                 self.buffer_view = memoryview(self.buffer_block)
320             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
321             self.write_pointer += len(data)
322             self._locator = None
323         else:
324             raise AssertionError("Buffer block is not writable")
325
326     STATE_TRANSITIONS = frozenset([
327             (WRITABLE, PENDING),
328             (PENDING, COMMITTED),
329             (PENDING, ERROR),
330             (ERROR, PENDING)])
331
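    # The normal lifecycle allowed by STATE_TRANSITIONS above is
    # WRITABLE -> PENDING -> COMMITTED; a failed upload moves PENDING -> ERROR,
    # and a retried upload moves ERROR -> PENDING again.  Illustrative sketch
    # only (in practice _BlockManager drives these transitions, and the
    # locator string below is a placeholder, not a real block):
    #
    #   bb = _BufferBlock("bufferblock0", 2**14, owner=None)
    #   bb.append("some data")                 # legal only while WRITABLE
    #   bb.set_state(_BufferBlock.PENDING)     # no more appends allowed
    #   bb.set_state(_BufferBlock.COMMITTED,
    #                "<md5>+<size>")           # buffer is released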
332     @synchronized
333     def set_state(self, nextstate, val=None):
334         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
335             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
336         self._state = nextstate
337
338         if self._state == _BufferBlock.PENDING:
339             self.wait_for_commit.clear()
340
341         if self._state == _BufferBlock.COMMITTED:
342             self._locator = val
343             self.buffer_view = None
344             self.buffer_block = None
345             self.wait_for_commit.set()
346
347         if self._state == _BufferBlock.ERROR:
348             self.error = val
349             self.wait_for_commit.set()
350
351     @synchronized
352     def state(self):
353         return self._state
354
355     def size(self):
356         """The amount of data written to the buffer."""
357         return self.write_pointer
358
359     @synchronized
360     def locator(self):
361         """The Keep locator for this buffer's contents."""
362         if self._locator is None:
363             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
364         return self._locator
365
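    # For example, a buffer holding the 3 bytes "foo" gets the locator
    # "acbd18db4cc2f85cedef654fccc4a4d8+3", i.e. hashlib.md5("foo").hexdigest()
    # followed by "+" and the data size, as computed in locator() above.
    # Illustrative only; real locators are derived from the buffered data.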
366     @synchronized
367     def clone(self, new_blockid, owner):
368         if self._state == _BufferBlock.COMMITTED:
369             raise AssertionError("Cannot duplicate committed buffer block")
370         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
371         bufferblock.append(self.buffer_view[0:self.size()])
372         return bufferblock
373
374     @synchronized
375     def clear(self):
376         self.owner = None
377         self.buffer_block = None
378         self.buffer_view = None
379
380
381 class NoopLock(object):
382     def __enter__(self):
383         return self
384
385     def __exit__(self, exc_type, exc_value, traceback):
386         pass
387
388     def acquire(self, blocking=False):
389         pass
390
391     def release(self):
392         pass
393
394
395 def must_be_writable(orig_func):
396     @functools.wraps(orig_func)
397     def must_be_writable_wrapper(self, *args, **kwargs):
398         if not self.writable():
399             raise IOError(errno.EROFS, "Collection is read-only.")
400         return orig_func(self, *args, **kwargs)
401     return must_be_writable_wrapper
402
403
404 class _BlockManager(object):
405     """BlockManager handles buffer blocks.
406
407     Also handles background block uploads and background block prefetch for a
408     Collection of ArvadosFiles.
409
410     """
411
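    # Typical flow (illustrative sketch; in practice Collection/ArvadosFile
    # drive this, and `keep_client` / `arv_file` stand in for a KeepClient
    # instance and an ArvadosFile created elsewhere):
    #
    #   bm = _BlockManager(keep_client)
    #   bb = bm.alloc_bufferblock(owner=arv_file)
    #   bb.append("data")
    #   bm.commit_bufferblock(bb, sync=False)   # queue a background upload
    #   bm.commit_all()                         # wait until everything is in Keep
    #   bm.stop_threads()                       # or use it as a context manager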
412     DEFAULT_PUT_THREADS = 2
413     DEFAULT_GET_THREADS = 2
414
415     def __init__(self, keep, copies=None, put_threads=None):
416         """keep: KeepClient object to use"""
417         self._keep = keep
418         self._bufferblocks = collections.OrderedDict()
419         self._put_queue = None
420         self._put_threads = None
421         self._prefetch_queue = None
422         self._prefetch_threads = None
423         self.lock = threading.Lock()
424         self.prefetch_enabled = True
425         if put_threads:
426             self.num_put_threads = put_threads
427         else:
428             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
429         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
430         self.copies = copies
431         self._pending_write_size = 0
432         self.threads_lock = threading.Lock()
433         self.padding_block = None
434
435     @synchronized
436     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
437         """Allocate a new, empty bufferblock in WRITABLE state and return it.
438
439         :blockid:
440           optional block identifier, otherwise one will be automatically assigned
441
442         :starting_capacity:
443           optional capacity, otherwise will use default capacity
444
445         :owner:
446           ArvadosFile that owns this block
447
448         """
449         return self._alloc_bufferblock(blockid, starting_capacity, owner)
450
451     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
452         if blockid is None:
453             blockid = "%s" % uuid.uuid4()
454         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
455         self._bufferblocks[bufferblock.blockid] = bufferblock
456         return bufferblock
457
458     @synchronized
459     def dup_block(self, block, owner):
460         """Create a new bufferblock initialized with the content of an existing bufferblock.
461
462         :block:
463           the buffer block to copy.
464
465         :owner:
466           ArvadosFile that owns the new block
467
468         """
469         new_blockid = "bufferblock%i" % len(self._bufferblocks)
470         bufferblock = block.clone(new_blockid, owner)
471         self._bufferblocks[bufferblock.blockid] = bufferblock
472         return bufferblock
473
474     @synchronized
475     def is_bufferblock(self, locator):
476         return locator in self._bufferblocks
477
478     def _commit_bufferblock_worker(self):
479         """Background uploader thread."""
480
481         while True:
482             try:
483                 bufferblock = self._put_queue.get()
484                 if bufferblock is None:
485                     return
486
487                 if self.copies is None:
488                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
489                 else:
490                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
491                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
492
493             except Exception as e:
494                 bufferblock.set_state(_BufferBlock.ERROR, e)
495             finally:
496                 if self._put_queue is not None:
497                     self._put_queue.task_done()
498
499     def start_put_threads(self):
500         with self.threads_lock:
501             if self._put_threads is None:
502                 # Start uploader threads.
503
504                 # If we don't limit the Queue size, the upload queue can quickly
505                 # grow to take up gigabytes of RAM if the writing process is
506                 # generating data more quickly than it can be sent to the Keep
507                 # servers.
508                 #
509                 # With two upload threads and a queue size of 2, this means up to 4
510                 # blocks pending.  If they are full 64 MiB blocks, that means up to
511                 # 256 MiB of internal buffering, which is the same size as the
512                 # default download block cache in KeepClient.
513                 self._put_queue = Queue.Queue(maxsize=2)
514
515                 self._put_threads = []
516                 for i in xrange(0, self.num_put_threads):
517                     thread = threading.Thread(target=self._commit_bufferblock_worker)
518                     self._put_threads.append(thread)
519                     thread.daemon = True
520                     thread.start()
521
522     def _block_prefetch_worker(self):
523         """The background downloader thread."""
524         while True:
525             try:
526                 b = self._prefetch_queue.get()
527                 if b is None:
528                     return
529                 self._keep.get(b)
530             except Exception:
531                 _logger.exception("Exception doing block prefetch")
532
533     @synchronized
534     def start_get_threads(self):
535         if self._prefetch_threads is None:
536             self._prefetch_queue = Queue.Queue()
537             self._prefetch_threads = []
538             for i in xrange(0, self.num_get_threads):
539                 thread = threading.Thread(target=self._block_prefetch_worker)
540                 self._prefetch_threads.append(thread)
541                 thread.daemon = True
542                 thread.start()
543
544
545     @synchronized
546     def stop_threads(self):
547         """Shut down and wait for background upload and download threads to finish."""
548
549         if self._put_threads is not None:
550             for t in self._put_threads:
551                 self._put_queue.put(None)
552             for t in self._put_threads:
553                 t.join()
554         self._put_threads = None
555         self._put_queue = None
556
557         if self._prefetch_threads is not None:
558             for t in self._prefetch_threads:
559                 self._prefetch_queue.put(None)
560             for t in self._prefetch_threads:
561                 t.join()
562         self._prefetch_threads = None
563         self._prefetch_queue = None
564
565     def __enter__(self):
566         return self
567
568     def __exit__(self, exc_type, exc_value, traceback):
569         self.stop_threads()
570
571     @synchronized
572     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
573         """Packs small blocks together before uploading"""
574         self._pending_write_size += closed_file_size
575
576         # Check if there are enough small blocks to fill up one full block
577         if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
578
579             # Search for blocks that are ready to be packed together before being committed to Keep.
580             # A WRITABLE block always has an owner.
581             # A WRITABLE block with its owner.closed() implies that its
582             # size is <= KEEP_BLOCK_SIZE/2.
583             try:
584                 small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
585             except AttributeError:
586                 # Writable blocks without owner shouldn't exist.
587                 raise UnownedBlockError()
588
589             if len(small_blocks) <= 1:
590                 # Not enough small blocks for repacking
591                 return
592
593             # Update the pending write size count with its true value, just in case
594             # some small file was opened, written and closed several times.
595             self._pending_write_size = sum([b.size() for b in small_blocks])
596             if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
597                 return
598
599             new_bb = self._alloc_bufferblock()
600             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
601                 bb = small_blocks.pop(0)
602                 arvfile = bb.owner
603                 self._pending_write_size -= bb.size()
604                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
605                 arvfile.set_segments([Range(new_bb.blockid,
606                                             0,
607                                             bb.size(),
608                                             new_bb.write_pointer - bb.size())])
609                 self._delete_bufferblock(bb.blockid)
610             self.commit_bufferblock(new_bb, sync=sync)
611
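    # Worked example (illustrative): three closed small files a, b and c each
    # hold 10 MiB in separate WRITABLE bufferblocks.  repack_small_blocks()
    # copies their data into one new bufferblock and rewrites each file's
    # segments, roughly:
    #
    #   a.segments() -> [Range(new_bb.blockid, 0, 10 MiB, segment_offset=0)]
    #   b.segments() -> [Range(new_bb.blockid, 0, 10 MiB, segment_offset=10 MiB)]
    #   c.segments() -> [Range(new_bb.blockid, 0, 10 MiB, segment_offset=20 MiB)]
    #
    # before committing the packed block to Keep with a single put.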
612     def commit_bufferblock(self, block, sync):
613         """Initiate a background upload of a bufferblock.
614
615         :block:
616           The block object to upload
617
618         :sync:
619           If `sync` is True, upload the block synchronously.
620           If `sync` is False, upload the block asynchronously.  This will
621           return immediately unless the upload queue is at capacity, in
622           which case it will wait on an upload queue slot.
623
624         """
625         try:
626             # Mark the block as PENDING to disallow any more appends.
627             block.set_state(_BufferBlock.PENDING)
628         except StateChangeError as e:
629             if e.state == _BufferBlock.PENDING:
630                 if sync:
631                     block.wait_for_commit.wait()
632                 else:
633                     return
634             if block.state() == _BufferBlock.COMMITTED:
635                 return
636             elif block.state() == _BufferBlock.ERROR:
637                 raise block.error
638             else:
639                 raise
640
641         if sync:
642             try:
643                 if self.copies is None:
644                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
645                 else:
646                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
647                 block.set_state(_BufferBlock.COMMITTED, loc)
648             except Exception as e:
649                 block.set_state(_BufferBlock.ERROR, e)
650                 raise
651         else:
652             self.start_put_threads()
653             self._put_queue.put(block)
654
655     @synchronized
656     def get_bufferblock(self, locator):
657         return self._bufferblocks.get(locator)
658
659     @synchronized
660     def get_padding_block(self):
661         """Get a bufferblock 64 MiB in size consisting of all zeros, used as padding
662         when using truncate() to extend the size of a file."""
663
664         if self.padding_block is None:
665             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
666             self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
667             self.commit_bufferblock(self.padding_block, False)
668         return self.padding_block
669
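    # Worked example (illustrative): extending a file by 100 MiB with
    # truncate() appends one full 64 MiB (KEEP_BLOCK_SIZE) segment and one
    # 36 MiB segment, both referencing this single shared zero-filled block,
    # so no extra buffer space is allocated per extended file.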
670     @synchronized
671     def delete_bufferblock(self, locator):
672         self._delete_bufferblock(locator)
673
674     def _delete_bufferblock(self, locator):
675         bb = self._bufferblocks[locator]
676         bb.clear()
677         del self._bufferblocks[locator]
678
679     def get_block_contents(self, locator, num_retries, cache_only=False):
680         """Fetch a block.
681
682         First checks to see if the locator is a BufferBlock and, if so, returns
683         its contents; otherwise, passes the request through to KeepClient.get().
684
685         """
686         with self.lock:
687             if locator in self._bufferblocks:
688                 bufferblock = self._bufferblocks[locator]
689                 if bufferblock.state() != _BufferBlock.COMMITTED:
690                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
691                 else:
692                     locator = bufferblock._locator
693         if cache_only:
694             return self._keep.get_from_cache(locator)
695         else:
696             return self._keep.get(locator, num_retries=num_retries)
697
698     def commit_all(self):
699         """Commit all outstanding buffer blocks.
700
701         This is a synchronous call, and will not return until all buffer blocks
702         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
703
704         """
705         self.repack_small_blocks(force=True, sync=True)
706
707         with self.lock:
708             items = self._bufferblocks.items()
709
710         for k,v in items:
711             if v.state() != _BufferBlock.COMMITTED and v.owner:
712                 v.owner.flush(sync=False)
713
714         with self.lock:
715             if self._put_queue is not None:
716                 self._put_queue.join()
717
718                 err = []
719                 for k,v in items:
720                     if v.state() == _BufferBlock.ERROR:
721                         err.append((v.locator(), v.error))
722                 if err:
723                     raise KeepWriteError("Error writing some blocks", err, label="block")
724
725         for k,v in items:
726             # flush again with sync=True to remove committed bufferblocks from
727             # the segments.
728             if v.owner:
729                 v.owner.flush(sync=True)
730
731     def block_prefetch(self, locator):
732         """Initiate a background download of a block.
733
734         This assumes that the underlying KeepClient implements a block cache,
735         so repeated requests for the same block will not result in repeated
736         downloads (unless the block is evicted from the cache).  This method
737         does not block.
738
739         """
740
741         if not self.prefetch_enabled:
742             return
743
744         if self._keep.get_from_cache(locator) is not None:
745             return
746
747         with self.lock:
748             if locator in self._bufferblocks:
749                 return
750
751         self.start_get_threads()
752         self._prefetch_queue.put(locator)
753
754
755 class ArvadosFile(object):
756     """Represent a file in a Collection.
757
758     ArvadosFile manages the underlying representation of a file in Keep as a
759     sequence of segments spanning a set of blocks, and implements random
760     read/write access.
761
762     This object may be accessed from multiple threads.
763
764     """
765
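    # Illustrative sketch of the segment model: the file's bytes are described
    # by Range objects laid end to end over (possibly shared) blocks.  For a
    # 5-byte file whose first 3 bytes live at offset 0 of block A and whose
    # last 2 bytes live at offset 10 of block B (locator_a and locator_b are
    # placeholder locator strings):
    #
    #   segments = [Range(locator_a, 0, 3, 0),    # file range 0-2, block offset 0
    #               Range(locator_b, 3, 2, 10)]   # file range 3-4, block offset 10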
766     def __init__(self, parent, name, stream=[], segments=[]):
767         """
768         ArvadosFile constructor.
769
770         :stream:
771           a list of Range objects representing a block stream
772
773         :segments:
774           a list of Range objects representing segments
775         """
776         self.parent = parent
777         self.name = name
778         self._writers = set()
779         self._committed = False
780         self._segments = []
781         self.lock = parent.root_collection().lock
782         for s in segments:
783             self._add_segment(stream, s.locator, s.range_size)
784         self._current_bblock = None
785
786     def writable(self):
787         return self.parent.writable()
788
789     @synchronized
790     def permission_expired(self, as_of_dt=None):
791         """Returns True if any of the segments' locators has expired"""
792         for r in self._segments:
793             if KeepLocator(r.locator).permission_expired(as_of_dt):
794                 return True
795         return False
796
797     @synchronized
798     def segments(self):
799         return copy.copy(self._segments)
800
801     @synchronized
802     def clone(self, new_parent, new_name):
803         """Make a copy of this file."""
804         cp = ArvadosFile(new_parent, new_name)
805         cp.replace_contents(self)
806         return cp
807
808     @must_be_writable
809     @synchronized
810     def replace_contents(self, other):
811         """Replace segments of this file with segments from another `ArvadosFile` object."""
812
813         map_loc = {}
814         self._segments = []
815         for other_segment in other.segments():
816             new_loc = other_segment.locator
817             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
818                 if other_segment.locator not in map_loc:
819                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
820                     if bufferblock.state() != _BufferBlock.WRITABLE:
821                         map_loc[other_segment.locator] = bufferblock.locator()
822                     else:
823                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
824                 new_loc = map_loc[other_segment.locator]
825
826             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
827
828         self.set_committed(False)
829
830     def __eq__(self, other):
831         if other is self:
832             return True
833         if not isinstance(other, ArvadosFile):
834             return False
835
836         othersegs = other.segments()
837         with self.lock:
838             if len(self._segments) != len(othersegs):
839                 return False
840             for i in xrange(0, len(othersegs)):
841                 seg1 = self._segments[i]
842                 seg2 = othersegs[i]
843                 loc1 = seg1.locator
844                 loc2 = seg2.locator
845
846                 if self.parent._my_block_manager().is_bufferblock(loc1):
847                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
848
849                 if other.parent._my_block_manager().is_bufferblock(loc2):
850                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
851
852                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
853                     seg1.range_start != seg2.range_start or
854                     seg1.range_size != seg2.range_size or
855                     seg1.segment_offset != seg2.segment_offset):
856                     return False
857
858         return True
859
860     def __ne__(self, other):
861         return not self.__eq__(other)
862
863     @synchronized
864     def set_segments(self, segs):
865         self._segments = segs
866
867     @synchronized
868     def set_committed(self, value=True):
869         """Set committed flag.
870
871         If value is True, set committed to be True.
872
873         If value is False, set committed to be False for this and all parents.
874         """
875         if value == self._committed:
876             return
877         self._committed = value
878         if self._committed is False and self.parent is not None:
879             self.parent.set_committed(False)
880
881     @synchronized
882     def committed(self):
883         """Get whether this is committed or not."""
884         return self._committed
885
886     @synchronized
887     def add_writer(self, writer):
888         """Add an ArvadosFileWriter reference to the list of writers"""
889         if isinstance(writer, ArvadosFileWriter):
890             self._writers.add(writer)
891
892     @synchronized
893     def remove_writer(self, writer, flush):
894         """
895         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
896         and do some block maintenance tasks.
897         """
898         self._writers.remove(writer)
899
900         if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
901             # File writer closed, not small enough for repacking
902             self.flush()
903         elif self.closed():
904             # All writers closed and size is adequate for repacking
905             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
906
907     def closed(self):
908         """
909         Get whether this file is closed or not. When the writers list is empty, the
910         file is considered closed.
911         """
912         return len(self._writers) == 0
913
914     @must_be_writable
915     @synchronized
916     def truncate(self, size):
917         """Shrink or expand the size of the file.
918
919         If `size` is less than the size of the file, the file contents after
920         `size` will be discarded.  If `size` is greater than the current size
921         of the file, it will be filled with zero bytes.
922
923         """
924         if size < self.size():
925             new_segs = []
926             for r in self._segments:
927                 range_end = r.range_start+r.range_size
928                 if r.range_start >= size:
929                     # segment is past the truncate size, all done
930                     break
931                 elif size < range_end:
932                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
933                     nr.segment_offset = r.segment_offset
934                     new_segs.append(nr)
935                     break
936                 else:
937                     new_segs.append(r)
938
939             self._segments = new_segs
940             self.set_committed(False)
941         elif size > self.size():
942             padding = self.parent._my_block_manager().get_padding_block()
943             diff = size - self.size()
944             while diff > config.KEEP_BLOCK_SIZE:
945                 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
946                 diff -= config.KEEP_BLOCK_SIZE
947             if diff > 0:
948                 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
949             self.set_committed(False)
950         else:
951             # size == self.size()
952             pass
953
954     def readfrom(self, offset, size, num_retries, exact=False):
955         """Read up to `size` bytes from the file starting at `offset`.
956
957         :exact:
958          If False (default), return less data than requested if the read
959          crosses a block boundary and the next block isn't cached.  If True,
960          only return less data than requested when hitting EOF.
961         """
962
963         with self.lock:
964             if size == 0 or offset >= self.size():
965                 return ''
966             readsegs = locators_and_ranges(self._segments, offset, size)
967             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
968
969         locs = set()
970         data = []
971         for lr in readsegs:
972             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
973             if block:
974                 blockview = memoryview(block)
975                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
976                 locs.add(lr.locator)
977             else:
978                 break
979
980         for lr in prefetch:
981             if lr.locator not in locs:
982                 self.parent._my_block_manager().block_prefetch(lr.locator)
983                 locs.add(lr.locator)
984
985         return ''.join(data)
986
987     def _repack_writes(self, num_retries):
988         """Optimize buffer block by repacking segments in file sequence.
989
990         When the client makes random writes, they appear in the buffer block in
991         the sequence they were written rather than the sequence they appear in
992         the file.  This makes for inefficient, fragmented manifests.  Attempt
993         to optimize by repacking writes in file sequence.
994
995         """
996         segs = self._segments
997
998         # Collect the segments that reference the buffer block.
999         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
1000
1001         if len(bufferblock_segs) > 1:
1002             # Collect total data referenced by segments (could be smaller than
1003             # bufferblock size if a portion of the file was written and
1004             # then overwritten).
1005             write_total = sum([s.range_size for s in bufferblock_segs])
1006
1007             # If there's more than one segment referencing this block, it is
1008             # due to out-of-order writes and will produce a fragmented
1009             # manifest, so try to optimize by re-packing into a new buffer.
1010             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
1011             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
1012             for t in bufferblock_segs:
1013                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
1014                 t.segment_offset = new_bb.size() - t.range_size
1015
1016             self._current_bblock = new_bb
1017
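    # Example (illustrative): writing bytes 100-199 of a file and then bytes
    # 0-99 leaves the buffer block holding the data in write order, so the
    # file is described by two segments whose segment_offsets are reversed
    # relative to file order.  _repack_writes() copies the referenced data
    # into a fresh buffer in file order and updates each segment_offset, so
    # adjacent file ranges become adjacent in the block and the resulting
    # manifest stays compact.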
1018     @must_be_writable
1019     @synchronized
1020     def writeto(self, offset, data, num_retries):
1021         """Write `data` to the file starting at `offset`.
1022
1023         This will update existing bytes and/or extend the size of the file as
1024         necessary.
1025
1026         """
1027         if len(data) == 0:
1028             return
1029
1030         if offset > self.size():
1031             self.truncate(offset)
1032
1033         if len(data) > config.KEEP_BLOCK_SIZE:
1034             # Chunk it up into smaller writes
1035             n = 0
1036             dataview = memoryview(data)
1037             while n < len(data):
1038                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1039                 n += config.KEEP_BLOCK_SIZE
1040             return
1041
1042         self.set_committed(False)
1043
1044         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1045             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1046
1047         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1048             self._repack_writes(num_retries)
1049             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1050                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1051                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1052
1053         self._current_bblock.append(data)
1054
1055         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1056
1057         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1058
1059         return len(data)
1060
1061     @synchronized
1062     def flush(self, sync=True, num_retries=0):
1063         """Flush the current bufferblock to Keep.
1064
1065         :sync:
1066           If True, commit block synchronously, wait until buffer block has been written.
1067           If False, commit block asynchronously, return immediately after putting block into
1068           the keep put queue.
1069         """
1070         if self.committed():
1071             return
1072
1073         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1074             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1075                 self._repack_writes(num_retries)
1076             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1077
1078         if sync:
1079             to_delete = set()
1080             for s in self._segments:
1081                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1082                 if bb:
1083                     if bb.state() != _BufferBlock.COMMITTED:
1084                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1085                     to_delete.add(s.locator)
1086                     s.locator = bb.locator()
1087             for s in to_delete:
1088                 self.parent._my_block_manager().delete_bufferblock(s)
1089
1090         self.parent.notify(MOD, self.parent, self.name, (self, self))
1091
1092     @must_be_writable
1093     @synchronized
1094     def add_segment(self, blocks, pos, size):
1095         """Add a segment to the end of the file.
1096
1097         `pos` and `size` reference a section of the stream described by
1098         `blocks` (a list of Range objects).
1099
1100         """
1101         self._add_segment(blocks, pos, size)
1102
1103     def _add_segment(self, blocks, pos, size):
1104         """Internal implementation of add_segment."""
1105         self.set_committed(False)
1106         for lr in locators_and_ranges(blocks, pos, size):
1107             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1108             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1109             self._segments.append(r)
1110
1111     @synchronized
1112     def size(self):
1113         """Get the file size."""
1114         if self._segments:
1115             n = self._segments[-1]
1116             return n.range_start + n.range_size
1117         else:
1118             return 0
1119
1120     @synchronized
1121     def manifest_text(self, stream_name=".", portable_locators=False,
1122                       normalize=False, only_committed=False):
1123         buf = ""
1124         filestream = []
1125         for segment in self._segments:
1126             loc = segment.locator
1127             if self.parent._my_block_manager().is_bufferblock(loc):
1128                 if only_committed:
1129                     continue
1130                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1131             if portable_locators:
1132                 loc = KeepLocator(loc).stripped()
1133             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1134                                  segment.segment_offset, segment.range_size))
1135         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
1136         buf += "\n"
1137         return buf
1138
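    # For example (illustrative), a 3-byte file named "foo.txt" stored in a
    # single committed block renders as the stream line:
    #
    #   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt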
1139     @must_be_writable
1140     @synchronized
1141     def _reparent(self, newparent, newname):
1142         self.set_committed(False)
1143         self.flush(sync=True)
1144         self.parent.remove(self.name)
1145         self.parent = newparent
1146         self.name = newname
1147         self.lock = self.parent.root_collection().lock
1148
1149
1150 class ArvadosFileReader(ArvadosFileReaderBase):
1151     """Wraps ArvadosFile in a file-like object supporting reading only.
1152
1153     Be aware that this class is NOT thread safe as there is no locking around
1154     updating the file pointer.
1155
1156     """
1157
1158     def __init__(self, arvadosfile, num_retries=None):
1159         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1160         self.arvadosfile = arvadosfile
1161
1162     def size(self):
1163         return self.arvadosfile.size()
1164
1165     def stream_name(self):
1166         return self.arvadosfile.parent.stream_name()
1167
1168     @_FileLikeObjectBase._before_close
1169     @retry_method
1170     def read(self, size=None, num_retries=None):
1171         """Read up to `size` bytes from the file and return the result.
1172
1173         Starts at the current file position.  If `size` is None, read the
1174         entire remainder of the file.
1175         """
1176         if size is None:
1177             data = []
1178             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1179             while rd:
1180                 data.append(rd)
1181                 self._filepos += len(rd)
1182                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1183             return ''.join(data)
1184         else:
1185             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1186             self._filepos += len(data)
1187             return data
1188
1189     @_FileLikeObjectBase._before_close
1190     @retry_method
1191     def readfrom(self, offset, size, num_retries=None):
1192         """Read up to `size` bytes from the stream, starting at the specified file offset.
1193
1194         This method does not change the file position.
1195         """
1196         return self.arvadosfile.readfrom(offset, size, num_retries)
1197
1198     def flush(self):
1199         pass
1200
1201
1202 class ArvadosFileWriter(ArvadosFileReader):
1203     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1204
1205     Be aware that this class is NOT thread safe as there is no locking around
1206     updating the file pointer.
1207
1208     """
1209
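    # Illustrative usage sketch (Collection lives in arvados.collection and is
    # the usual way to obtain a writer; shown here as an assumption about
    # typical use, not something this module provides):
    #
    #   import arvados.collection
    #   c = arvados.collection.Collection()
    #   with c.open("subdir/data.txt", "w") as f:   # f is an ArvadosFileWriter
    #       f.write("hello\n")
    #   manifest = c.manifest_text()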
1210     def __init__(self, arvadosfile, mode, num_retries=None):
1211         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1212         self.mode = mode
1213         self.arvadosfile.add_writer(self)
1214
1215     @_FileLikeObjectBase._before_close
1216     @retry_method
1217     def write(self, data, num_retries=None):
1218         if self.mode[0] == "a":
1219             self.arvadosfile.writeto(self.size(), data, num_retries)
1220         else:
1221             self.arvadosfile.writeto(self._filepos, data, num_retries)
1222             self._filepos += len(data)
1223         return len(data)
1224
1225     @_FileLikeObjectBase._before_close
1226     @retry_method
1227     def writelines(self, seq, num_retries=None):
1228         for s in seq:
1229             self.write(s, num_retries=num_retries)
1230
1231     @_FileLikeObjectBase._before_close
1232     def truncate(self, size=None):
1233         if size is None:
1234             size = self._filepos
1235         self.arvadosfile.truncate(size)
1236         if self._filepos > self.size():
1237             self._filepos = self.size()
1238
1239     @_FileLikeObjectBase._before_close
1240     def flush(self):
1241         self.arvadosfile.flush()
1242
1243     def close(self, flush=True):
1244         if not self.closed:
1245             self.arvadosfile.remove_writer(self, flush)
1246             super(ArvadosFileWriter, self).close()