3198: Start using BlockManager. Needs tests.
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5
6 from collections import deque
7 from stat import *
8
9 from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager
10 from keep import *
11 from .stream import StreamReader, normalize_stream, locator_block_size
12 from .ranges import Range, LocatorAndRange
13 import config
14 import errors
15 import util
16
# Module-level logger, registered under the name 'arvados.collection'.
_logger = logging.getLogger('arvados.collection')
18
class CollectionBase(object):
    """Behaviour shared by the collection classes in this module.

    Subclasses supply ``_api_client``, ``_keep_client``, ``num_retries``
    and a ``manifest_text()`` method; this base only reads them.
    """

    def __enter__(self):
        """Enter a ``with`` block; the collection itself is the context value."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Leave a ``with`` block.  The base class has nothing to clean up."""
        return None

    def _my_keep(self):
        """Return this collection's Keep client, creating it on first use."""
        client = self._keep_client
        if client is None:
            client = KeepClient(api_client=self._api_client,
                                num_retries=self.num_retries)
            self._keep_client = client
        return client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        def without_hints(token):
            # Only locator tokens are rewritten; file tokens pass through.
            if re.match(util.keep_locator_pattern, token):
                # Drop every "+hint" whose first character is not a digit,
                # which leaves the "+<size>" hint in place.
                return re.sub(r'\+[^\d][^\+]*', '', token)
            return token

        pieces = []
        for manifest_line in self.manifest_text().split("\n"):
            tokens = manifest_line.split()
            if not tokens:
                continue
            # The first token is the stream name and is never touched.
            cleaned = tokens[:1] + [without_hints(tok) for tok in tokens[1:]]
            pieces.append(' '.join(cleaned))
            pieces.append("\n")
        return ''.join(pieces)
50
51
class CollectionReader(CollectionBase):
    """Read-only access to the files described by a Collection manifest.

    The manifest is loaded lazily: methods decorated with
    ``_populate_first`` fetch and parse it the first time they are called.
    """

    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.

        Raises errors.ArgumentError if the first argument matches none of
        the locator, UUID or manifest patterns.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if (re.match(util.keep_locator_pattern, manifest_locator_or_text) or
                re.match(util.collection_uuid_pattern,
                         manifest_locator_or_text)):
            # A locator or UUID: the manifest text is fetched on demand.
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._api_response = None
        self._streams = None

    def _populate_from_api_server(self):
        """Fetch the manifest from the API server.

        Returns None on success, or the exception that was raised.
        """
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            self._api_response = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        """Fetch the manifest directly from Keep.

        Returns None on success, or the exception that was raised.
        """
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
            return None
        except Exception as e:
            return e

    def _populate(self):
        """Load the manifest text if needed and split it into streams.

        Raises the API error when the locator is not a Keep locator and the
        API lookup fails, or errors.NotFoundError when every source failed.
        """
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            # A signed locator is tried against Keep before the API server.
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!  Use the directly imported errors module, like
            # every other raise in this file.
            raise errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        # One token list per non-empty manifest line.
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def _populate_first(orig_func):
        """Decorator for methods that read actual Collection data."""
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._streams is None:
                self._populate()
            return orig_func(self, *args, **kwargs)
        return wrapper

    @_populate_first
    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response

    @_populate_first
    def normalize(self):
        """Regroup the streams by stream name and regenerate the manifest."""
        # Rearrange streams: stream name -> file name -> list of ranges.
        by_stream = {}
        for stream in self.all_streams():
            for arvfile in stream.all_files():
                streamname, filename = split(
                    stream.name() + "/" + arvfile.name())
                ranges = by_stream.setdefault(
                    streamname, {}).setdefault(filename, [])
                for seg in arvfile.segments:
                    ranges.extend(stream.locators_and_ranges(
                        seg.locator, seg.range_size))

        self._streams = [normalize_stream(name, by_stream[name])
                         for name in sorted(by_stream)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    @_populate_first
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.

        Raises ValueError if the stream or the file is not in the Collection.
        """
        if filename is None:
            streampath, filename = split(streampath)
        keep_client = self._my_keep()
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
                break
        else:
            # Loop finished without a match (or there were no streams).
            raise ValueError("stream '{}' not found in Collection".
                             format(streampath))
        try:
            return stream.files()[filename]
        except KeyError:
            raise ValueError("file '{}' not found in Collection stream '{}'".
                             format(filename, streampath))

    @_populate_first
    def all_streams(self):
        """Return a new StreamReader for every stream in the manifest."""
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        """Yield every file of every stream, in manifest order."""
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    @_populate_first
    def manifest_text(self, strip=False, normalize=False):
        """Return the manifest text.

        * strip: return the manifest without non-portable locator hints.
        * normalize: return the manifest of a normalized copy, leaving this
          reader untouched.
        """
        if normalize:
            # Hand our clients and retry setting to the scratch reader so
            # normalizing does not fall back to a freshly built Keep client.
            cr = CollectionReader(self.manifest_text(),
                                  api_client=self._api_client,
                                  keep_client=self._keep_client,
                                  num_retries=self.num_retries)
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            return self._manifest_text
240
241
class _WriterFile(ArvadosFileBase):
    """File-like object handed out by CollectionWriter.open().

    Every operation is forwarded to the owning writer, kept in ``dest``.
    """

    def __init__(self, coll_writer, name):
        """Open *name* in 'wb' mode, bound to *coll_writer*."""
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        """Close this object, then tell the writer the file is finished."""
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        """Pass *data* on to the owning writer."""
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):
        """Write each element of *seq* in turn through write()."""
        for chunk in seq:
            self.write(chunk)

    @ArvadosFileBase._before_close
    def flush(self):
        """Ask the owning writer to flush its buffered data."""
        self.dest.flush_data()
263
264
class CollectionWriter(CollectionBase):
    """Build a new Collection by writing file data into Keep.

    Data is buffered and stored in blocks of config.KEEP_BLOCK_SIZE; the
    manifest is assembled from the finished streams.
    """

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None

    def __exit__(self, exc_type, exc_value, traceback):
        """Finish the collection when the with block exits without error."""
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        """Process queued files, directory entries and trees until none remain."""
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        """Copy the queued file into the collection, then clear the queue slot."""
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        # Only close files this writer opened itself (see _queue_file).
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        """Queue entries of the current tree, stopping at the first plain file."""
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                # Only one file may be queued at a time, so stop here and
                # let do_queued_work() write it before continuing.
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        """List the tree at the head of the queue and queue its entries."""
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        """Queue *source* (a path or an object with read()) as the next file."""
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        """Queue *dirents* in sorted order; *stream_name* is not used here."""
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        """Append a directory to the queue of trees to write."""
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        """Write one file (path or readable object) into the collection."""
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write the directory at *path* into the collection."""
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        """Append *newdata* to the current file.

        Anything that has an ``__iter__`` attribute is written element by
        element.
        """
        # NOTE(review): this relies on Python 2 str having no __iter__.
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

          with cwriter.open('./doc/page1.txt') as outfile:
              outfile.write(page1_data)
          with cwriter.open('./doc/page2.txt') as outfile:
              outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        """Store at most one block of buffered data in Keep."""
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
            # Whatever did not fit in this block stays buffered.
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        """Finish the current file and start one named *newfilename*."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        """Set the name of the file being written.

        None is accepted and leaves the file unnamed (this is what
        start_new_file() passes by default).  Raises errors.AssertionError
        if the name contains a tab, newline or NUL character.
        """
        if newfilename is not None:
            # re.search() would raise TypeError on None, so only validate
            # real names.
            if re.search(r'[\t\n]', newfilename):
                raise errors.AssertionError(
                    "Manifest filenames cannot contain whitespace: %s" %
                    newfilename)
            elif re.search(r'\x00', newfilename):
                raise errors.AssertionError(
                    "Manifest filenames cannot contain NUL characters: %s" %
                    newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        """Return the name of the file being written, or None."""
        return self._current_file_name

    def finish_current_file(self):
        """Record the current file's extent in the current stream.

        Raises errors.AssertionError if data was written without a file name.
        """
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                # No name and no data since the last file: nothing to do.
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
                self._current_file_pos,
                self._current_stream_length - self._current_file_pos,
                self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        """Finish the current stream and start one named *newstreamname*."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        """Set the current stream name; an empty name becomes '.'."""
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        """Return the name of the stream being written, or None."""
        return self._current_stream_name

    def finish_current_stream(self):
        """Flush pending data and move the current stream to the finished list."""
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                # A stream with only empty files still gets a block locator.
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        """Store the manifest in Keep and return its locator."""
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        """Return '<md5 of stripped manifest>+<its length>'."""
        # Imported here so this method does not depend on hashlib leaking in
        # through a star import elsewhere in the module.
        import hashlib
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    def manifest_text(self):
        """Finish the current stream and return the manifest text so far."""
        self.finish_current_stream()
        lines = []
        for name, locators, files in self._finished_streams:
            # Stream names must be '.' or start with './'.
            prefix = '' if re.search(r'^\.(/.*)?$', name) else './'
            file_tokens = ' '.join(
                "%d:%d:%s" % (sfile[0], sfile[1],
                              sfile[2].replace(' ', '\\040'))
                for sfile in files)
            lines.append(prefix + name.replace(' ', '\\040') +
                         ' ' + ' '.join(locators) +
                         ' ' + file_tokens + "\n")
        return ''.join(lines)

    def data_locators(self):
        """Return the block locators of all finished streams, in order."""
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
540
541
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter whose progress can be dumped and later resumed.

    Only data read from files is accepted, and the stat() results of those
    files are recorded so a resume can detect that they changed.
    """

    # Attributes copied by dump_state() and restored by from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        """Create an empty resumable writer; see CollectionWriter.__init__."""
        # Maps source path -> stat tuple taken when the file was queued.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        """Build a writer from a dict produced by dump_state().

        Raises errors.StaleWriterStateError if the state cannot be resumed.
        """
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # _data_buffer_len is not part of STATE_PROPS; recompute it so the
        # flush threshold in write() matches the restored buffer.
        writer._data_buffer_len = sum(
            len(chunk) for chunk in writer._data_buffer)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        """Raise StaleWriterStateError if any recorded source file changed."""
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a dict of this writer's state.

        Each STATE_PROPS value is passed through *copy_func*.  The
        '_current_file' entry is None, or (real path, offset) of the file
        currently being read.
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        """Queue *source* and record its stat() result as a dependency.

        Raises errors.AssertionError if *source* is not a usable path or the
        file changed between the stat and open calls.
        """
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        # Keep the stat failure in a name that outlives the except clause.
        stat_error = None
        try:
            path_stat = os.stat(src_path)
        except OSError as error:
            path_stat = None
            stat_error = error
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        """Write *data*, which must come from a queued file.

        Raises errors.AssertionError when no file is queued.
        """
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)
640
641
642 class Collection(CollectionBase):
643     def __init__(self, manifest_locator_or_text=None, api_client=None,
644                  keep_client=None, num_retries=0, block_manager=None):
645
646         self._items = None
647         self._api_client = api_client
648         self._keep_client = keep_client
649         self.num_retries = num_retries
650         self._manifest_locator = None
651         self._manifest_text = None
652         self._api_response = None
653
654         if block_manager is None:
655             self.block_manager = BlockManager(keep_client)
656
657         if manifest_locator_or_text:
658             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
659                 self._manifest_locator = manifest_locator_or_text
660             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
661                 self._manifest_locator = manifest_locator_or_text
662             elif re.match(util.manifest_pattern, manifest_locator_or_text):
663                 self._manifest_text = manifest_locator_or_text
664             else:
665                 raise errors.ArgumentError(
666                     "Argument to CollectionReader must be a manifest or a collection UUID")
667
668     def _populate_from_api_server(self):
669         # As in KeepClient itself, we must wait until the last
670         # possible moment to instantiate an API client, in order to
671         # avoid tripping up clients that don't have access to an API
672         # server.  If we do build one, make sure our Keep client uses
673         # it.  If instantiation fails, we'll fall back to the except
674         # clause, just like any other Collection lookup
675         # failure. Return an exception, or None if successful.
676         try:
677             if self._api_client is None:
678                 self._api_client = arvados.api('v1')
679                 self._keep_client = None  # Make a new one with the new api.
680             self._api_response = self._api_client.collections().get(
681                 uuid=self._manifest_locator).execute(
682                     num_retries=self.num_retries)
683             self._manifest_text = self._api_response['manifest_text']
684             return None
685         except Exception as e:
686             return e
687
688     def _populate_from_keep(self):
689         # Retrieve a manifest directly from Keep. This has a chance of
690         # working if [a] the locator includes a permission signature
691         # or [b] the Keep services are operating in world-readable
692         # mode. Return an exception, or None if successful.
693         try:
694             self._manifest_text = self._my_keep().get(
695                 self._manifest_locator, num_retries=self.num_retries)
696         except Exception as e:
697             return e
698
699     def _populate(self):
700         self._items = {}
701         if self._manifest_locator is None and self._manifest_text is None:
702             return
703         error_via_api = None
704         error_via_keep = None
705         should_try_keep = ((self._manifest_text is None) and
706                            util.keep_locator_pattern.match(
707                 self._manifest_locator))
708         if ((self._manifest_text is None) and
709             util.signed_locator_pattern.match(self._manifest_locator)):
710             error_via_keep = self._populate_from_keep()
711         if self._manifest_text is None:
712             error_via_api = self._populate_from_api_server()
713             if error_via_api is not None and not should_try_keep:
714                 raise error_via_api
715         if ((self._manifest_text is None) and
716             not error_via_keep and
717             should_try_keep):
718             # Looks like a keep locator, and we didn't already try keep above
719             error_via_keep = self._populate_from_keep()
720         if self._manifest_text is None:
721             # Nothing worked!
722             raise arvados.errors.NotFoundError(
723                 ("Failed to retrieve collection '{}' " +
724                  "from either API server ({}) or Keep ({})."
725                  ).format(
726                     self._manifest_locator,
727                     error_via_api,
728                     error_via_keep))
729         # populate
730         import_manifest(self._manifest_text, self)
731
732     def _populate_first(orig_func):
733         # Decorator for methods that read actual Collection data.
734         @functools.wraps(orig_func)
735         def wrapper(self, *args, **kwargs):
736             if self._items is None:
737                 self._populate()
738             return orig_func(self, *args, **kwargs)
739         return wrapper
740
741     def __enter__(self):
742         return self
743
744     def __exit__(self, exc_type, exc_value, traceback):
745         self.save()
746
747     @_populate_first
748     def find(self, path, create=False):
749         p = path.split("/")
750         if p[0] == '.':
751             del p[0]
752
753         if len(p) > 0:
754             item = self._items.get(p[0])
755             if len(p) == 1:
756                 # item must be a file
757                 if item is None and create:
758                     # create new file
759                     item = ArvadosFile(self.block_manager, keep=self._keep_client)
760                     self._items[p[0]] = item
761                 return item
762             else:
763                 if item is None and create:
764                     # create new collection
765                     item = Collection(api_client=self._api_client, keep=self._keep_client, num_retries=self.num_retries, block_manager=self.block_manager)
766                     self._items[p[0]] = item
767                 del p[0]
768                 return item.find("/".join(p), create=create)
769         else:
770             return self
771
772     @_populate_first
773     def api_response(self):
774         """api_response() -> dict or None
775
776         Returns information about this Collection fetched from the API server.
777         If the Collection exists in Keep but not the API server, currently
778         returns None.  Future versions may provide a synthetic response.
779         """
780         return self._api_response
781
782     def open(self, path, mode):
783         mode = mode.replace("b", "")
784         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
785             raise ArgumentError("Bad mode '%s'" % mode)
786         create = (mode != "r")
787
788         f = self.find(path, create=create)
789         if f is None:
790             raise ArgumentError("File not found")
791         if not isinstance(f, ArvadosFile):
792             raise ArgumentError("Path must refer to a file.")
793
794         if mode[0] == "w":
795             f.truncate(0)
796
797         if mode == "r":
798             return ArvadosFileReader(f, path, mode)
799         else:
800             return ArvadosFileWriter(f, path, mode)
801
802     @_populate_first
803     def modified(self):
804         for k,v in self._items.items():
805             if v.modified():
806                 return True
807         return False
808
809     @_populate_first
810     def set_unmodified(self):
811         for k,v in self._items.items():
812             v.set_unmodified()
813
814     @_populate_first
815     def __iter__(self):
816         self._items.iterkeys()
817
818     @_populate_first
819     def iterkeys(self):
820         self._items.iterkeys()
821
822     @_populate_first
823     def __getitem__(self, k):
824         r = self.find(k)
825         if r:
826             return r
827         else:
828             raise KeyError(k)
829
830     @_populate_first
831     def __contains__(self, k):
832         return self.find(k) is not None
833
834     @_populate_first
835     def __len__(self):
836        return len(self._items)
837
838     @_populate_first
839     def __delitem__(self, p):
840         p = path.split("/")
841         if p[0] == '.':
842             del p[0]
843
844         if len(p) > 0:
845             item = self._items.get(p[0])
846             if item is None:
847                 raise NotFoundError()
848             if len(p) == 1:
849                 del self._items[p[0]]
850             else:
851                 del p[0]
852                 del item["/".join(p)]
853         else:
854             raise NotFoundError()
855
856     @_populate_first
857     def keys(self):
858         return self._items.keys()
859
860     @_populate_first
861     def values(self):
862         return self._items.values()
863
864     @_populate_first
865     def items(self):
866         return self._items.items()
867
868     @_populate_first
869     def manifest_text(self, strip=False, normalize=False):
870         if self.modified() or self._manifest_text is None or normalize:
871             return export_manifest(self, stream_name=".", portable_locators=strip)
872         else:
873             if strip:
874                 return self.stripped_manifest()
875             else:
876                 return self._manifest_text
877
878     def portable_data_hash(self):
879         stripped = self.manifest_text(strip=True)
880         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
881
882     @_populate_first
883     def commit_bufferblocks(self):
884         pass
885
886     @_populate_first
887     def save(self):
888         if self.modified():
889             self._my_keep().put(self.manifest_text(strip=True))
890             if re.match(util.collection_uuid_pattern, self._manifest_locator):
891                 self._api_response = self._api_client.collections().update(
892                     uuid=self._manifest_locator,
893                     body={'manifest_text': self.manifest_text(strip=False)}
894                     ).execute(
895                         num_retries=self.num_retries)
896             else:
897                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
898             self.set_unmodified()
899
900     @_populate_first
901     def save_as(self, name, owner_uuid=None):
902         self._my_keep().put(self.manifest_text(strip=True))
903         body = {"manifest_text": self.manifest_text(strip=False),
904                 "name": name}
905         if owner_uuid:
906             body["owner_uuid"] = owner_uuid
907         self._api_response = self._api_client.collections().create(body=body).execute(num_retries=self.num_retries)
908         self._manifest_locator = self._api_response["uuid"]
909         self.set_unmodified()
910
911
def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
    """Parse manifest text into a collection and return that collection.

    manifest_text: whitespace-separated tokens.  Each stream consists of a
      stream name, then block locators of the form "<32 hex digits>+<size>"
      with optional "+hint" suffixes, then file segments of the form
      "<position>:<size>:<name>".  A token followed by exactly "\\n" ends
      the stream.  "\\040" in stream and file names decodes to a space.
    into_collection: an empty collection to fill.  When None, a new
      Collection is built from api_client, keep and num_retries.

    Every file segment is added to the file found (or created) at
    "<stream name>/<file name>" in the collection.  The collection is
    marked unmodified before it is returned.

    Raises errors.ArgumentError when into_collection is not empty and
    errors.SyntaxError when a token is neither a block locator nor a file
    segment where one is expected.
    """
    if into_collection is not None:
        if len(into_collection) > 0:
            # ArgumentError lives in the errors module; the bare name is
            # not bound by this file's imports.
            raise errors.ArgumentError("Can only import manifest into an empty collection")
        c = into_collection
    else:
        c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries)

    # Parser states: what kind of token is expected next.
    STREAM_NAME = 0
    BLOCKS = 1
    SEGMENTS = 2

    stream_name = None
    state = STREAM_NAME

    for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
        tok = n.group(1)
        sep = n.group(2)

        if state == STREAM_NAME:
            # starting a new stream
            stream_name = tok.replace('\\040', ' ')
            blocks = []
            # Plain ints suffice: they widen automatically for large offsets.
            streamoffset = 0
            state = BLOCKS
            continue

        if state == BLOCKS:
            s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
            if s:
                blocksize = int(s.group(1))
                blocks.append(Range(tok, streamoffset, blocksize))
                streamoffset += blocksize
            else:
                # First non-locator token: fall through and parse this same
                # token as a file segment.
                state = SEGMENTS

        if state == SEGMENTS:
            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            if s:
                pos = int(s.group(1))
                size = int(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                f = c.find("%s/%s" % (stream_name, name), create=True)
                f.add_segment(blocks, pos, size)
            else:
                # error!
                raise errors.SyntaxError("Invalid manifest format")

        if sep == "\n":
            # End of line: the next token starts a new stream.
            stream_name = None
            state = STREAM_NAME

    c.set_unmodified()
    return c
967
def export_manifest(item, stream_name=".", portable_locators=False):
    """Build manifest text for a Collection or a single ArvadosFile.

    item: a Collection or an ArvadosFile; any other value yields "".
    stream_name: name of the manifest stream the item is written to.
    portable_locators: when true, every locator hint that does not start
      with a digit (i.e. everything but the size hint) is removed from the
      block locators, using the same substitution as stripped_manifest().

    For a Collection, the files directly inside it form one stream line
    (files in sorted name order), followed by the lines of each
    sub-collection, exported recursively under
    os.path.join(stream_name, <name>).  For an ArvadosFile, a single
    stream line is produced with the file entered under `stream_name`.

    Returns the manifest text.
    """
    def portable(loc):
        # Drop "+<hint>" parts whose hint does not begin with a digit.
        if portable_locators:
            return re.sub(r'\+[^\d][^\+]*', '', loc)
        return loc

    buf = ""
    if isinstance(item, Collection):
        stream = {}
        # Read the children once through items(), sorted by name.
        entries = sorted(item.items(), key=lambda entry: entry[0])
        for name, child in entries:
            if not isinstance(child, ArvadosFile):
                continue
            ranges = []
            for seg in child._segments:
                loc = seg.locator
                if loc.startswith("bufferblock"):
                    # NOTE(review): the ArvadosFile branch below resolves
                    # buffer blocks via item._bufferblocks[...]
                    # .calculate_locator() instead; confirm which accessor
                    # is the right one.
                    loc = child.bbm._bufferblocks[loc].locator()
                loc = portable(loc)
                ranges.append(LocatorAndRange(loc, locator_block_size(loc),
                                              seg.segment_offset, seg.range_size))
            stream[name] = ranges
        buf += ' '.join(normalize_stream(stream_name, stream))
        buf += "\n"
        for name, child in entries:
            if isinstance(child, Collection):
                buf += export_manifest(child,
                                       stream_name=os.path.join(stream_name, name),
                                       portable_locators=portable_locators)
    elif isinstance(item, ArvadosFile):
        ranges = []
        for seg in item._segments:
            loc = seg.locator
            if loc.startswith("bufferblock"):
                loc = item._bufferblocks[loc].calculate_locator()
            loc = portable(loc)
            ranges.append(LocatorAndRange(loc, locator_block_size(loc),
                                          seg.segment_offset, seg.range_size))
        # `stream` must be created here; it was previously used undefined
        # in this branch.
        stream = {stream_name: ranges}
        buf += ' '.join(normalize_stream(stream_name, stream))
        buf += "\n"
    return buf