[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13 import collections
14 import uuid
15
16 from .errors import KeepWriteError, AssertionError, ArgumentError
17 from .keep import KeepLocator
18 from ._normalize_stream import normalize_stream
19 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
20 from .retry import retry_method
21
22 MOD = "mod"
23 WRITE = "write"
24
25 _logger = logging.getLogger('arvados.arvfile')
26
27 def split(path):
28     """split(path) -> streamname, filename
29
30     Separate the stream name and file name in a /-separated stream path and
31     return a tuple (stream_name, file_name).  If no stream name is available,
32     assume '.'.
33
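    For example:

        >>> split('output/logs/stderr.txt')
        ('output/logs', 'stderr.txt')
        >>> split('stderr.txt')
        ('.', 'stderr.txt')
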
34     """
35     try:
36         stream_name, file_name = path.rsplit('/', 1)
37     except ValueError:  # No / in string
38         stream_name, file_name = '.', path
39     return stream_name, file_name
40
41
42 class UnownedBlockError(Exception):
43     """Raised when there's an writable block without an owner on the BlockManager."""
44     pass
45
46
47 class _FileLikeObjectBase(object):
48     def __init__(self, name, mode):
49         self.name = name
50         self.mode = mode
51         self.closed = False
52
53     @staticmethod
54     def _before_close(orig_func):
55         @functools.wraps(orig_func)
56         def before_close_wrapper(self, *args, **kwargs):
57             if self.closed:
58                 raise ValueError("I/O operation on closed stream file")
59             return orig_func(self, *args, **kwargs)
60         return before_close_wrapper
61
62     def __enter__(self):
63         return self
64
65     def __exit__(self, exc_type, exc_value, traceback):
66         try:
67             self.close()
68         except Exception:
69             if exc_type is None:
70                 raise
71
72     def close(self):
73         self.closed = True
74
75
76 class ArvadosFileReaderBase(_FileLikeObjectBase):
77     def __init__(self, name, mode, num_retries=None):
78         super(ArvadosFileReaderBase, self).__init__(name, mode)
79         self._filepos = 0L
80         self.num_retries = num_retries
81         self._readline_cache = (None, None)
82
83     def __iter__(self):
84         while True:
85             data = self.readline()
86             if not data:
87                 break
88             yield data
89
90     def decompressed_name(self):
91         return re.sub(r'\.(bz2|gz)$', '', self.name)
92
93     @_FileLikeObjectBase._before_close
94     def seek(self, pos, whence=os.SEEK_SET):
95         if whence == os.SEEK_CUR:
96             pos += self._filepos
97         elif whence == os.SEEK_END:
98             pos += self.size()
99         if pos < 0L:
100             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
101         self._filepos = pos
102         return self._filepos
103
104     def tell(self):
105         return self._filepos
106
107     def readable(self):
108         return True
109
110     def writable(self):
111         return False
112
113     def seekable(self):
114         return True
115
116     @_FileLikeObjectBase._before_close
117     @retry_method
118     def readall(self, size=2**20, num_retries=None):
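        """Yield successive chunks of up to `size` bytes (default 1 MiB) until EOF.

        For example, ''.join(reader.readall()) returns a reader's entire
        contents, fetched in 1 MiB pieces.
        """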
119         while True:
120             data = self.read(size, num_retries=num_retries)
121             if data == '':
122                 break
123             yield data
124
125     @_FileLikeObjectBase._before_close
126     @retry_method
127     def readline(self, size=float('inf'), num_retries=None):
128         cache_pos, cache_data = self._readline_cache
129         if self.tell() == cache_pos:
130             data = [cache_data]
131             self._filepos += len(cache_data)
132         else:
133             data = ['']
134         data_size = len(data[-1])
135         while (data_size < size) and ('\n' not in data[-1]):
136             next_read = self.read(2 ** 20, num_retries=num_retries)
137             if not next_read:
138                 break
139             data.append(next_read)
140             data_size += len(next_read)
141         data = ''.join(data)
142         try:
143             nextline_index = data.index('\n') + 1
144         except ValueError:
145             nextline_index = len(data)
146         nextline_index = min(nextline_index, size)
147         self._filepos -= len(data) - nextline_index
148         self._readline_cache = (self.tell(), data[nextline_index:])
149         return data[:nextline_index]
150
151     @_FileLikeObjectBase._before_close
152     @retry_method
153     def decompress(self, decompress, size, num_retries=None):
154         for segment in self.readall(size, num_retries=num_retries):
155             data = decompress(segment)
156             if data:
157                 yield data
158
159     @_FileLikeObjectBase._before_close
160     @retry_method
161     def readall_decompressed(self, size=2**20, num_retries=None):
162         self.seek(0)
163         if self.name.endswith('.bz2'):
164             dc = bz2.BZ2Decompressor()
165             return self.decompress(dc.decompress, size,
166                                    num_retries=num_retries)
167         elif self.name.endswith('.gz'):
168             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
169             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
170                                    size, num_retries=num_retries)
171         else:
172             return self.readall(size, num_retries=num_retries)
173
174     @_FileLikeObjectBase._before_close
175     @retry_method
176     def readlines(self, sizehint=float('inf'), num_retries=None):
177         data = []
178         data_size = 0
179         for s in self.readall(num_retries=num_retries):
180             data.append(s)
181             data_size += len(s)
182             if data_size >= sizehint:
183                 break
184         return ''.join(data).splitlines(True)
185
186     def size(self):
187         raise IOError(errno.ENOSYS, "Not implemented")
188
189     def read(self, size, num_retries=None):
190         raise IOError(errno.ENOSYS, "Not implemented")
191
192     def readfrom(self, start, size, num_retries=None):
193         raise IOError(errno.ENOSYS, "Not implemented")
194
195
196 class StreamFileReader(ArvadosFileReaderBase):
197     class _NameAttribute(str):
198         # The Python file API provides a plain .name attribute.
199         # The older SDK provided a name() method.
200         # This class provides both, for maximum compatibility.
201         def __call__(self):
202             return self
203
204     def __init__(self, stream, segments, name):
205         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
206         self._stream = stream
207         self.segments = segments
208
209     def stream_name(self):
210         return self._stream.name()
211
212     def size(self):
213         n = self.segments[-1]
214         return n.range_start + n.range_size
215
216     @_FileLikeObjectBase._before_close
217     @retry_method
218     def read(self, size, num_retries=None):
219         """Read up to 'size' bytes from the stream, starting at the current file position"""
220         if size == 0:
221             return ''
222
223         data = ''
224         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
225         if available_chunks:
226             lr = available_chunks[0]
227             data = self._stream.readfrom(lr.locator+lr.segment_offset,
228                                           lr.segment_size,
229                                           num_retries=num_retries)
230
231         self._filepos += len(data)
232         return data
233
234     @_FileLikeObjectBase._before_close
235     @retry_method
236     def readfrom(self, start, size, num_retries=None):
237         """Read up to 'size' bytes from the stream, starting at 'start'"""
238         if size == 0:
239             return ''
240
241         data = []
242         for lr in locators_and_ranges(self.segments, start, size):
243             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
244                                               num_retries=num_retries))
245         return ''.join(data)
246
247     def as_manifest(self):
248         segs = []
249         for r in self.segments:
250             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
251         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
252
253
254 def synchronized(orig_func):
255     @functools.wraps(orig_func)
256     def synchronized_wrapper(self, *args, **kwargs):
257         with self.lock:
258             return orig_func(self, *args, **kwargs)
259     return synchronized_wrapper
260
261
262 class StateChangeError(Exception):
263     def __init__(self, message, state, nextstate):
264         super(StateChangeError, self).__init__(message)
265         self.state = state
266         self.nextstate = nextstate
267
268 class _BufferBlock(object):
269     """A stand-in for a Keep block that is in the process of being written.
270
271     Writers can append to it, get the size, and compute the Keep locator.
272     There are three valid states:
273
274     WRITABLE
275       Can append to block.
276
277     PENDING
278       Block is in the process of being uploaded to Keep; appending to it is an error.
279
280     COMMITTED
281       The block has been written to Keep and its internal buffer has been
282       released.  Fetching the block will fetch it via the Keep client (since we
283       discarded the internal copy), and identifiers referring to the BufferBlock
284       can be replaced with the block locator.
285
286     """
287
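    # Typical lifecycle (illustrative sketch; in practice _BlockManager drives
    # these transitions rather than user code calling set_state() directly):
    #
    #     bb = _BufferBlock("bufferblock0", 2**14, owner=None)
    #     bb.append("hello")           # WRITABLE: buffer expands as needed
    #     bb.set_state(_BufferBlock.PENDING)
    #     bb.set_state(_BufferBlock.COMMITTED,
    #                  "5d41402abc4b2a76b9719d911017c592+5")
    #     bb.locator()                 # -> the committed locator above
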
288     WRITABLE = 0
289     PENDING = 1
290     COMMITTED = 2
291     ERROR = 3
292
293     def __init__(self, blockid, starting_capacity, owner):
294         """
295         :blockid:
296           the identifier for this block
297
298         :starting_capacity:
299           the initial buffer capacity
300
301         :owner:
302           ArvadosFile that owns this block
303
304         """
305         self.blockid = blockid
306         self.buffer_block = bytearray(starting_capacity)
307         self.buffer_view = memoryview(self.buffer_block)
308         self.write_pointer = 0
309         self._state = _BufferBlock.WRITABLE
310         self._locator = None
311         self.owner = owner
312         self.lock = threading.Lock()
313         self.wait_for_commit = threading.Event()
314         self.error = None
315
316     @synchronized
317     def append(self, data):
318         """Append some data to the buffer.
319
320         Only valid if the block is in WRITABLE state.  Implements an expanding
321         buffer, doubling capacity as needed to accomdate all the data.
322
323         """
324         if self._state == _BufferBlock.WRITABLE:
325             while (self.write_pointer+len(data)) > len(self.buffer_block):
326                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
327                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
328                 self.buffer_block = new_buffer_block
329                 self.buffer_view = memoryview(self.buffer_block)
330             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
331             self.write_pointer += len(data)
332             self._locator = None
333         else:
334             raise AssertionError("Buffer block is not writable")
335
336     STATE_TRANSITIONS = frozenset([
337             (WRITABLE, PENDING),
338             (PENDING, COMMITTED),
339             (PENDING, ERROR),
340             (ERROR, PENDING)])
341
342     @synchronized
343     def set_state(self, nextstate, val=None):
344         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
345             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
346         self._state = nextstate
347
348         if self._state == _BufferBlock.PENDING:
349             self.wait_for_commit.clear()
350
351         if self._state == _BufferBlock.COMMITTED:
352             self._locator = val
353             self.buffer_view = None
354             self.buffer_block = None
355             self.wait_for_commit.set()
356
357         if self._state == _BufferBlock.ERROR:
358             self.error = val
359             self.wait_for_commit.set()
360
361     @synchronized
362     def state(self):
363         return self._state
364
365     def size(self):
366         """The amount of data written to the buffer."""
367         return self.write_pointer
368
369     @synchronized
370     def locator(self):
371         """The Keep locator for this buffer's contents."""
372         if self._locator is None:
373             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
374         return self._locator
375
376     @synchronized
377     def clone(self, new_blockid, owner):
378         if self._state == _BufferBlock.COMMITTED:
379             raise AssertionError("Cannot duplicate committed buffer block")
380         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
381         bufferblock.append(self.buffer_view[0:self.size()])
382         return bufferblock
383
384     @synchronized
385     def clear(self):
386         self.owner = None
387         self.buffer_block = None
388         self.buffer_view = None
389
390
391 class NoopLock(object):
392     def __enter__(self):
393         return self
394
395     def __exit__(self, exc_type, exc_value, traceback):
396         pass
397
398     def acquire(self, blocking=False):
399         pass
400
401     def release(self):
402         pass
403
404
405 def must_be_writable(orig_func):
406     @functools.wraps(orig_func)
407     def must_be_writable_wrapper(self, *args, **kwargs):
408         if not self.writable():
409             raise IOError(errno.EROFS, "Collection is read-only.")
410         return orig_func(self, *args, **kwargs)
411     return must_be_writable_wrapper
412
413
414 class _BlockManager(object):
415     """BlockManager handles buffer blocks.
416
417     Also handles background block uploads, and background block prefetch for a
418     Collection of ArvadosFiles.
419
420     """
421
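    # Minimal usage sketch (assumes `keep` is a KeepClient instance; in normal
    # use Collection/ArvadosFile drive the block manager rather than
    # application code):
    #
    #     with _BlockManager(keep) as bm:
    #         bb = bm.alloc_bufferblock()
    #         bb.append("some data")
    #         bm.commit_bufferblock(bb, sync=True)
    #         print bb.locator()
    #
    # Leaving the `with` block calls stop_threads() to shut down any background
    # upload/prefetch workers.
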
422     DEFAULT_PUT_THREADS = 2
423     DEFAULT_GET_THREADS = 2
424
425     def __init__(self, keep, copies=None, put_threads=None):
426         """keep: KeepClient object to use"""
427         self._keep = keep
428         self._bufferblocks = collections.OrderedDict()
429         self._put_queue = None
430         self._put_threads = None
431         self._prefetch_queue = None
432         self._prefetch_threads = None
433         self.lock = threading.Lock()
434         self.prefetch_enabled = True
435         if put_threads:
436             self.num_put_threads = put_threads
437         else:
438             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
439         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
440         self.copies = copies
441         self._pending_write_size = 0
442         self.threads_lock = threading.Lock()
443         self.padding_block = None
444
445     @synchronized
446     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
447         """Allocate a new, empty bufferblock in WRITABLE state and return it.
448
449         :blockid:
450           optional block identifier, otherwise one will be automatically assigned
451
452         :starting_capacity:
453           optional capacity, otherwise will use default capacity
454
455         :owner:
456           ArvadosFile that owns this block
457
458         """
459         return self._alloc_bufferblock(blockid, starting_capacity, owner)
460
461     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
462         if blockid is None:
463             blockid = "%s" % uuid.uuid4()
464         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
465         self._bufferblocks[bufferblock.blockid] = bufferblock
466         return bufferblock
467
468     @synchronized
469     def dup_block(self, block, owner):
470         """Create a new bufferblock initialized with the content of an existing bufferblock.
471
472         :block:
473           the buffer block to copy.
474
475         :owner:
476           ArvadosFile that owns the new block
477
478         """
479         new_blockid = "bufferblock%i" % len(self._bufferblocks)
480         bufferblock = block.clone(new_blockid, owner)
481         self._bufferblocks[bufferblock.blockid] = bufferblock
482         return bufferblock
483
484     @synchronized
485     def is_bufferblock(self, locator):
486         return locator in self._bufferblocks
487
488     def _commit_bufferblock_worker(self):
489         """Background uploader thread."""
490
491         while True:
492             try:
493                 bufferblock = self._put_queue.get()
494                 if bufferblock is None:
495                     return
496
497                 if self.copies is None:
498                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
499                 else:
500                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
501                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
502
503             except Exception as e:
504                 bufferblock.set_state(_BufferBlock.ERROR, e)
505             finally:
506                 if self._put_queue is not None:
507                     self._put_queue.task_done()
508
509     def start_put_threads(self):
510         with self.threads_lock:
511             if self._put_threads is None:
512                 # Start uploader threads.
513
514                 # If we don't limit the Queue size, the upload queue can quickly
515                 # grow to take up gigabytes of RAM if the writing process is
516                 # generating data more quickly than it can be sent to the Keep
517                 # servers.
518                 #
519                 # With two upload threads and a queue size of 2, this means up to 4
520                 # blocks pending.  If they are full 64 MiB blocks, that means up to
521                 # 256 MiB of internal buffering, which is the same size as the
522                 # default download block cache in KeepClient.
523                 self._put_queue = Queue.Queue(maxsize=2)
524
525                 self._put_threads = []
526                 for i in xrange(0, self.num_put_threads):
527                     thread = threading.Thread(target=self._commit_bufferblock_worker)
528                     self._put_threads.append(thread)
529                     thread.daemon = True
530                     thread.start()
531
532     def _block_prefetch_worker(self):
533         """The background downloader thread."""
534         while True:
535             try:
536                 b = self._prefetch_queue.get()
537                 if b is None:
538                     return
539                 self._keep.get(b)
540             except Exception:
541                 _logger.exception("Exception doing block prefetch")
542
543     @synchronized
544     def start_get_threads(self):
545         if self._prefetch_threads is None:
546             self._prefetch_queue = Queue.Queue()
547             self._prefetch_threads = []
548             for i in xrange(0, self.num_get_threads):
549                 thread = threading.Thread(target=self._block_prefetch_worker)
550                 self._prefetch_threads.append(thread)
551                 thread.daemon = True
552                 thread.start()
553
554
555     @synchronized
556     def stop_threads(self):
557         """Shut down and wait for background upload and download threads to finish."""
558
559         if self._put_threads is not None:
560             for t in self._put_threads:
561                 self._put_queue.put(None)
562             for t in self._put_threads:
563                 t.join()
564         self._put_threads = None
565         self._put_queue = None
566
567         if self._prefetch_threads is not None:
568             for t in self._prefetch_threads:
569                 self._prefetch_queue.put(None)
570             for t in self._prefetch_threads:
571                 t.join()
572         self._prefetch_threads = None
573         self._prefetch_queue = None
574
575     def __enter__(self):
576         return self
577
578     def __exit__(self, exc_type, exc_value, traceback):
579         self.stop_threads()
580
581     @synchronized
582     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
583         """Packs small blocks together before uploading"""
584         self._pending_write_size += closed_file_size
585
586         # Check if there are enough small blocks for filling up one in full
587         if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
588
589             # Search for blocks that are ready to be packed together before being committed to Keep.
590             # A WRITABLE block always has an owner.
591             # A WRITABLE block whose owner.closed() is True implies that
592             # its size is <= KEEP_BLOCK_SIZE/2.
593             try:
594                 small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
595             except AttributeError:
596                 # Writable blocks without an owner shouldn't exist.
597                 raise UnownedBlockError()
598
599             if len(small_blocks) <= 1:
600                 # Not enough small blocks for repacking
601                 return
602
603             # Update the pending write size count with its true value, just in case
604             # some small file was opened, written and closed several times.
605             self._pending_write_size = sum([b.size() for b in small_blocks])
606             if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
607                 return
608
609             new_bb = self._alloc_bufferblock()
610             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
611                 bb = small_blocks.pop(0)
612                 arvfile = bb.owner
613                 self._pending_write_size -= bb.size()
614                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
615                 arvfile.set_segments([Range(new_bb.blockid,
616                                             0,
617                                             bb.size(),
618                                             new_bb.write_pointer - bb.size())])
619                 self._delete_bufferblock(bb.blockid)
620             self.commit_bufferblock(new_bb, sync=sync)
621
622     def commit_bufferblock(self, block, sync):
623         """Initiate a background upload of a bufferblock.
624
625         :block:
626           The block object to upload
627
628         :sync:
629           If `sync` is True, upload the block synchronously.
630           If `sync` is False, upload the block asynchronously.  This will
631           return immediately unless the upload queue is at capacity, in
632           which case it will wait on an upload queue slot.
633
634         """
635         try:
636             # Mark the block as PENDING so as to disallow any more appends.
637             block.set_state(_BufferBlock.PENDING)
638         except StateChangeError as e:
639             if e.state == _BufferBlock.PENDING:
640                 if sync:
641                     block.wait_for_commit.wait()
642                 else:
643                     return
644             if block.state() == _BufferBlock.COMMITTED:
645                 return
646             elif block.state() == _BufferBlock.ERROR:
647                 raise block.error
648             else:
649                 raise
650
651         if sync:
652             try:
653                 if self.copies is None:
654                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
655                 else:
656                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
657                 block.set_state(_BufferBlock.COMMITTED, loc)
658             except Exception as e:
659                 block.set_state(_BufferBlock.ERROR, e)
660                 raise
661         else:
662             self.start_put_threads()
663             self._put_queue.put(block)
664
665     @synchronized
666     def get_bufferblock(self, locator):
667         return self._bufferblocks.get(locator)
668
669     @synchronized
670     def get_padding_block(self):
671         """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
672         when using truncate() to extend the size of a file.
673
674         For reference (and possible future optimization), the md5sum of the
675         padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
676
677         """
678
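        # The locator quoted above is what _BufferBlock.locator() computes for
        # a zero-filled 64 MiB buffer, i.e. (sketch):
        #
        #     "%s+%i" % (hashlib.md5(bytearray(config.KEEP_BLOCK_SIZE)).hexdigest(),
        #                config.KEEP_BLOCK_SIZE)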
679         if self.padding_block is None:
680             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
681             self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
682             self.commit_bufferblock(self.padding_block, False)
683         return self.padding_block
684
685     @synchronized
686     def delete_bufferblock(self, locator):
687         self._delete_bufferblock(locator)
688
689     def _delete_bufferblock(self, locator):
690         bb = self._bufferblocks[locator]
691         bb.clear()
692         del self._bufferblocks[locator]
693
694     def get_block_contents(self, locator, num_retries, cache_only=False):
695         """Fetch a block.
696
697         First checks to see if the locator is a BufferBlock and returns that; if
698         not, it passes the request through to KeepClient.get().
699
700         """
701         with self.lock:
702             if locator in self._bufferblocks:
703                 bufferblock = self._bufferblocks[locator]
704                 if bufferblock.state() != _BufferBlock.COMMITTED:
705                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
706                 else:
707                     locator = bufferblock._locator
708         if cache_only:
709             return self._keep.get_from_cache(locator)
710         else:
711             return self._keep.get(locator, num_retries=num_retries)
712
713     def commit_all(self):
714         """Commit all outstanding buffer blocks.
715
716         This is a synchronous call, and will not return until all buffer blocks
717         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
718
719         """
720         self.repack_small_blocks(force=True, sync=True)
721
722         with self.lock:
723             items = self._bufferblocks.items()
724
725         for k,v in items:
726             if v.state() != _BufferBlock.COMMITTED and v.owner:
727                 v.owner.flush(sync=False)
728
729         with self.lock:
730             if self._put_queue is not None:
731                 self._put_queue.join()
732
733                 err = []
734                 for k,v in items:
735                     if v.state() == _BufferBlock.ERROR:
736                         err.append((v.locator(), v.error))
737                 if err:
738                     raise KeepWriteError("Error writing some blocks", err, label="block")
739
740         for k,v in items:
741             # flush again with sync=True to remove committed bufferblocks from
742             # the segments.
743             if v.owner:
744                 v.owner.flush(sync=True)
745
746     def block_prefetch(self, locator):
747         """Initiate a background download of a block.
748
749         This assumes that the underlying KeepClient implements a block cache,
750         so repeated requests for the same block will not result in repeated
751         downloads (unless the block is evicted from the cache).  This method
752         does not block.
753
754         """
755
756         if not self.prefetch_enabled:
757             return
758
759         if self._keep.get_from_cache(locator) is not None:
760             return
761
762         with self.lock:
763             if locator in self._bufferblocks:
764                 return
765
766         self.start_get_threads()
767         self._prefetch_queue.put(locator)
768
769
770 class ArvadosFile(object):
771     """Represent a file in a Collection.
772
773     ArvadosFile manages the underlying representation of a file in Keep as a
774     sequence of segments spanning a set of blocks, and implements random
775     read/write access.
776
777     This object may be accessed from multiple threads.
778
779     """
780
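    # Segment representation (illustrative sketch): a 10-byte file whose first
    # 6 bytes live in one Keep block and whose last 4 bytes start at offset 2
    # of another could be described by
    #
    #     [Range(<locator A>, range_start=0, range_size=6, segment_offset=0),
    #      Range(<locator B>, range_start=6, range_size=4, segment_offset=2)]
    #
    # where range_start/range_size position a segment within the file and
    # segment_offset is where the segment's data begins within its block.
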
781     def __init__(self, parent, name, stream=[], segments=[]):
782         """
783         ArvadosFile constructor.
784
785         :stream:
786           a list of Range objects representing a block stream
787
788         :segments:
789           a list of Range objects representing segments
790         """
791         self.parent = parent
792         self.name = name
793         self._writers = set()
794         self._committed = False
795         self._segments = []
796         self.lock = parent.root_collection().lock
797         for s in segments:
798             self._add_segment(stream, s.locator, s.range_size)
799         self._current_bblock = None
800
801     def writable(self):
802         return self.parent.writable()
803
804     @synchronized
805     def permission_expired(self, as_of_dt=None):
806         """Returns True if any of the segment's locators is expired"""
807         for r in self._segments:
808             if KeepLocator(r.locator).permission_expired(as_of_dt):
809                 return True
810         return False
811
812     @synchronized
813     def segments(self):
814         return copy.copy(self._segments)
815
816     @synchronized
817     def clone(self, new_parent, new_name):
818         """Make a copy of this file."""
819         cp = ArvadosFile(new_parent, new_name)
820         cp.replace_contents(self)
821         return cp
822
823     @must_be_writable
824     @synchronized
825     def replace_contents(self, other):
826         """Replace segments of this file with segments from another `ArvadosFile` object."""
827
828         map_loc = {}
829         self._segments = []
830         for other_segment in other.segments():
831             new_loc = other_segment.locator
832             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
833                 if other_segment.locator not in map_loc:
834                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
835                     if bufferblock.state() != _BufferBlock.WRITABLE:
836                         map_loc[other_segment.locator] = bufferblock.locator()
837                     else:
838                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
839                 new_loc = map_loc[other_segment.locator]
840
841             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
842
843         self.set_committed(False)
844
845     def __eq__(self, other):
846         if other is self:
847             return True
848         if not isinstance(other, ArvadosFile):
849             return False
850
851         othersegs = other.segments()
852         with self.lock:
853             if len(self._segments) != len(othersegs):
854                 return False
855             for i in xrange(0, len(othersegs)):
856                 seg1 = self._segments[i]
857                 seg2 = othersegs[i]
858                 loc1 = seg1.locator
859                 loc2 = seg2.locator
860
861                 if self.parent._my_block_manager().is_bufferblock(loc1):
862                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
863
864                 if other.parent._my_block_manager().is_bufferblock(loc2):
865                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
866
867                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
868                     seg1.range_start != seg2.range_start or
869                     seg1.range_size != seg2.range_size or
870                     seg1.segment_offset != seg2.segment_offset):
871                     return False
872
873         return True
874
875     def __ne__(self, other):
876         return not self.__eq__(other)
877
878     @synchronized
879     def set_segments(self, segs):
880         self._segments = segs
881
882     @synchronized
883     def set_committed(self, value=True):
884         """Set committed flag.
885
886         If value is True, set committed to be True.
887
888         If value is False, set committed to be False for this and all parents.
889         """
890         if value == self._committed:
891             return
892         self._committed = value
893         if self._committed is False and self.parent is not None:
894             self.parent.set_committed(False)
895
896     @synchronized
897     def committed(self):
898         """Get whether this is committed or not."""
899         return self._committed
900
901     @synchronized
902     def add_writer(self, writer):
903         """Add an ArvadosFileWriter reference to the list of writers"""
904         if isinstance(writer, ArvadosFileWriter):
905             self._writers.add(writer)
906
907     @synchronized
908     def remove_writer(self, writer, flush):
909         """
910         Called from ArvadosFileWriter.close(). Remove a writer reference from the set
911         and do some block maintenance tasks.
912         """
913         self._writers.remove(writer)
914
915         if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
916             # File writer closed, not small enough for repacking
917             self.flush()
918         elif self.closed():
919             # All writers closed and size is adequate for repacking
920             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
921
922     def closed(self):
923         """
924         Get whether this is closed or not. When the writers set is empty, the file
925         is considered closed.
926         """
927         return len(self._writers) == 0
928
929     @must_be_writable
930     @synchronized
931     def truncate(self, size):
932         """Shrink or expand the size of the file.
933
934         If `size` is less than the size of the file, the file contents after
935         `size` will be discarded.  If `size` is greater than the current size
936         of the file, it will be filled with zero bytes.
937
938         """
939         if size < self.size():
940             new_segs = []
941             for r in self._segments:
942                 range_end = r.range_start+r.range_size
943                 if r.range_start >= size:
944                     # segment is past the truncate size, all done
945                     break
946                 elif size < range_end:
947                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
948                     nr.segment_offset = r.segment_offset
949                     new_segs.append(nr)
950                     break
951                 else:
952                     new_segs.append(r)
953
954             self._segments = new_segs
955             self.set_committed(False)
956         elif size > self.size():
957             padding = self.parent._my_block_manager().get_padding_block()
958             diff = size - self.size()
959             while diff > config.KEEP_BLOCK_SIZE:
960                 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
961                 diff -= config.KEEP_BLOCK_SIZE
962             if diff > 0:
963                 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
964             self.set_committed(False)
965         else:
966             # size == self.size()
967             pass
968
969     def readfrom(self, offset, size, num_retries, exact=False):
970         """Read up to `size` bytes from the file starting at `offset`.
971
972         :exact:
973          If False (default), may return less data than requested if the read
974          crosses a block boundary and the next block isn't cached.  If True,
975          only returns less data than requested when hitting EOF.
976         """
977
978         with self.lock:
979             if size == 0 or offset >= self.size():
980                 return ''
981             readsegs = locators_and_ranges(self._segments, offset, size)
982             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
983
984         locs = set()
985         data = []
986         for lr in readsegs:
987             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
988             if block:
989                 blockview = memoryview(block)
990                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
991                 locs.add(lr.locator)
992             else:
993                 break
994
995         for lr in prefetch:
996             if lr.locator not in locs:
997                 self.parent._my_block_manager().block_prefetch(lr.locator)
998                 locs.add(lr.locator)
999
1000         return ''.join(data)
1001
1002     def _repack_writes(self, num_retries):
1003         """Optimize buffer block by repacking segments in file sequence.
1004
1005         When the client makes random writes, they appear in the buffer block in
1006         the sequence they were written rather than the sequence they appear in
1007         the file.  This makes for inefficient, fragmented manifests.  Attempt
1008         to optimize by repacking writes in file sequence.
1009
1010         """
1011         segs = self._segments
1012
1013         # Collect the segments that reference the buffer block.
1014         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
1015
1016         # Collect total data referenced by segments (could be smaller than
1017         # bufferblock size if a portion of the file was written and
1018         # then overwritten).
1019         write_total = sum([s.range_size for s in bufferblock_segs])
1020
1021         if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
1022             # If there's more than one segment referencing this block, it is
1023             # due to out-of-order writes and will produce a fragmented
1024             # manifest, so try to optimize by re-packing into a new buffer.
1025             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
1026             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
1027             for t in bufferblock_segs:
1028                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
1029                 t.segment_offset = new_bb.size() - t.range_size
1030
1031             self._current_bblock = new_bb
1032
1033     @must_be_writable
1034     @synchronized
1035     def writeto(self, offset, data, num_retries):
1036         """Write `data` to the file starting at `offset`.
1037
1038         This will update existing bytes and/or extend the size of the file as
1039         necessary.
1040
1041         """
1042         if len(data) == 0:
1043             return
1044
1045         if offset > self.size():
1046             self.truncate(offset)
1047
1048         if len(data) > config.KEEP_BLOCK_SIZE:
1049             # Chunk it up into smaller writes
1050             n = 0
1051             dataview = memoryview(data)
1052             while n < len(data):
1053                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1054                 n += config.KEEP_BLOCK_SIZE
1055             return
1056
1057         self.set_committed(False)
1058
1059         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1060             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1061
1062         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1063             self._repack_writes(num_retries)
1064             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1065                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1066                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1067
1068         self._current_bblock.append(data)
1069
1070         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1071
1072         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1073
1074         return len(data)
1075
1076     @synchronized
1077     def flush(self, sync=True, num_retries=0):
1078         """Flush the current bufferblock to Keep.
1079
1080         :sync:
1081           If True, commit block synchronously, wait until buffer block has been written.
1082           If False, commit block asynchronously, return immediately after putting block into
1083           the keep put queue.
1084         """
1085         if self.committed():
1086             return
1087
1088         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1089             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1090                 self._repack_writes(num_retries)
1091             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1092
1093         if sync:
1094             to_delete = set()
1095             for s in self._segments:
1096                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1097                 if bb:
1098                     if bb.state() != _BufferBlock.COMMITTED:
1099                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1100                     to_delete.add(s.locator)
1101                     s.locator = bb.locator()
1102             for s in to_delete:
1103                self.parent._my_block_manager().delete_bufferblock(s)
1104
1105         self.parent.notify(MOD, self.parent, self.name, (self, self))
1106
1107     @must_be_writable
1108     @synchronized
1109     def add_segment(self, blocks, pos, size):
1110         """Add a segment to the end of the file.
1111
1112         `pos` and `size` reference a section of the stream described by
1113         `blocks` (a list of Range objects).
1114
1115         """
1116         self._add_segment(blocks, pos, size)
1117
1118     def _add_segment(self, blocks, pos, size):
1119         """Internal implementation of add_segment."""
1120         self.set_committed(False)
1121         for lr in locators_and_ranges(blocks, pos, size):
1122             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1123             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1124             self._segments.append(r)
1125
1126     @synchronized
1127     def size(self):
1128         """Get the file size."""
1129         if self._segments:
1130             n = self._segments[-1]
1131             return n.range_start + n.range_size
1132         else:
1133             return 0
1134
1135     @synchronized
1136     def manifest_text(self, stream_name=".", portable_locators=False,
1137                       normalize=False, only_committed=False):
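        # Builds a one-line manifest stream entry for this file, e.g.
        # (illustrative): ". 5d41402abc4b2a76b9719d911017c592+5 0:5:hello.txt\n"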
1138         buf = ""
1139         filestream = []
1140         for segment in self._segments:
1141             loc = segment.locator
1142             if self.parent._my_block_manager().is_bufferblock(loc):
1143                 if only_committed:
1144                     continue
1145                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1146             if portable_locators:
1147                 loc = KeepLocator(loc).stripped()
1148             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1149                                  segment.segment_offset, segment.range_size))
1150         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
1151         buf += "\n"
1152         return buf
1153
1154     @must_be_writable
1155     @synchronized
1156     def _reparent(self, newparent, newname):
1157         self.set_committed(False)
1158         self.flush(sync=True)
1159         self.parent.remove(self.name)
1160         self.parent = newparent
1161         self.name = newname
1162         self.lock = self.parent.root_collection().lock
1163
1164
1165 class ArvadosFileReader(ArvadosFileReaderBase):
1166     """Wraps ArvadosFile in a file-like object supporting reading only.
1167
1168     Be aware that this class is NOT thread safe as there is no locking around
1169     updating the file pointer.
1170
1171     """
1172
1173     def __init__(self, arvadosfile, num_retries=None):
1174         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1175         self.arvadosfile = arvadosfile
1176
1177     def size(self):
1178         return self.arvadosfile.size()
1179
1180     def stream_name(self):
1181         return self.arvadosfile.parent.stream_name()
1182
1183     @_FileLikeObjectBase._before_close
1184     @retry_method
1185     def read(self, size=None, num_retries=None):
1186         """Read up to `size` bytes from the file and return the result.
1187
1188         Starts at the current file position.  If `size` is None, read the
1189         entire remainder of the file.
1190         """
1191         if size is None:
1192             data = []
1193             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1194             while rd:
1195                 data.append(rd)
1196                 self._filepos += len(rd)
1197                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1198             return ''.join(data)
1199         else:
1200             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1201             self._filepos += len(data)
1202             return data
1203
1204     @_FileLikeObjectBase._before_close
1205     @retry_method
1206     def readfrom(self, offset, size, num_retries=None):
1207         """Read up to `size` bytes from the stream, starting at the specified file offset.
1208
1209         This method does not change the file position.
1210         """
1211         return self.arvadosfile.readfrom(offset, size, num_retries)
1212
1213     def flush(self):
1214         pass
1215
1216
1217 class ArvadosFileWriter(ArvadosFileReader):
1218     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1219
1220     Be aware that this class is NOT thread safe as there is no locking around
1221     updating the file pointer.
1222
1223     """
1224
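    # Typical usage sketch (assumes `collection` is a writable
    # arvados.collection.Collection; its open() method constructs the writer):
    #
    #     with collection.open("output/result.txt", "w") as f:
    #         f.write("hello\n")
    #         f.seek(0)
    #         print f.read(5)          # -> "hello"
    #
    # Closing the writer calls ArvadosFile.remove_writer(), which either
    # flushes the file to Keep or leaves it for small-block repacking.
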
1225     def __init__(self, arvadosfile, mode, num_retries=None):
1226         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1227         self.mode = mode
1228         self.arvadosfile.add_writer(self)
1229
1230     def writable(self):
1231         return True
1232
1233     @_FileLikeObjectBase._before_close
1234     @retry_method
1235     def write(self, data, num_retries=None):
1236         if self.mode[0] == "a":
1237             self.arvadosfile.writeto(self.size(), data, num_retries)
1238         else:
1239             self.arvadosfile.writeto(self._filepos, data, num_retries)
1240             self._filepos += len(data)
1241         return len(data)
1242
1243     @_FileLikeObjectBase._before_close
1244     @retry_method
1245     def writelines(self, seq, num_retries=None):
1246         for s in seq:
1247             self.write(s, num_retries=num_retries)
1248
1249     @_FileLikeObjectBase._before_close
1250     def truncate(self, size=None):
1251         if size is None:
1252             size = self._filepos
1253         self.arvadosfile.truncate(size)
1254
1255     @_FileLikeObjectBase._before_close
1256     def flush(self):
1257         self.arvadosfile.flush()
1258
1259     def close(self, flush=True):
1260         if not self.closed:
1261             self.arvadosfile.remove_writer(self, flush)
1262             super(ArvadosFileWriter, self).close()