3198: Adjust language of errors to say what the error is instead of saying what the
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import hashlib
7 import time
8 import threading
9
10 from collections import deque
11 from stat import *
12
13 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
14 from keep import KeepLocator, KeepClient
15 from .stream import StreamReader
16 from ._normalize_stream import normalize_stream
17 from ._ranges import Range, LocatorAndRange
18 from .safeapi import ThreadSafeApiCache
19 import config
20 import errors
21 import util
22 import events
23 from arvados.retry import retry_method
24
25 _logger = logging.getLogger('arvados.collection')
26
27 class CollectionBase(object):
28     def __enter__(self):
29         return self
30
31     def __exit__(self, exc_type, exc_value, traceback):
32         pass
33
34     def _my_keep(self):
35         if self._keep_client is None:
36             self._keep_client = KeepClient(api_client=self._api_client,
37                                            num_retries=self.num_retries)
38         return self._keep_client
39
40     def stripped_manifest(self):
41         """Get the manifest with locator hints stripped.
42
43         Return the manifest for the current collection with all
44         non-portable hints (i.e., permission signatures and other
45         hints other than size hints) removed from the locators.
46         """
47         raw = self.manifest_text()
48         clean = []
49         for line in raw.split("\n"):
50             fields = line.split()
51             if fields:
52                 clean_fields = fields[:1] + [
53                     (re.sub(r'\+[^\d][^\+]*', '', x)
54                      if re.match(util.keep_locator_pattern, x)
55                      else x)
56                     for x in fields[1:]]
57                 clean += [' '.join(clean_fields), "\n"]
58         return ''.join(clean)
59
60
61 class _WriterFile(_FileLikeObjectBase):
62     def __init__(self, coll_writer, name):
63         super(_WriterFile, self).__init__(name, 'wb')
64         self.dest = coll_writer
65
66     def close(self):
67         super(_WriterFile, self).close()
68         self.dest.finish_current_file()
69
70     @_FileLikeObjectBase._before_close
71     def write(self, data):
72         self.dest.write(data)
73
74     @_FileLikeObjectBase._before_close
75     def writelines(self, seq):
76         for data in seq:
77             self.write(data)
78
79     @_FileLikeObjectBase._before_close
80     def flush(self):
81         self.dest.flush_data()
82
83
84 class CollectionWriter(CollectionBase):
85     def __init__(self, api_client=None, num_retries=0, replication=None):
86         """Instantiate a CollectionWriter.
87
88         CollectionWriter lets you build a new Arvados Collection from scratch.
89         Write files to it.  The CollectionWriter will upload data to Keep as
90         appropriate, and provide you with the Collection manifest text when
91         you're finished.
92
93         Arguments:
94         * api_client: The API client to use to look up Collections.  If not
95           provided, CollectionReader will build one from available Arvados
96           configuration.
97         * num_retries: The default number of times to retry failed
98           service requests.  Default 0.  You may change this value
99           after instantiation, but note those changes may not
100           propagate to related objects like the Keep client.
101         * replication: The number of copies of each block to store.
102           If this argument is None or not supplied, replication is
103           the server-provided default if available, otherwise 2.
104         """
105         self._api_client = api_client
106         self.num_retries = num_retries
107         self.replication = (2 if replication is None else replication)
108         self._keep_client = None
109         self._data_buffer = []
110         self._data_buffer_len = 0
111         self._current_stream_files = []
112         self._current_stream_length = 0
113         self._current_stream_locators = []
114         self._current_stream_name = '.'
115         self._current_file_name = None
116         self._current_file_pos = 0
117         self._finished_streams = []
118         self._close_file = None
119         self._queued_file = None
120         self._queued_dirents = deque()
121         self._queued_trees = deque()
122         self._last_open = None
123
124     def __exit__(self, exc_type, exc_value, traceback):
125         if exc_type is None:
126             self.finish()
127
128     def do_queued_work(self):
129         # The work queue consists of three pieces:
130         # * _queued_file: The file object we're currently writing to the
131         #   Collection.
132         # * _queued_dirents: Entries under the current directory
133         #   (_queued_trees[0]) that we want to write or recurse through.
134         #   This may contain files from subdirectories if
135         #   max_manifest_depth == 0 for this directory.
136         # * _queued_trees: Directories that should be written as separate
137         #   streams to the Collection.
138         # This function handles the smallest piece of work currently queued
139         # (current file, then current directory, then next directory) until
140         # no work remains.  The _work_THING methods each do a unit of work on
141         # THING.  _queue_THING methods add a THING to the work queue.
142         while True:
143             if self._queued_file:
144                 self._work_file()
145             elif self._queued_dirents:
146                 self._work_dirents()
147             elif self._queued_trees:
148                 self._work_trees()
149             else:
150                 break
151
152     def _work_file(self):
153         while True:
154             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
155             if not buf:
156                 break
157             self.write(buf)
158         self.finish_current_file()
159         if self._close_file:
160             self._queued_file.close()
161         self._close_file = None
162         self._queued_file = None
163
164     def _work_dirents(self):
165         path, stream_name, max_manifest_depth = self._queued_trees[0]
166         if stream_name != self.current_stream_name():
167             self.start_new_stream(stream_name)
168         while self._queued_dirents:
169             dirent = self._queued_dirents.popleft()
170             target = os.path.join(path, dirent)
171             if os.path.isdir(target):
172                 self._queue_tree(target,
173                                  os.path.join(stream_name, dirent),
174                                  max_manifest_depth - 1)
175             else:
176                 self._queue_file(target, dirent)
177                 break
178         if not self._queued_dirents:
179             self._queued_trees.popleft()
180
181     def _work_trees(self):
182         path, stream_name, max_manifest_depth = self._queued_trees[0]
183         d = util.listdir_recursive(
184             path, max_depth = (None if max_manifest_depth == 0 else 0))
185         if d:
186             self._queue_dirents(stream_name, d)
187         else:
188             self._queued_trees.popleft()
189
190     def _queue_file(self, source, filename=None):
191         assert (self._queued_file is None), "tried to queue more than one file"
192         if not hasattr(source, 'read'):
193             source = open(source, 'rb')
194             self._close_file = True
195         else:
196             self._close_file = False
197         if filename is None:
198             filename = os.path.basename(source.name)
199         self.start_new_file(filename)
200         self._queued_file = source
201
202     def _queue_dirents(self, stream_name, dirents):
203         assert (not self._queued_dirents), "tried to queue more than one tree"
204         self._queued_dirents = deque(sorted(dirents))
205
206     def _queue_tree(self, path, stream_name, max_manifest_depth):
207         self._queued_trees.append((path, stream_name, max_manifest_depth))
208
209     def write_file(self, source, filename=None):
210         self._queue_file(source, filename)
211         self.do_queued_work()
212
213     def write_directory_tree(self,
214                              path, stream_name='.', max_manifest_depth=-1):
215         self._queue_tree(path, stream_name, max_manifest_depth)
216         self.do_queued_work()
217
218     def write(self, newdata):
219         if hasattr(newdata, '__iter__'):
220             for s in newdata:
221                 self.write(s)
222             return
223         self._data_buffer.append(newdata)
224         self._data_buffer_len += len(newdata)
225         self._current_stream_length += len(newdata)
226         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
227             self.flush_data()
228
229     def open(self, streampath, filename=None):
230         """open(streampath[, filename]) -> file-like object
231
232         Pass in the path of a file to write to the Collection, either as a
233         single string or as two separate stream name and file name arguments.
234         This method returns a file-like object you can write to add it to the
235         Collection.
236
237         You may only have one file object from the Collection open at a time,
238         so be sure to close the object when you're done.  Using the object in
239         a with statement makes that easy::
240
241           with cwriter.open('./doc/page1.txt') as outfile:
242               outfile.write(page1_data)
243           with cwriter.open('./doc/page2.txt') as outfile:
244               outfile.write(page2_data)
245         """
246         if filename is None:
247             streampath, filename = split(streampath)
248         if self._last_open and not self._last_open.closed:
249             raise errors.AssertionError(
250                 "can't open '{}' when '{}' is still open".format(
251                     filename, self._last_open.name))
252         if streampath != self.current_stream_name():
253             self.start_new_stream(streampath)
254         self.set_current_file_name(filename)
255         self._last_open = _WriterFile(self, filename)
256         return self._last_open
257
258     def flush_data(self):
259         data_buffer = ''.join(self._data_buffer)
260         if data_buffer:
261             self._current_stream_locators.append(
262                 self._my_keep().put(
263                     data_buffer[0:config.KEEP_BLOCK_SIZE],
264                     copies=self.replication))
265             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
266             self._data_buffer_len = len(self._data_buffer[0])
267
268     def start_new_file(self, newfilename=None):
269         self.finish_current_file()
270         self.set_current_file_name(newfilename)
271
272     def set_current_file_name(self, newfilename):
273         if re.search(r'[\t\n]', newfilename):
274             raise errors.AssertionError(
275                 "Manifest filenames cannot contain whitespace: %s" %
276                 newfilename)
277         elif re.search(r'\x00', newfilename):
278             raise errors.AssertionError(
279                 "Manifest filenames cannot contain NUL characters: %s" %
280                 newfilename)
281         self._current_file_name = newfilename
282
283     def current_file_name(self):
284         return self._current_file_name
285
286     def finish_current_file(self):
287         if self._current_file_name is None:
288             if self._current_file_pos == self._current_stream_length:
289                 return
290             raise errors.AssertionError(
291                 "Cannot finish an unnamed file " +
292                 "(%d bytes at offset %d in '%s' stream)" %
293                 (self._current_stream_length - self._current_file_pos,
294                  self._current_file_pos,
295                  self._current_stream_name))
296         self._current_stream_files.append([
297                 self._current_file_pos,
298                 self._current_stream_length - self._current_file_pos,
299                 self._current_file_name])
300         self._current_file_pos = self._current_stream_length
301         self._current_file_name = None
302
303     def start_new_stream(self, newstreamname='.'):
304         self.finish_current_stream()
305         self.set_current_stream_name(newstreamname)
306
307     def set_current_stream_name(self, newstreamname):
308         if re.search(r'[\t\n]', newstreamname):
309             raise errors.AssertionError(
310                 "Manifest stream names cannot contain whitespace")
311         self._current_stream_name = '.' if newstreamname=='' else newstreamname
312
313     def current_stream_name(self):
314         return self._current_stream_name
315
316     def finish_current_stream(self):
317         self.finish_current_file()
318         self.flush_data()
319         if not self._current_stream_files:
320             pass
321         elif self._current_stream_name is None:
322             raise errors.AssertionError(
323                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
324                 (self._current_stream_length, len(self._current_stream_files)))
325         else:
326             if not self._current_stream_locators:
327                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
328             self._finished_streams.append([self._current_stream_name,
329                                            self._current_stream_locators,
330                                            self._current_stream_files])
331         self._current_stream_files = []
332         self._current_stream_length = 0
333         self._current_stream_locators = []
334         self._current_stream_name = None
335         self._current_file_pos = 0
336         self._current_file_name = None
337
338     def finish(self):
339         """Store the manifest in Keep and return its locator.
340
341         This is useful for storing manifest fragments (task outputs)
342         temporarily in Keep during a Crunch job.
343
344         In other cases you should make a collection instead, by
345         sending manifest_text() to the API server's "create
346         collection" endpoint.
347         """
348         return self._my_keep().put(self.manifest_text(), copies=self.replication)
349
350     def portable_data_hash(self):
351         stripped = self.stripped_manifest()
352         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
353
354     def manifest_text(self):
355         self.finish_current_stream()
356         manifest = ''
357
358         for stream in self._finished_streams:
359             if not re.search(r'^\.(/.*)?$', stream[0]):
360                 manifest += './'
361             manifest += stream[0].replace(' ', '\\040')
362             manifest += ' ' + ' '.join(stream[1])
363             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
364             manifest += "\n"
365
366         return manifest
367
368     def data_locators(self):
369         ret = []
370         for name, locators, files in self._finished_streams:
371             ret += locators
372         return ret
373
374
375 class ResumableCollectionWriter(CollectionWriter):
376     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
377                    '_current_stream_locators', '_current_stream_name',
378                    '_current_file_name', '_current_file_pos', '_close_file',
379                    '_data_buffer', '_dependencies', '_finished_streams',
380                    '_queued_dirents', '_queued_trees']
381
382     def __init__(self, api_client=None, **kwargs):
383         self._dependencies = {}
384         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
385
386     @classmethod
387     def from_state(cls, state, *init_args, **init_kwargs):
388         # Try to build a new writer from scratch with the given state.
389         # If the state is not suitable to resume (because files have changed,
390         # been deleted, aren't predictable, etc.), raise a
391         # StaleWriterStateError.  Otherwise, return the initialized writer.
392         # The caller is responsible for calling writer.do_queued_work()
393         # appropriately after it's returned.
394         writer = cls(*init_args, **init_kwargs)
395         for attr_name in cls.STATE_PROPS:
396             attr_value = state[attr_name]
397             attr_class = getattr(writer, attr_name).__class__
398             # Coerce the value into the same type as the initial value, if
399             # needed.
400             if attr_class not in (type(None), attr_value.__class__):
401                 attr_value = attr_class(attr_value)
402             setattr(writer, attr_name, attr_value)
403         # Check dependencies before we try to resume anything.
404         if any(KeepLocator(ls).permission_expired()
405                for ls in writer._current_stream_locators):
406             raise errors.StaleWriterStateError(
407                 "locators include expired permission hint")
408         writer.check_dependencies()
409         if state['_current_file'] is not None:
410             path, pos = state['_current_file']
411             try:
412                 writer._queued_file = open(path, 'rb')
413                 writer._queued_file.seek(pos)
414             except IOError as error:
415                 raise errors.StaleWriterStateError(
416                     "failed to reopen active file {}: {}".format(path, error))
417         return writer
418
419     def check_dependencies(self):
420         for path, orig_stat in self._dependencies.items():
421             if not S_ISREG(orig_stat[ST_MODE]):
422                 raise errors.StaleWriterStateError("{} not file".format(path))
423             try:
424                 now_stat = tuple(os.stat(path))
425             except OSError as error:
426                 raise errors.StaleWriterStateError(
427                     "failed to stat {}: {}".format(path, error))
428             if ((not S_ISREG(now_stat[ST_MODE])) or
429                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
430                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
431                 raise errors.StaleWriterStateError("{} changed".format(path))
432
433     def dump_state(self, copy_func=lambda x: x):
434         state = {attr: copy_func(getattr(self, attr))
435                  for attr in self.STATE_PROPS}
436         if self._queued_file is None:
437             state['_current_file'] = None
438         else:
439             state['_current_file'] = (os.path.realpath(self._queued_file.name),
440                                       self._queued_file.tell())
441         return state
442
443     def _queue_file(self, source, filename=None):
444         try:
445             src_path = os.path.realpath(source)
446         except Exception:
447             raise errors.AssertionError("{} not a file path".format(source))
448         try:
449             path_stat = os.stat(src_path)
450         except OSError as stat_error:
451             path_stat = None
452         super(ResumableCollectionWriter, self)._queue_file(source, filename)
453         fd_stat = os.fstat(self._queued_file.fileno())
454         if not S_ISREG(fd_stat.st_mode):
455             # We won't be able to resume from this cache anyway, so don't
456             # worry about further checks.
457             self._dependencies[source] = tuple(fd_stat)
458         elif path_stat is None:
459             raise errors.AssertionError(
460                 "could not stat {}: {}".format(source, stat_error))
461         elif path_stat.st_ino != fd_stat.st_ino:
462             raise errors.AssertionError(
463                 "{} changed between open and stat calls".format(source))
464         else:
465             self._dependencies[src_path] = tuple(fd_stat)
466
467     def write(self, data):
468         if self._queued_file is None:
469             raise errors.AssertionError(
470                 "resumable writer can't accept unsourced data")
471         return super(ResumableCollectionWriter, self).write(data)
472
473
474 ADD = "add"
475 DEL = "del"
476 MOD = "mod"
477 FILE = "file"
478 COLLECTION = "collection"
479
480 class RichCollectionBase(CollectionBase):
481     """Base class for Collections and Subcollections.
482
483     Implements the majority of functionality relating to accessing items in the
484     Collection.
485
486     """
487
488     def __init__(self, parent=None):
489         self.parent = parent
490         self._modified = True
491         self._callback = None
492         self._items = {}
493
494     def _my_api(self):
495         raise NotImplementedError()
496
497     def _my_keep(self):
498         raise NotImplementedError()
499
500     def _my_block_manager(self):
501         raise NotImplementedError()
502
503     def writable(self):
504         raise NotImplementedError()
505
506     def root_collection(self):
507         raise NotImplementedError()
508
509     def notify(self, event, collection, name, item):
510         raise NotImplementedError()
511
512     def stream_name(self):
513         raise NotImplementedError()
514
515     @must_be_writable
516     @synchronized
517     def find_or_create(self, path, create_type):
518         """Recursively search the specified file path.
519
520         May return either a `Collection` or `ArvadosFile`.  If not found, will
521         create a new item at the specified path based on `create_type`.  Will
522         create intermediate subcollections needed to contain the final item in
523         the path.
524
525         :create_type:
526           One of `arvados.collection.FILE` or
527           `arvados.collection.COLLECTION`.  If the path is not found, and value
528           of create_type is FILE then create and return a new ArvadosFile for
529           the last path component.  If COLLECTION, then create and return a new
530           Collection for the last path component.
531
532         """
533
534         pathcomponents = path.split("/", 1)
535         if pathcomponents[0]:
536             item = self._items.get(pathcomponents[0])
537             if len(pathcomponents) == 1:
538                 if item is None:
539                     # create new file
540                     if create_type == COLLECTION:
541                         item = Subcollection(self, pathcomponents[0])
542                     else:
543                         item = ArvadosFile(self, pathcomponents[0])
544                     self._items[pathcomponents[0]] = item
545                     self._modified = True
546                     self.notify(ADD, self, pathcomponents[0], item)
547                 return item
548             else:
549                 if item is None:
550                     # create new collection
551                     item = Subcollection(self, pathcomponents[0])
552                     self._items[pathcomponents[0]] = item
553                     self._modified = True
554                     self.notify(ADD, self, pathcomponents[0], item)
555                 if isinstance(item, RichCollectionBase):
556                     return item.find_or_create(pathcomponents[1], create_type)
557                 else:
558                     raise IOError(errno.ENOTDIR, "Is not a directory: %s" % pathcomponents[0])
559         else:
560             return self
561
562     @synchronized
563     def find(self, path):
564         """Recursively search the specified file path.
565
566         May return either a Collection or ArvadosFile.  Return None if not
567         found.
568
569         """
570         if not path:
571             raise errors.ArgumentError("Parameter 'path' is empty.")
572
573         pathcomponents = path.split("/", 1)
574         item = self._items.get(pathcomponents[0])
575         if len(pathcomponents) == 1:
576             return item
577         else:
578             if isinstance(item, RichCollectionBase):
579                 if pathcomponents[1]:
580                     return item.find(pathcomponents[1])
581                 else:
582                     return item
583             else:
584                 raise IOError(errno.ENOTDIR, "Is not a directory: %s" % pathcomponents[0])
585
586     def mkdirs(self, path):
587         """Recursive subcollection create.
588
589         Like `os.mkdirs()`.  Will create intermediate subcollections needed to
590         contain the leaf subcollection path.
591
592         """
593         return self.find_or_create(path, COLLECTION)
594
595     def open(self, path, mode="r"):
596         """Open a file-like object for access.
597
598         :path:
599           path to a file in the collection
600         :mode:
601           one of "r", "r+", "w", "w+", "a", "a+"
602           :"r":
603             opens for reading
604           :"r+":
605             opens for reading and writing.  Reads/writes share a file pointer.
606           :"w", "w+":
607             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
608           :"a", "a+":
609             opens for reading and writing.  All writes are appended to
610             the end of the file.  Writing does not affect the file pointer for
611             reading.
612         """
613         mode = mode.replace("b", "")
614         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
615             raise errors.ArgumentError("Bad mode '%s'" % mode)
616         create = (mode != "r")
617
618         if create and not self.writable():
619             raise IOError(errno.EROFS, "Collection is read only")
620
621         if create:
622             arvfile = self.find_or_create(path, FILE)
623         else:
624             arvfile = self.find(path)
625
626         if arvfile is None:
627             raise IOError(errno.ENOENT, "File not found")
628         if not isinstance(arvfile, ArvadosFile):
629             raise IOError(errno.EISDIR, "Is a directory: %s" % path)
630
631         if mode[0] == "w":
632             arvfile.truncate(0)
633
634         name = os.path.basename(path)
635
636         if mode == "r":
637             return ArvadosFileReader(arvfile, mode, num_retries=self.num_retries)
638         else:
639             return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
640
641     @synchronized
642     def modified(self):
643         """Test if the collection (or any subcollection or file) has been modified."""
644         if self._modified:
645             return True
646         for v in self._items.values():
647             if v.modified():
648                 return True
649         return False
650
651     @synchronized
652     def set_unmodified(self):
653         """Recursively clear modified flag."""
654         self._modified = False
655         for k,v in self._items.items():
656             v.set_unmodified()
657
658     @synchronized
659     def __iter__(self):
660         """Iterate over names of files and collections contained in this collection."""
661         return iter(self._items.keys())
662
663     @synchronized
664     def __getitem__(self, k):
665         """Get a file or collection that is directly contained by this collection.
666
667         If you want to search a path, use `find()` instead.
668
669         """
670         return self._items[k]
671
672     @synchronized
673     def __contains__(self, k):
674         """Test if there is a file or collection a directly contained by this collection."""
675         return k in self._items
676
677     @synchronized
678     def __len__(self):
679         """Get the number of items directly contained in this collection."""
680         return len(self._items)
681
682     @must_be_writable
683     @synchronized
684     def __delitem__(self, p):
685         """Delete an item by name which is directly contained by this collection."""
686         del self._items[p]
687         self._modified = True
688         self.notify(DEL, self, p, None)
689
690     @synchronized
691     def keys(self):
692         """Get a list of names of files and collections directly contained in this collection."""
693         return self._items.keys()
694
695     @synchronized
696     def values(self):
697         """Get a list of files and collection objects directly contained in this collection."""
698         return self._items.values()
699
700     @synchronized
701     def items(self):
702         """Get a list of (name, object) tuples directly contained in this collection."""
703         return self._items.items()
704
705     def exists(self, path):
706         """Test if there is a file or collection at `path`."""
707         return self.find(path) is not None
708
709     @must_be_writable
710     @synchronized
711     def remove(self, path, recursive=False):
712         """Remove the file or subcollection (directory) at `path`.
713
714         :recursive:
715           Specify whether to remove non-empty subcollections (True), or raise an error (False).
716         """
717
718         if not path:
719             raise errors.ArgumentError("Parameter 'path' is empty.")
720
721         pathcomponents = path.split("/", 1)
722         item = self._items.get(pathcomponents[0])
723         if item is None:
724             raise IOError(errno.ENOENT, "File not found")
725         if len(pathcomponents) == 1:
726             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
727                 raise IOError(errno.ENOTEMPTY, "Subcollection not empty")
728             deleteditem = self._items[pathcomponents[0]]
729             del self._items[pathcomponents[0]]
730             self._modified = True
731             self.notify(DEL, self, pathcomponents[0], deleteditem)
732         else:
733             item.remove(pathcomponents[1])
734
735     def _clonefrom(self, source):
736         for k,v in source.items():
737             self._items[k] = v.clone(self, k)
738
739     def clone(self):
740         raise NotImplementedError()
741
742     @must_be_writable
743     @synchronized
744     def add(self, source_obj, target_name, overwrite=False, reparent=False):
745         """Copy or move a file or subcollection to this collection.
746
747         :source_obj:
748           An ArvadosFile, or Subcollection object
749
750         :target_name:
751           Destination item name.  If the target name already exists and is a
752           file, this will raise an error unless you specify `overwrite=True`.
753
754         :overwrite:
755           Whether to overwrite target file if it already exists.
756
757         :reparent:
758           If True, source_obj will be moved from its parent collection to this collection.
759           If False, source_obj will be copied and the parent collection will be
760           unmodified.
761
762         """
763
764         if target_name in self and not overwrite:
765             raise IOError(errno.EEXIST, "File already exists")
766
767         modified_from = None
768         if target_name in self:
769             modified_from = self[target_name]
770
771         # Actually make the move or copy.
772         if reparent:
773             source_obj._reparent(self, target_name)
774             item = source_obj
775         else:
776             item = source_obj.clone(self, target_name)
777
778         self._items[target_name] = item
779         self._modified = True
780
781         if modified_from:
782             self.notify(MOD, self, target_name, (modified_from, item))
783         else:
784             self.notify(ADD, self, target_name, item)
785
786     def _get_src_target(self, source, target_path, source_collection, create_dest):
787         if source_collection is None:
788             source_collection = self
789
790         # Find the object
791         if isinstance(source, basestring):
792             source_obj = source_collection.find(source)
793             if source_obj is None:
794                 raise IOError(errno.ENOENT, "File not found")
795             sourcecomponents = source.split("/")
796         else:
797             source_obj = source
798             sourcecomponents = None
799
800         # Find parent collection the target path
801         targetcomponents = target_path.split("/")
802
803         # Determine the name to use.
804         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
805
806         if not target_name:
807             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
808
809         if create_dest:
810             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
811         else:
812             if len(targetcomponents) > 1:
813                 target_dir = self.find("/".join(targetcomponents[0:-1]))
814             else:
815                 target_dir = self
816
817         if target_dir is None:
818             raise IOError(errno.ENOENT, "Target directory not found.")
819
820         if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
821             target_dir = target_dir[target_name]
822             target_name = sourcecomponents[-1]
823
824         return (source_obj, target_dir, target_name)
825
826     @must_be_writable
827     @synchronized
828     def copy(self, source, target_path, source_collection=None, overwrite=False):
829         """Copy a file or subcollection to a new path in this collection.
830
831         :source:
832           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
833
834         :target_path:
835           Destination file or path.  If the target path already exists and is a
836           subcollection, the item will be placed inside the subcollection.  If
837           the target path already exists and is a file, this will raise an error
838           unless you specify `overwrite=True`.
839
840         :source_collection:
841           Collection to copy `source_path` from (default `self`)
842
843         :overwrite:
844           Whether to overwrite target file if it already exists.
845         """
846
847         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
848         target_dir.add(source_obj, target_name, overwrite, False)
849
850     @must_be_writable
851     @synchronized
852     def rename(self, source, target_path, source_collection=None, overwrite=False):
853         """Move a file or subcollection from `source_collection` to a new path in this collection.
854
855         :source:
856           A string with a path to source file or subcollection.
857
858         :target_path:
859           Destination file or path.  If the target path already exists and is a
860           subcollection, the item will be placed inside the subcollection.  If
861           the target path already exists and is a file, this will raise an error
862           unless you specify `overwrite=True`.
863
864         :source_collection:
865           Collection to copy `source_path` from (default `self`)
866
867         :overwrite:
868           Whether to overwrite target file if it already exists.
869         """
870
871         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
872         if not source_obj.writable():
873             raise IOError(errno.EROFS, "Source collection is read only.")
874         target_dir.add(source_obj, target_name, overwrite, True)
875
876     def portable_manifest_text(self, stream_name="."):
877         """Get the manifest text for this collection, sub collections and files.
878
879         This method does not flush outstanding blocks to Keep.  It will return
880         a normalized manifest with access tokens stripped.
881
882         :stream_name:
883           Name to use for this stream (directory)
884
885         """
886         return self._get_manifest_text(stream_name, True, True)
887
888     def manifest_text(self, stream_name=".", strip=False, normalize=False):
889         """Get the manifest text for this collection, sub collections and files.
890
891         This method will flush outstanding blocks to Keep.  By default, it will
892         not normalize an unmodified manifest or strip access tokens.
893
894         :stream_name:
895           Name to use for this stream (directory)
896
897         :strip:
898           If True, remove signing tokens from block locators if present.
899           If False (default), block locators are left unchanged.
900
901         :normalize:
902           If True, always export the manifest text in normalized form
903           even if the Collection is not modified.  If False (default) and the collection
904           is not modified, return the original manifest text even if it is not
905           in normalized form.
906
907         """
908
909         self._my_block_manager().commit_all()
910         return self._get_manifest_text(stream_name, strip, normalize)
911
912     @synchronized
913     def _get_manifest_text(self, stream_name, strip, normalize):
914         """Get the manifest text for this collection, sub collections and files.
915
916         :stream_name:
917           Name to use for this stream (directory)
918
919         :strip:
920           If True, remove signing tokens from block locators if present.
921           If False (default), block locators are left unchanged.
922
923         :normalize:
924           If True, always export the manifest text in normalized form
925           even if the Collection is not modified.  If False (default) and the collection
926           is not modified, return the original manifest text even if it is not
927           in normalized form.
928
929         """
930
931         if self.modified() or self._manifest_text is None or normalize:
932             stream = {}
933             buf = []
934             sorted_keys = sorted(self.keys())
935             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
936                 # Create a stream per file `k`
937                 arvfile = self[filename]
938                 filestream = []
939                 for segment in arvfile.segments():
940                     loc = segment.locator
941                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
942                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
943                     if strip:
944                         loc = KeepLocator(loc).stripped()
945                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
946                                          segment.segment_offset, segment.range_size))
947                 stream[filename] = filestream
948             if stream:
949                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
950             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
951                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
952             return "".join(buf)
953         else:
954             if strip:
955                 return self.stripped_manifest()
956             else:
957                 return self._manifest_text
958
959     @synchronized
960     def diff(self, end_collection, prefix=".", holding_collection=None):
961         """Generate list of add/modify/delete actions.
962
963         When given to `apply`, will change `self` to match `end_collection`
964
965         """
966         changes = []
967         if holding_collection is None:
968             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
969         for k in self:
970             if k not in end_collection:
971                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
972         for k in end_collection:
973             if k in self:
974                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
975                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
976                 elif end_collection[k] != self[k]:
977                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
978             else:
979                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
980         return changes
981
982     @must_be_writable
983     @synchronized
984     def apply(self, changes):
985         """Apply changes from `diff`.
986
987         If a change conflicts with a local change, it will be saved to an
988         alternate path indicating the conflict.
989
990         """
991         if changes:
992             self._modified = True
993         for change in changes:
994             event_type = change[0]
995             path = change[1]
996             initial = change[2]
997             local = self.find(path)
998             conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
999                                                                     time.gmtime()))
1000             if event_type == ADD:
1001                 if local is None:
1002                     # No local file at path, safe to copy over new file
1003                     self.copy(initial, path)
1004                 elif local is not None and local != initial:
1005                     # There is already local file and it is different:
1006                     # save change to conflict file.
1007                     self.copy(initial, conflictpath)
1008             elif event_type == MOD:
1009                 final = change[3]
1010                 if local == initial:
1011                     # Local matches the "initial" item so it has not
1012                     # changed locally and is safe to update.
1013                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1014                         # Replace contents of local file with new contents
1015                         local.replace_contents(final)
1016                     else:
1017                         # Overwrite path with new item; this can happen if
1018                         # path was a file and is now a collection or vice versa
1019                         self.copy(final, path, overwrite=True)
1020                 else:
1021                     # Local is missing (presumably deleted) or local doesn't
1022                     # match the "start" value, so save change to conflict file
1023                     self.copy(final, conflictpath)
1024             elif event_type == DEL:
1025                 if local == initial:
1026                     # Local item matches "initial" value, so it is safe to remove.
1027                     self.remove(path, recursive=True)
1028                 # else, the file is modified or already removed, in either
1029                 # case we don't want to try to remove it.
1030
1031     def portable_data_hash(self):
1032         """Get the portable data hash for this collection's manifest."""
1033         stripped = self.portable_manifest_text()
1034         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1035
1036     @synchronized
1037     def subscribe(self, callback):
1038         if self._callback is None:
1039             self._callback = callback
1040         else:
1041             raise errors.ArgumentError("A callback is already set on this collection.")
1042
1043     @synchronized
1044     def unsubscribe(self):
1045         if self._callback is not None:
1046             self._callback = None
1047
1048     @synchronized
1049     def notify(self, event, collection, name, item):
1050         if self._callback:
1051             self._callback(event, collection, name, item)
1052         self.root_collection().notify(event, collection, name, item)
1053
1054     @synchronized
1055     def __eq__(self, other):
1056         if other is self:
1057             return True
1058         if not isinstance(other, RichCollectionBase):
1059             return False
1060         if len(self._items) != len(other):
1061             return False
1062         for k in self._items:
1063             if k not in other:
1064                 return False
1065             if self._items[k] != other[k]:
1066                 return False
1067         return True
1068
1069     def __ne__(self, other):
1070         return not self.__eq__(other)
1071
1072     @synchronized
1073     def flush(self):
1074         """Flush bufferblocks to Keep."""
1075         for e in self.values():
1076             e.flush()
1077
1078
1079 class Collection(RichCollectionBase):
1080     """Represents the root of an Arvados Collection.
1081
1082     This class is threadsafe.  The root collection object, all subcollections
1083     and files are protected by a single lock (i.e. each access locks the entire
1084     collection).
1085
1086     Brief summary of
1087     useful methods:
1088
1089     :To read an existing file:
1090       `c.open("myfile", "r")`
1091
1092     :To write a new file:
1093       `c.open("myfile", "w")`
1094
1095     :To determine if a file exists:
1096       `c.find("myfile") is not None`
1097
1098     :To copy a file:
1099       `c.copy("source", "dest")`
1100
1101     :To delete a file:
1102       `c.remove("myfile")`
1103
1104     :To save to an existing collection record:
1105       `c.save()`
1106
1107     :To save a new collection record:
1108     `c.save_new()`
1109
1110     :To merge remote changes into this object:
1111       `c.update()`
1112
1113     Must be associated with an API server Collection record (during
1114     initialization, or using `save_new`) to use `save` or `update`
1115
1116     """
1117
1118     def __init__(self, manifest_locator_or_text=None,
1119                  api_client=None,
1120                  keep_client=None,
1121                  num_retries=None,
1122                  parent=None,
1123                  apiconfig=None,
1124                  block_manager=None):
1125         """Collection constructor.
1126
1127         :manifest_locator_or_text:
1128           One of Arvados collection UUID, block locator of
1129           a manifest, raw manifest text, or None (to create an empty collection).
1130         :parent:
1131           the parent Collection, may be None.
1132         :apiconfig:
1133           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1134           Prefer this over supplying your own api_client and keep_client (except in testing).
1135           Will use default config settings if not specified.
1136         :api_client:
1137           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1138         :keep_client:
1139           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1140         :num_retries:
1141           the number of retries for API and Keep requests.
1142         :block_manager:
1143           the block manager to use.  If not specified, create one.
1144
1145         """
1146         super(Collection, self).__init__(parent)
1147         self._api_client = api_client
1148         self._keep_client = keep_client
1149         self._block_manager = block_manager
1150
1151         if apiconfig:
1152             self._config = apiconfig
1153         else:
1154             self._config = config.settings()
1155
1156         self.num_retries = num_retries if num_retries is not None else 0
1157         self._manifest_locator = None
1158         self._manifest_text = None
1159         self._api_response = None
1160
1161         self.lock = threading.RLock()
1162         self.events = None
1163
1164         if manifest_locator_or_text:
1165             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1166                 self._manifest_locator = manifest_locator_or_text
1167             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1168                 self._manifest_locator = manifest_locator_or_text
1169             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1170                 self._manifest_text = manifest_locator_or_text
1171             else:
1172                 raise errors.ArgumentError(
1173                     "Argument to CollectionReader is not a manifest or a collection UUID")
1174
1175             try:
1176                 self._populate()
1177             except (IOError, errors.SyntaxError) as e:
1178                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1179
1180     def root_collection(self):
1181         return self
1182
1183     def stream_name(self):
1184         return "."
1185
1186     def writable(self):
1187         return True
1188
1189     @synchronized
1190     @retry_method
1191     def update(self, other=None, num_retries=None):
1192         """Merge the latest collection on the API server with the current collection."""
1193
1194         if other is None:
1195             if self._manifest_locator is None:
1196                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1197             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1198             other = CollectionReader(response["manifest_text"])
1199         baseline = CollectionReader(self._manifest_text)
1200         self.apply(baseline.diff(other))
1201         self._manifest_text = self.manifest_text()
1202
1203     @synchronized
1204     def _my_api(self):
1205         if self._api_client is None:
1206             self._api_client = ThreadSafeApiCache(self._config)
1207             self._keep_client = self._api_client.keep
1208         return self._api_client
1209
1210     @synchronized
1211     def _my_keep(self):
1212         if self._keep_client is None:
1213             if self._api_client is None:
1214                 self._my_api()
1215             else:
1216                 self._keep_client = KeepClient(api_client=self._api_client)
1217         return self._keep_client
1218
1219     @synchronized
1220     def _my_block_manager(self):
1221         if self._block_manager is None:
1222             self._block_manager = _BlockManager(self._my_keep())
1223         return self._block_manager
1224
1225     def _populate_from_api_server(self):
1226         # As in KeepClient itself, we must wait until the last
1227         # possible moment to instantiate an API client, in order to
1228         # avoid tripping up clients that don't have access to an API
1229         # server.  If we do build one, make sure our Keep client uses
1230         # it.  If instantiation fails, we'll fall back to the except
1231         # clause, just like any other Collection lookup
1232         # failure. Return an exception, or None if successful.
1233         try:
1234             self._api_response = self._my_api().collections().get(
1235                 uuid=self._manifest_locator).execute(
1236                     num_retries=self.num_retries)
1237             self._manifest_text = self._api_response['manifest_text']
1238             return None
1239         except Exception as e:
1240             return e
1241
1242     def _populate_from_keep(self):
1243         # Retrieve a manifest directly from Keep. This has a chance of
1244         # working if [a] the locator includes a permission signature
1245         # or [b] the Keep services are operating in world-readable
1246         # mode. Return an exception, or None if successful.
1247         try:
1248             self._manifest_text = self._my_keep().get(
1249                 self._manifest_locator, num_retries=self.num_retries)
1250         except Exception as e:
1251             return e
1252
1253     def _populate(self):
1254         if self._manifest_locator is None and self._manifest_text is None:
1255             return
1256         error_via_api = None
1257         error_via_keep = None
1258         should_try_keep = ((self._manifest_text is None) and
1259                            util.keep_locator_pattern.match(
1260                                self._manifest_locator))
1261         if ((self._manifest_text is None) and
1262             util.signed_locator_pattern.match(self._manifest_locator)):
1263             error_via_keep = self._populate_from_keep()
1264         if self._manifest_text is None:
1265             error_via_api = self._populate_from_api_server()
1266             if error_via_api is not None and not should_try_keep:
1267                 raise error_via_api
1268         if ((self._manifest_text is None) and
1269             not error_via_keep and
1270             should_try_keep):
1271             # Looks like a keep locator, and we didn't already try keep above
1272             error_via_keep = self._populate_from_keep()
1273         if self._manifest_text is None:
1274             # Nothing worked!
1275             raise errors.NotFoundError(
1276                 ("Failed to retrieve collection '{}' " +
1277                  "from either API server ({}) or Keep ({})."
1278                  ).format(
1279                     self._manifest_locator,
1280                     error_via_api,
1281                     error_via_keep))
1282         # populate
1283         self._baseline_manifest = self._manifest_text
1284         self._import_manifest(self._manifest_text)
1285
1286
1287     def _has_collection_uuid(self):
1288         return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1289
1290     def __enter__(self):
1291         return self
1292
1293     def __exit__(self, exc_type, exc_value, traceback):
1294         """Support scoped auto-commit in a with: block."""
1295         if exc_type is None:
1296             if self.writable() and self._has_collection_uuid():
1297                 self.save()
1298         if self._block_manager is not None:
1299             self._block_manager.stop_threads()
1300
1301     @synchronized
1302     def manifest_locator(self):
1303         """Get the manifest locator, if any.
1304
1305         The manifest locator will be set when the collection is loaded from an
1306         API server record or the portable data hash of a manifest.
1307
1308         The manifest locator will be None if the collection is newly created or
1309         was created directly from manifest text.  The method `save_new()` will
1310         assign a manifest locator.
1311
1312         """
1313         return self._manifest_locator
1314
1315     @synchronized
1316     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1317         if new_config is None:
1318             new_config = self._config
1319         if readonly:
1320             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1321         else:
1322             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1323
1324         newcollection._clonefrom(self)
1325         return newcollection
1326
1327     @synchronized
1328     def api_response(self):
1329         """Returns information about this Collection fetched from the API server.
1330
1331         If the Collection exists in Keep but not the API server, currently
1332         returns None.  Future versions may provide a synthetic response.
1333
1334         """
1335         return self._api_response
1336
1337     def find_or_create(self, path, create_type):
1338         """See `RichCollectionBase.find_or_create`"""
1339         if path == ".":
1340             return self
1341         else:
1342             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1343
1344     def find(self, path):
1345         """See `RichCollectionBase.find`"""
1346         if path == ".":
1347             return self
1348         else:
1349             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1350
1351     def remove(self, path, recursive=False):
1352         """See `RichCollectionBase.remove`"""
1353         if path == ".":
1354             raise errors.ArgumentError("Cannot remove '.'")
1355         else:
1356             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1357
1358     @must_be_writable
1359     @synchronized
1360     @retry_method
1361     def save(self, merge=True, num_retries=None):
1362         """Save collection to an existing collection record.
1363
1364         Commit pending buffer blocks to Keep, merge with remote record (if
1365         merge=True, the default), and update the collection record.  Returns
1366         the current manifest text.
1367
1368         Will raise AssertionError if not associated with a collection record on
1369         the API server.  If you want to save a manifest to Keep only, see
1370         `save_new()`.
1371
1372         :merge:
1373           Update and merge remote changes before saving.  Otherwise, any
1374           remote changes will be ignored and overwritten.
1375
1376         :num_retries:
1377           Retry count on API calls (if None,  use the collection default)
1378
1379         """
1380         if self.modified():
1381             if not self._has_collection_uuid():
1382                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1383
1384             self._my_block_manager().commit_all()
1385
1386             if merge:
1387                 self.update()
1388
1389             text = self.manifest_text(strip=False)
1390             self._api_response = self._my_api().collections().update(
1391                 uuid=self._manifest_locator,
1392                 body={'manifest_text': text}
1393                 ).execute(
1394                     num_retries=num_retries)
1395             self._manifest_text = self._api_response["manifest_text"]
1396             self.set_unmodified()
1397
1398         return self._manifest_text
1399
1400
1401     @must_be_writable
1402     @synchronized
1403     @retry_method
1404     def save_new(self, name=None,
1405                  create_collection_record=True,
1406                  owner_uuid=None,
1407                  ensure_unique_name=False,
1408                  num_retries=None):
1409         """Save collection to a new collection record.
1410
1411         Commit pending buffer blocks to Keep and, when create_collection_record
1412         is True (default), create a new collection record.  After creating a
1413         new collection record, this Collection object will be associated with
1414         the new record used by `save()`.  Returns the current manifest text.
1415
1416         :name:
1417           The collection name.
1418
1419         :create_collection_record:
1420            If True, create a collection record on the API server.
1421            If False, only commit blocks to Keep and return the manifest text.
1422
1423         :owner_uuid:
1424           the user, or project uuid that will own this collection.
1425           If None, defaults to the current user.
1426
1427         :ensure_unique_name:
1428           If True, ask the API server to rename the collection
1429           if it conflicts with a collection with the same name and owner.  If
1430           False, a name conflict will result in an error.
1431
1432         :num_retries:
1433           Retry count on API calls (if None,  use the collection default)
1434
1435         """
1436         self._my_block_manager().commit_all()
1437         text = self.manifest_text(strip=False)
1438
1439         if create_collection_record:
1440             if name is None:
1441                 name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
1442                 ensure_unique_name = True
1443
1444             body = {"manifest_text": text,
1445                     "name": name}
1446             if owner_uuid:
1447                 body["owner_uuid"] = owner_uuid
1448
1449             self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
1450             text = self._api_response["manifest_text"]
1451
1452             self._manifest_locator = self._api_response["uuid"]
1453
1454             self._manifest_text = text
1455             self.set_unmodified()
1456
1457         return text
1458
1459     @synchronized
1460     def _import_manifest(self, manifest_text):
1461         """Import a manifest into a `Collection`.
1462
1463         :manifest_text:
1464           The manifest text to import from.
1465
1466         """
1467         if len(self) > 0:
1468             raise ArgumentError("Can only import manifest into an empty collection")
1469
1470         STREAM_NAME = 0
1471         BLOCKS = 1
1472         SEGMENTS = 2
1473
1474         stream_name = None
1475         state = STREAM_NAME
1476
1477         for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1478             tok = token_and_separator.group(1)
1479             sep = token_and_separator.group(2)
1480
1481             if state == STREAM_NAME:
1482                 # starting a new stream
1483                 stream_name = tok.replace('\\040', ' ')
1484                 blocks = []
1485                 segments = []
1486                 streamoffset = 0L
1487                 state = BLOCKS
1488                 self.mkdirs(stream_name)
1489                 continue
1490
1491             if state == BLOCKS:
1492                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1493                 if block_locator:
1494                     blocksize = long(block_locator.group(1))
1495                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1496                     streamoffset += blocksize
1497                 else:
1498                     state = SEGMENTS
1499
1500             if state == SEGMENTS:
1501                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1502                 if file_segment:
1503                     pos = long(file_segment.group(1))
1504                     size = long(file_segment.group(2))
1505                     name = file_segment.group(3).replace('\\040', ' ')
1506                     filepath = os.path.join(stream_name, name)
1507                     afile = self.find_or_create(filepath, FILE)
1508                     if isinstance(afile, ArvadosFile):
1509                         afile.add_segment(blocks, pos, size)
1510                     else:
1511                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1512                 else:
1513                     # error!
1514                     raise errors.SyntaxError("Invalid manifest format")
1515
1516             if sep == "\n":
1517                 stream_name = None
1518                 state = STREAM_NAME
1519
1520         self.set_unmodified()
1521
1522     @synchronized
1523     def notify(self, event, collection, name, item):
1524         if self._callback:
1525             self._callback(event, collection, name, item)
1526
1527
1528 class Subcollection(RichCollectionBase):
1529     """This is a subdirectory within a collection that doesn't have its own API
1530     server record.
1531
1532     It falls under the umbrella of the root collection.
1533
1534     """
1535
1536     def __init__(self, parent, name):
1537         super(Subcollection, self).__init__(parent)
1538         self.lock = self.root_collection().lock
1539         self._manifest_text = None
1540         self.name = name
1541         self.num_retries = parent.num_retries
1542
1543     def root_collection(self):
1544         return self.parent.root_collection()
1545
1546     def writable(self):
1547         return self.root_collection().writable()
1548
1549     def _my_api(self):
1550         return self.root_collection()._my_api()
1551
1552     def _my_keep(self):
1553         return self.root_collection()._my_keep()
1554
1555     def _my_block_manager(self):
1556         return self.root_collection()._my_block_manager()
1557
1558     def stream_name(self):
1559         return os.path.join(self.parent.stream_name(), self.name)
1560
1561     @synchronized
1562     def clone(self, new_parent, new_name):
1563         c = Subcollection(new_parent, new_name)
1564         c._clonefrom(self)
1565         return c
1566
1567     @must_be_writable
1568     @synchronized
1569     def _reparent(self, newparent, newname):
1570         self._modified = True
1571         self.flush()
1572         self.parent.remove(self.name, recursive=True)
1573         self.parent = newparent
1574         self.name = newname
1575         self.lock = self.parent.root_collection().lock
1576
1577
1578 class CollectionReader(Collection):
1579     """A read-only collection object.
1580
1581     Initialize from an api collection record locator, a portable data hash of a
1582     manifest, or raw manifest text.  See `Collection` constructor for detailed
1583     options.
1584
1585     """
1586     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1587         self._in_init = True
1588         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1589         self._in_init = False
1590
1591         # Forego any locking since it should never change once initialized.
1592         self.lock = NoopLock()
1593
1594         # Backwards compatability with old CollectionReader
1595         # all_streams() and all_files()
1596         self._streams = None
1597
1598     def writable(self):
1599         return self._in_init
1600
1601     def _populate_streams(orig_func):
1602         @functools.wraps(orig_func)
1603         def populate_streams_wrapper(self, *args, **kwargs):
1604             # Defer populating self._streams until needed since it creates a copy of the manifest.
1605             if self._streams is None:
1606                 if self._manifest_text:
1607                     self._streams = [sline.split()
1608                                      for sline in self._manifest_text.split("\n")
1609                                      if sline]
1610                 else:
1611                     self._streams = []
1612             return orig_func(self, *args, **kwargs)
1613         return populate_streams_wrapper
1614
1615     @_populate_streams
1616     def normalize(self):
1617         """Normalize the streams returned by `all_streams`.
1618
1619         This method is kept for backwards compatability and only affects the
1620         behavior of `all_streams()` and `all_files()`
1621
1622         """
1623
1624         # Rearrange streams
1625         streams = {}
1626         for s in self.all_streams():
1627             for f in s.all_files():
1628                 streamname, filename = split(s.name() + "/" + f.name())
1629                 if streamname not in streams:
1630                     streams[streamname] = {}
1631                 if filename not in streams[streamname]:
1632                     streams[streamname][filename] = []
1633                 for r in f.segments:
1634                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1635
1636         self._streams = [normalize_stream(s, streams[s])
1637                          for s in sorted(streams)]
1638     @_populate_streams
1639     def all_streams(self):
1640         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1641                 for s in self._streams]
1642
1643     @_populate_streams
1644     def all_files(self):
1645         for s in self.all_streams():
1646             for f in s.all_files():
1647                 yield f