[arvados.git] / sdk / python / arvados / collection.py
import functools
import hashlib
import logging
import os
import re

from collections import deque
from stat import *

from .arvfile import ArvadosFileBase, split, ArvadosFile
from keep import *
from .stream import StreamReader, normalize_stream
import config
import errors
import util
_logger = logging.getLogger('arvados.collection')

class CollectionBase(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)

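# Example (illustrative): given a signed manifest line such as
#   . acbd18db4cc2f85cedef654fccc4a4d8+3+A<signature>@<expiry> 0:3:foo.txt
# stripped_manifest() removes the '+A...' permission hint, leaving
#   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
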
class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if (re.match(util.keep_locator_pattern, manifest_locator_or_text) or
                re.match(util.collection_uuid_pattern,
                         manifest_locator_or_text)):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest, portable "
                "data hash, or collection UUID")
        self._api_response = None
        self._streams = None

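    # Usage sketch (hypothetical UUID, assuming a reachable API server;
    # shown as a comment so the module stays importable):
    #
    #   reader = CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz')
    #   print reader.manifest_text()
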
    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            if self._api_client is None:
                # Imported here rather than at module load time, to avoid
                # a circular import with the arvados package itself.
                import arvados
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            self._api_response = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e

    def _populate(self):
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a Keep locator, and we didn't already try Keep above.
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._streams is None:
                self._populate()
            return orig_func(self, *args, **kwargs)
        return wrapper

    @_populate_first
    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response

    @_populate_first
    def normalize(self):
        # Rearrange streams
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    @_populate_first
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.
        """
        if filename is None:
            streampath, filename = split(streampath)
        keep_client = self._my_keep()
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
                break
        else:
            raise ValueError("stream '{}' not found in Collection".
                             format(streampath))
        try:
            return stream.files()[filename]
        except KeyError:
            raise ValueError("file '{}' not found in Collection stream '{}'".
                             format(filename, streampath))

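    # Read sketch (hypothetical path, assuming the collection contains
    # './doc/page1.txt'; shown as a comment so the module stays importable):
    #
    #   infile = reader.open('./doc/page1.txt')
    #   first_bytes = infile.read(128)
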
    @_populate_first
    def all_streams(self):
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    @_populate_first
    def manifest_text(self, strip=False, normalize=False):
        """manifest_text(strip=False, normalize=False) -> str

        Returns the manifest text for this Collection.  With strip=True,
        non-portable hints are removed from all locators; with
        normalize=True, the text is returned in normalized form.
        """
        if normalize:
            cr = CollectionReader(self.manifest_text())
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            return self._manifest_text


class _WriterFile(ArvadosFileBase):
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):
        for data in seq:
            self.write(data)

    @ArvadosFileBase._before_close
    def flush(self):
        self.dest.flush_data()


class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None

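    # Usage sketch (hypothetical file name, assuming writable Keep access;
    # shown as a comment so the module stays importable):
    #
    #   cwriter = CollectionWriter()
    #   with cwriter.open('./hello.txt') as outfile:
    #       outfile.write('hello world\n')
    #   manifest_locator = cwriter.finish()
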
    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

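    # Illustrative ordering (hypothetical tree): after _queue_tree('/tmp/d',
    # '.', -1), do_queued_work() expands '/tmp/d' into dirents, writes each
    # regular file it finds one at a time, and pushes each subdirectory back
    # onto _queued_trees so it becomes its own stream.
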
    def _work_file(self):
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth=(None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

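    # Sketch (hypothetical path): upload a directory tree, with each
    # subdirectory written as its own stream:
    #
    #   cwriter.write_directory_tree('/tmp/dataset', stream_name='.')
    #   manifest = cwriter.manifest_text()
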
    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

          with cwriter.open('./doc/page1.txt') as outfile:
              outfile.write(page1_data)
          with cwriter.open('./doc/page2.txt') as outfile:
              outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain tab or newline characters: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
                self._current_file_pos,
                self._current_stream_length - self._current_file_pos,
                self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain tab or newline characters")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

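    # Note (illustrative): portable_data_hash() is the md5 of the stripped
    # manifest plus its length; for an empty manifest it would be
    # 'd41d8cd98f00b204e9800998ecf8427e+0'.
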
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join(
                "%d:%d:%s" % (sfile[0], sfile[1],
                              sfile[2].replace(' ', '\\040'))
                for sfile in stream[2])
            manifest += "\n"

        return manifest

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

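    # Resume sketch (hypothetical checkpointing scheme; shown as a comment):
    #
    #   state = writer.dump_state(copy.deepcopy)   # periodically checkpoint
    #   ...                                        # process dies, restarts
    #   writer = ResumableCollectionWriter.from_state(state)
    #   writer.do_queued_work()
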
    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            # path_stat is only None when os.stat raised, so stat_error is
            # bound in this branch.
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)


class Collection(object):
    def __init__(self):
        self.items = {}

    def find_or_create(self, path):
        """Walk `path` within this collection, creating intermediate
        subcollections and a leaf ArvadosFile as needed, and return the
        leaf item."""
        p = path.split("/")
        if p[0] == '.':
            del p[0]

        if len(p) > 0:
            item = self.items.get(p[0])
            if len(p) == 1:
                # item must be a file
                if item is None:
                    # create new file
                    item = ArvadosFile(p[0], 'wb', [], [])
                    self.items[p[0]] = item
                return item
            else:
                if item is None:
                    # create new collection
                    item = Collection()
                    self.items[p[0]] = item
                del p[0]
                return item.find_or_create("/".join(p))
        else:
            return self

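# Example (illustrative): find_or_create builds intermediate collections.
#
#   c = Collection()
#   f = c.find_or_create("./foo/bar/baz.txt")  # creates 'foo', 'bar', file
#   f is c.items['foo'].items['bar'].items['baz.txt']  # -> True
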
def import_manifest(manifest_text):
    """Parse `manifest_text` and build a Collection tree of its streams
    and files."""
    c = Collection()

    STREAM_NAME = 0
    BLOCKS = 1
    SEGMENTS = 2

    stream_name = None
    state = STREAM_NAME

    for n in re.finditer(r'([^ \n]+)([ \n])', manifest_text):
        tok = n.group(1)
        sep = n.group(2)
        if state == STREAM_NAME:
            # starting a new stream
            stream_name = tok.replace('\\040', ' ')
            blocks = []
            segments = []
            streamoffset = 0L
            state = BLOCKS
            continue

        if state == BLOCKS:
            s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
            if s:
                blocksize = long(s.group(1))
                blocks.append([tok, blocksize, streamoffset])
                streamoffset += blocksize
            else:
                state = SEGMENTS

        if state == SEGMENTS:
            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            if s:
                pos = long(s.group(1))
                size = long(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                f = c.find_or_create("%s/%s" % (stream_name, name))
                f.add_segment(blocks, pos, size)
            else:
                # error!
                raise errors.SyntaxError("Invalid manifest format")

        if sep == "\n":
            stream_name = None
            state = STREAM_NAME

    return c

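# Example manifest line (the locator shown is the well-known empty-block
# locator, md5 of the empty string plus its zero length):
#
#   c = import_manifest(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n")
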
def export_manifest(item, stream_name="."):
    # Work in progress: segment serialization below is incomplete (it does
    # not yet emit block locators or normalized byte ranges, and
    # subdirectory names are not yet appended to stream_name).
    buf = ""
    if isinstance(item, Collection):
        for child in item.items.values():
            buf += export_manifest(child, stream_name)
    else:
        buf += stream_name
        buf += " "
        buf += str(item.segments)
    return buf