# 3198: Many tests. Fixed lots of bugs.
# [arvados.git] / sdk / python / arvados / collection.py
import errno
import functools
import hashlib
import logging
import os
import re

from collections import deque
from stat import *

from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager
from keep import *
from .stream import StreamReader, normalize_stream, locator_block_size
from .ranges import Range, LocatorAndRange
import config
import errors
import util
17
18 _logger = logging.getLogger('arvados.collection')
19
class CollectionBase(object):
    """Shared plumbing for collection readers and writers.

    Provides context-manager support, lazy Keep client construction,
    and manifest hint-stripping.
    """

    def __enter__(self):
        """Enter a ``with`` block; yields the collection object itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Nothing to release here; subclasses may override."""
        pass

    def _my_keep(self):
        # Build the Keep client lazily, on first use, so objects created
        # from literal manifest text never need working credentials.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        stripped_lines = []
        for line in self.manifest_text().split("\n"):
            fields = line.split()
            if not fields:
                continue
            cleaned = []
            for token in fields[1:]:
                if re.match(util.keep_locator_pattern, token):
                    # Drop every hint that does not begin with a digit;
                    # size hints are the only ones that do.
                    cleaned.append(re.sub(r'\+[^\d][^\+]*', '', token))
                else:
                    cleaned.append(token)
            stripped_lines.append(' '.join(fields[:1] + cleaned))
            stripped_lines.append("\n")
        return ''.join(stripped_lines)
51
52
class CollectionReader(CollectionBase):
    """Read-only interface to an existing Arvados Collection."""

    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        # Classify the argument now; the actual data is fetched lazily by
        # _populate() the first time a data-reading method is called.
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._api_response = None
        self._streams = None

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        try:
            if self._api_client is None:
                # Imported here rather than at module scope: this module
                # never imported the top-level package (referencing
                # `arvados` would raise NameError), and a lazy import also
                # avoids a circular import with the arvados package.
                import arvados
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            self._api_response = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e

    def _populate(self):
        """Fetch the manifest from the API server and/or Keep, then parse it.

        Raises errors.NotFoundError when the collection cannot be
        retrieved from either source.
        """
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            # A signed locator can be fetched from Keep without API help.
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            # (Use the imported `errors` module; the bare name `arvados`
            # is not bound in this module.)
            raise errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._streams is None:
                self._populate()
            return orig_func(self, *args, **kwargs)
        return wrapper

    @_populate_first
    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response

    @_populate_first
    def normalize(self):
        """Regroup streams/files and regenerate a normalized manifest."""
        # Rearrange streams
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    @_populate_first
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.
        """
        if filename is None:
            streampath, filename = split(streampath)
        keep_client = self._my_keep()
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
                break
        else:
            raise ValueError("stream '{}' not found in Collection".
                             format(streampath))
        try:
            return stream.files()[filename]
        except KeyError:
            raise ValueError("file '{}' not found in Collection stream '{}'".
                             format(filename, streampath))

    @_populate_first
    def all_streams(self):
        """Return a StreamReader for each stream in the collection."""
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        """Iterate over every file reader in every stream."""
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    @_populate_first
    def manifest_text(self, strip=False, normalize=False):
        """Return the collection's manifest text.

        Arguments:
        * strip: If True, remove non-portable hints (e.g. permission
          signatures) from block locators.
        * normalize: If True, return the manifest of a normalized copy of
          this collection.
        """
        if normalize:
            cr = CollectionReader(self.manifest_text())
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            return self._manifest_text
241
242
class _WriterFile(ArvadosFileBase):
    """Writable file-like object returned by CollectionWriter.open().

    All writes are forwarded to the owning CollectionWriter; closing the
    object marks the current collection file as finished.
    """

    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        # The CollectionWriter that receives this file's data.
        self.dest = coll_writer

    def close(self):
        """Close the handle and finish the current file in the collection."""
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        """Append data to the current file in the collection."""
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):
        """Write each item of seq in order (no separators are added)."""
        for data in seq:
            self.write(data)

    @ArvadosFileBase._before_close
    def flush(self):
        """Ask the collection writer to flush buffered data to Keep."""
        self.dest.flush_data()
264
265
class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        # Data buffered in memory, waiting to be stored in Keep.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State of the stream currently being built.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        # [name, locators, files] triples for each completed stream.
        self._finished_streams = []
        # Work-queue state; see do_queued_work().
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        # The most recent file object handed out by open().
        self._last_open = None

    def __exit__(self, exc_type, exc_value, traceback):
        # Only store the collection (finish) on a clean exit; on error,
        # leave the partial data unfinished.
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        # Copy the queued file into the collection one Keep block at a time.
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        # Only close the file if we opened it ourselves (see _queue_file).
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        # Process entries of the directory at the head of _queued_trees,
        # queueing at most one file before returning so do_queued_work()
        # can drain it first.
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        # List the next queued directory and queue its entries for writing.
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            # Empty directory: nothing to write for this tree.
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            # source is a path; open it ourselves and remember to close it.
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        # stream_name is unused here; streams are switched in _work_dirents.
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        """Write one file (a path or a file object) into the collection."""
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Recursively write a directory tree into the collection."""
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        """Append data to the current file, uploading full Keep blocks."""
        if hasattr(newdata, '__iter__'):
            # Accept an iterable of strings by writing each element.
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

          with cwriter.open('./doc/page1.txt') as outfile:
              outfile.write(page1_data)
          with cwriter.open('./doc/page2.txt') as outfile:
              outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        """Upload at most one full Keep block from the buffer."""
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
            # Keep any data beyond one block buffered for the next flush.
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        """Finish the current file and begin a new (possibly unnamed) one."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        # Tabs and newlines would corrupt the manifest format; plain spaces
        # are allowed because manifest_text() escapes them as \040.
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        """Return the name of the file currently being written."""
        return self._current_file_name

    def finish_current_file(self):
        """Record the current file as an [offset, size, name] stream entry."""
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                # No data written since the last file ended; nothing to do.
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
                self._current_file_pos,
                self._current_stream_length - self._current_file_pos,
                self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        """Finish the current stream and begin a new one."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        # An empty name means the top-level stream, spelled '.'.
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        """Return the name of the stream currently being written."""
        return self._current_stream_name

    def finish_current_stream(self):
        """Flush buffered data, record the stream, and reset stream state."""
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            # No files were written to this stream; don't record it.
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                # Every recorded stream must reference at least one block.
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        """Return "<md5 hex digest>+<length>" of the stripped manifest."""
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    def manifest_text(self):
        """Finish the current stream and return the full manifest text."""
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            # Stream names must start with './' except the top-level '.'.
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest

    def data_locators(self):
        """Return the block locators of all finished streams, in order."""
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
541
542
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter whose progress can be checkpointed and resumed.

    dump_state() captures enough state to rebuild the writer later with
    from_state(), provided the source files have not changed in the
    meantime.  Only file-backed data is resumable: write() rejects data
    that does not come from a queued file.
    """

    # Attributes that dump_state()/from_state() round-trip.
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        # Maps source paths to the stat tuple recorded when each file was
        # queued, so check_dependencies() can detect later changes.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        """Raise StaleWriterStateError if any recorded source file changed.

        A file is considered changed if it is no longer a regular file, or
        its mtime or size differs from the recorded stat.
        """
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a dict of resumable state for from_state().

        copy_func is applied to each value (e.g. pass a deep-copy function
        to snapshot mutable state).
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            # Record the real path and read position of the file in flight.
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        # Unlike the base class, only accepts a path (not a file object),
        # and records the file's stat so resume can verify it's unchanged.
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        """Write data sourced from a queued file.

        Raises errors.AssertionError for unsourced data, which could not
        be reproduced when resuming.
        """
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)
641
642
643 class Collection(CollectionBase):
644     def __init__(self, parent=None, manifest_locator_or_text=None, api_client=None,
645                  keep_client=None, num_retries=0, block_manager=None):
646
647         self._parent = parent
648         self._items = None
649         self._api_client = api_client
650         self._keep_client = keep_client
651         self._block_manager = block_manager
652
653         self.num_retries = num_retries
654         self._manifest_locator = None
655         self._manifest_text = None
656         self._api_response = None
657
658         if manifest_locator_or_text:
659             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
660                 self._manifest_locator = manifest_locator_or_text
661             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
662                 self._manifest_locator = manifest_locator_or_text
663             elif re.match(util.manifest_pattern, manifest_locator_or_text):
664                 self._manifest_text = manifest_locator_or_text
665             else:
666                 raise errors.ArgumentError(
667                     "Argument to CollectionReader must be a manifest or a collection UUID")
668
669     def _my_api(self):
670         if self._api_client is None:
671             if self._parent is not None:
672                 return self._parent._my_api()
673             self._api_client = arvados.api('v1')
674             self._keep_client = None  # Make a new one with the new api.
675         return self._api_client
676
677     def _my_keep(self):
678         if self._keep_client is None:
679             if self._parent is not None:
680                 return self._parent._my_keep()
681             self._keep_client = KeepClient(api_client=self._my_api(),
682                                            num_retries=self.num_retries)
683         return self._keep_client
684
685     def _my_block_manager(self):
686         if self._block_manager is None:
687             if self._parent is not None:
688                 return self._parent._my_block_manager()
689             self._block_manager = BlockManager(self._my_keep())
690         return self._block_manager
691
    def _populate_from_api_server(self):
        """Fetch this collection's record (including manifest text) from
        the API server.

        Returns None on success, or the raised exception on failure so the
        caller (_populate) can decide whether to fall back to Keep.
        """
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        try:
            self._api_response = self._my_api().collections().get(
                uuid=self._manifest_locator).execute(
                    num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
            return None
        except Exception as e:
            return e
708
    def _populate_from_keep(self):
        """Fetch the manifest text directly from Keep.

        Returns None (implicitly) on success, or the raised exception on
        failure so the caller can report it.
        """
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e
719
    def _populate(self):
        """Load the collection's contents from its manifest.

        Tries Keep and/or the API server depending on what kind of
        locator we hold, then parses the resulting manifest text into
        this (empty) collection.  Raises NotFoundError when every
        retrieval path fails.
        """
        self._items = {}
        if self._manifest_locator is None and self._manifest_text is None:
            # Brand-new, empty collection: nothing to fetch or parse.
            return
        error_via_api = None
        error_via_keep = None
        # A bare Keep locator (hash+size) can be fetched from Keep as a
        # fallback after the API server; a signed locator is tried on
        # Keep first (the signature makes direct access likely to work).
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            # If the locator can't be fetched from Keep either, the API
            # error is fatal right away.
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            # NOTE(review): relies on an `arvados` module import that is
            # not visible in this file's header — confirm it is imported.
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        # populate
        import_manifest(self._manifest_text, self)
752
753     def _populate_first(orig_func):
754         # Decorator for methods that read actual Collection data.
755         @functools.wraps(orig_func)
756         def wrapper(self, *args, **kwargs):
757             if self._items is None:
758                 self._populate()
759             return orig_func(self, *args, **kwargs)
760         return wrapper
761
    def __enter__(self):
        """Enter a 'with' block; the collection itself is the context object."""
        return self
764
    def __exit__(self, exc_type, exc_value, traceback):
        """Save the collection and stop background threads on 'with' exit.

        no_locator=True allows collections created without a uuid to be
        written to Keep without requiring an API-server record.

        NOTE(review): save() runs even when the body raised an exception
        (exc_type is ignored) — confirm saving on error is intended.
        """
        self.save(no_locator=True)
        if self._block_manager is not None:
            self._block_manager.stop_threads()
769
    @_populate_first
    def find(self, path, create=False):
        """Look up the file or subcollection at `path`.

        :path: slash-separated path relative to this collection; a
          leading "." component is ignored.
        :create: when True, missing path components are created — the
          final component as an ArvadosFile, intermediate components as
          subcollections.

        Returns the ArvadosFile or Collection found (or created), None
        when the path does not exist and create is False, or self for an
        empty path.
        """
        p = path.split("/")
        if p[0] == '.':
            del p[0]

        if len(p) > 0:
            item = self._items.get(p[0])
            if len(p) == 1:
                # item must be a file
                if item is None and create:
                    # create new file
                    # NOTE(review): passes self._keep_client directly rather
                    # than self._my_keep() — may be None until lazily built;
                    # confirm ArvadosFile tolerates that.
                    item = ArvadosFile(self, keep=self._keep_client)
                    self._items[p[0]] = item
                return item
            else:
                if item is None and create:
                    # create new collection
                    item = Collection(parent=self, num_retries=self.num_retries)
                    self._items[p[0]] = item
                del p[0]
                # NOTE(review): assumes the intermediate item is a
                # Collection; an ArvadosFile in the middle of the path
                # would fail here.
                return item.find("/".join(p), create=create)
        else:
            return self
794
    @_populate_first
    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.

        Note: decorated with @_populate_first, so calling this may trigger
        the initial fetch that populates _api_response.
        """
        return self._api_response
804
805     def open(self, path, mode):
806         mode = mode.replace("b", "")
807         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
808             raise ArgumentError("Bad mode '%s'" % mode)
809         create = (mode != "r")
810
811         f = self.find(path, create=create)
812         if f is None:
813             raise IOError((errno.ENOENT, "File not found"))
814         if not isinstance(f, ArvadosFile):
815             raise IOError((errno.EISDIR, "Path must refer to a file."))
816
817         if mode[0] == "w":
818             f.truncate(0)
819
820         if mode == "r":
821             return ArvadosFileReader(f, path, mode)
822         else:
823             return ArvadosFileWriter(f, path, mode)
824
825     @_populate_first
826     def modified(self):
827         for k,v in self._items.items():
828             if v.modified():
829                 return True
830         return False
831
832     @_populate_first
833     def set_unmodified(self):
834         for k,v in self._items.items():
835             v.set_unmodified()
836
837     @_populate_first
838     def __iter__(self):
839         self._items.iterkeys()
840
841     @_populate_first
842     def iterkeys(self):
843         self._items.iterkeys()
844
    @_populate_first
    def __getitem__(self, k):
        """Return the top-level file or subcollection named k (KeyError if absent)."""
        return self._items[k]
848
    @_populate_first
    def __contains__(self, k):
        """Return True if a top-level item named k exists."""
        return k in self._items
852
853     @_populate_first
854     def __len__(self):
855        return len(self._items)
856
    @_populate_first
    def __delitem__(self, p):
        """Delete the top-level item named p (KeyError if absent)."""
        del self._items[p]
860
    @_populate_first
    def keys(self):
        """Return the names of this collection's top-level items."""
        return self._items.keys()
864
    @_populate_first
    def values(self):
        """Return the top-level items (ArvadosFile or Collection objects)."""
        return self._items.values()
868
    @_populate_first
    def items(self):
        """Return (name, item) pairs for the top-level items."""
        return self._items.items()
872
873     @_populate_first
874     def exists(self, path):
875         return self.find(path) != None
876
877     @_populate_first
878     def remove(self, path):
879         p = path.split("/")
880         if p[0] == '.':
881             del p[0]
882
883         if len(p) > 0:
884             item = self._items.get(p[0])
885             if item is None:
886                 raise IOError((errno.ENOENT, "File not found"))
887             if len(p) == 1:
888                 del self._items[p[0]]
889             else:
890                 del p[0]
891                 item.remove("/".join(p))
892         else:
893             raise IOError((errno.ENOENT, "File not found"))
894
895     @_populate_first
896     def manifest_text(self, strip=False, normalize=False):
897         if self.modified() or self._manifest_text is None or normalize:
898             return export_manifest(self, stream_name=".", portable_locators=strip)
899         else:
900             if strip:
901                 return self.stripped_manifest()
902             else:
903                 return self._manifest_text
904
905     def portable_data_hash(self):
906         stripped = self.manifest_text(strip=True)
907         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
908
    @_populate_first
    def save(self, no_locator=False):
        """Commit pending blocks and update the collection record, if modified.

        :no_locator: when True, allow saving a collection that has no
          API-server uuid (its data is written to Keep but no record is
          updated); when False, such a collection raises AssertionError.

        No-op when the collection has no unsaved changes.
        """
        if self.modified():
            # Flush any buffered blocks to Keep before exporting.
            self._my_block_manager().commit_all()
            # Store the stripped manifest itself as a Keep blob.
            self._my_keep().put(self.manifest_text(strip=True))
            if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
                self._api_response = self._my_api().collections().update(
                    uuid=self._manifest_locator,
                    body={'manifest_text': self.manifest_text(strip=False)}
                    ).execute(
                        num_retries=self.num_retries)
            elif not no_locator:
                raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
            self.set_unmodified()
923
    @_populate_first
    def save_as(self, name, owner_uuid=None):
        """Create a new API-server collection record for this collection.

        :name: display name for the new collection record.
        :owner_uuid: optional owner (project/user) for the record.

        Commits pending blocks and the manifest to Keep, creates the
        record, and adopts the new record's uuid as this collection's
        manifest locator.
        """
        self._my_block_manager().commit_all()
        self._my_keep().put(self.manifest_text(strip=True))
        body = {"manifest_text": self.manifest_text(strip=False),
                "name": name}
        if owner_uuid:
            body["owner_uuid"] = owner_uuid
        self._api_response = self._my_api().collections().create(body=body).execute(num_retries=self.num_retries)
        self._manifest_locator = self._api_response["uuid"]
        self.set_unmodified()
935
936
937 def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
938     if into_collection is not None:
939         if len(into_collection) > 0:
940             raise ArgumentError("Can only import manifest into an empty collection")
941         c = into_collection
942     else:
943         c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries)
944
945     STREAM_NAME = 0
946     BLOCKS = 1
947     SEGMENTS = 2
948
949     stream_name = None
950     state = STREAM_NAME
951
952     for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
953         tok = n.group(1)
954         sep = n.group(2)
955
956         if state == STREAM_NAME:
957             # starting a new stream
958             stream_name = tok.replace('\\040', ' ')
959             blocks = []
960             segments = []
961             streamoffset = 0L
962             state = BLOCKS
963             continue
964
965         if state == BLOCKS:
966             s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
967             if s:
968                 blocksize = long(s.group(1))
969                 blocks.append(Range(tok, streamoffset, blocksize))
970                 streamoffset += blocksize
971             else:
972                 state = SEGMENTS
973
974         if state == SEGMENTS:
975             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
976             if s:
977                 pos = long(s.group(1))
978                 size = long(s.group(2))
979                 name = s.group(3).replace('\\040', ' ')
980                 f = c.find("%s/%s" % (stream_name, name), create=True)
981                 f.add_segment(blocks, pos, size)
982             else:
983                 # error!
984                 raise errors.SyntaxError("Invalid manifest format")
985
986         if sep == "\n":
987             stream_name = None
988             state = STREAM_NAME
989
990     c.set_unmodified()
991     return c
992
def export_manifest(item, stream_name=".", portable_locators=False):
    """Render `item` (a Collection or ArvadosFile) as Keep manifest text.

    :item: the Collection or ArvadosFile to export.
    :stream_name: manifest stream name for this item; subcollections are
      exported recursively with their name joined onto it.
    :portable_locators: when True, strip non-portable hints (permission
      signatures etc.) from block locators, matching
      CollectionBase.stripped_manifest().
    """
    buf = ""
    if isinstance(item, Collection):
        stream = {}
        sorted_keys = sorted(item.keys())
        # Emit this stream's files first...
        for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
            v = item[k]
            st = []
            for s in v._segments:
                loc = s.locator
                if loc.startswith("bufferblock"):
                    # Pending bufferblocks have no content locator yet;
                    # resolve through the block manager.
                    loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
                if portable_locators:
                    # Bug fix: portable_locators was accepted but never
                    # applied.  Strip hints with the same rule as
                    # stripped_manifest().
                    loc = re.sub(r'\+[^\d][^\+]*', '', loc)
                st.append(LocatorAndRange(loc, locator_block_size(loc),
                                     s.segment_offset, s.range_size))
            stream[k] = st
        if stream:
            buf += ' '.join(normalize_stream(stream_name, stream))
            buf += "\n"
        # ...then recurse into subcollections.
        for k in [s for s in sorted_keys if isinstance(item[s], Collection)]:
            # Bug fix: propagate portable_locators so nested streams are
            # stripped consistently.
            buf += export_manifest(item[k],
                                   stream_name=os.path.join(stream_name, k),
                                   portable_locators=portable_locators)
    elif isinstance(item, ArvadosFile):
        st = []
        for s in item._segments:
            loc = s.locator
            if loc.startswith("bufferblock"):
                # Bug fix: resolve bufferblocks via the block manager as
                # in the Collection branch above; ArvadosFile itself has
                # no _bufferblocks mapping.
                loc = item.parent._my_block_manager()._bufferblocks[loc].locator()
            if portable_locators:
                loc = re.sub(r'\+[^\d][^\+]*', '', loc)
            st.append(LocatorAndRange(loc, locator_block_size(loc),
                                 s.segment_offset, s.range_size))
        # Bug fix: `stream` was referenced here without ever being
        # defined in this branch (NameError at runtime).
        stream = {stream_name: st}
        buf += ' '.join(normalize_stream(stream_name, stream))
        buf += "\n"
    return buf