4823: Fix bugs, fix tests, existing tests pass again. Still need to write new tests.
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import time
7
8 from collections import deque
9 from stat import *
10
11 from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, _synchronized, _must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
12 from keep import *
13 from .stream import StreamReader, normalize_stream, locator_block_size
14 from .ranges import Range, LocatorAndRange
15 import config
16 import errors
17 import util
18 import events
19
20 _logger = logging.getLogger('arvados.collection')
21
22 class CollectionBase(object):
23     def __enter__(self):
24         return self
25
26     def __exit__(self, exc_type, exc_value, traceback):
27         pass
28
29     def _my_keep(self):
30         if self._keep_client is None:
31             self._keep_client = KeepClient(api_client=self._api_client,
32                                            num_retries=self.num_retries)
33         return self._keep_client
34
35     def stripped_manifest(self):
36         """
37         Return the manifest for the current collection with all
38         non-portable hints (i.e., permission signatures and other
39         hints other than size hints) removed from the locators.
40         """
41         raw = self.manifest_text()
42         clean = []
43         for line in raw.split("\n"):
44             fields = line.split()
45             if fields:
46                 clean_fields = fields[:1] + [
47                     (re.sub(r'\+[^\d][^\+]*', '', x)
48                      if re.match(util.keep_locator_pattern, x)
49                      else x)
50                     for x in fields[1:]]
51                 clean += [' '.join(clean_fields), "\n"]
52         return ''.join(clean)
53
54
55 class CollectionReader(CollectionBase):
56     def __init__(self, manifest_locator_or_text, api_client=None,
57                  keep_client=None, num_retries=0):
58         """Instantiate a CollectionReader.
59
60         This class parses Collection manifests to provide a simple interface
61         to read its underlying files.
62
63         Arguments:
64         * manifest_locator_or_text: One of a Collection UUID, portable data
65           hash, or full manifest text.
66         * api_client: The API client to use to look up Collections.  If not
67           provided, CollectionReader will build one from available Arvados
68           configuration.
69         * keep_client: The KeepClient to use to download Collection data.
70           If not provided, CollectionReader will build one from available
71           Arvados configuration.
72         * num_retries: The default number of times to retry failed
73           service requests.  Default 0.  You may change this value
74           after instantiation, but note those changes may not
75           propagate to related objects like the Keep client.
76         """
77         self._api_client = api_client
78         self._keep_client = keep_client
79         self.num_retries = num_retries
80         if re.match(util.keep_locator_pattern, manifest_locator_or_text):
81             self._manifest_locator = manifest_locator_or_text
82             self._manifest_text = None
83         elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
84             self._manifest_locator = manifest_locator_or_text
85             self._manifest_text = None
86         elif re.match(util.manifest_pattern, manifest_locator_or_text):
87             self._manifest_text = manifest_locator_or_text
88             self._manifest_locator = None
89         else:
90             raise errors.ArgumentError(
91                 "Argument to CollectionReader must be a manifest or a collection UUID")
92         self._api_response = None
93         self._streams = None
94
95     def _populate_from_api_server(self):
96         # As in KeepClient itself, we must wait until the last
97         # possible moment to instantiate an API client, in order to
98         # avoid tripping up clients that don't have access to an API
99         # server.  If we do build one, make sure our Keep client uses
100         # it.  If instantiation fails, we'll fall back to the except
101         # clause, just like any other Collection lookup
102         # failure. Return an exception, or None if successful.
103         try:
104             if self._api_client is None:
105                 self._api_client = arvados.api('v1')
106                 self._keep_client = None  # Make a new one with the new api.
107             self._api_response = self._api_client.collections().get(
108                 uuid=self._manifest_locator).execute(
109                 num_retries=self.num_retries)
110             self._manifest_text = self._api_response['manifest_text']
111             return None
112         except Exception as e:
113             return e
114
115     def _populate_from_keep(self):
116         # Retrieve a manifest directly from Keep. This has a chance of
117         # working if [a] the locator includes a permission signature
118         # or [b] the Keep services are operating in world-readable
119         # mode. Return an exception, or None if successful.
120         try:
121             self._manifest_text = self._my_keep().get(
122                 self._manifest_locator, num_retries=self.num_retries)
123         except Exception as e:
124             return e
125
126     def _populate(self):
127         error_via_api = None
128         error_via_keep = None
129         should_try_keep = ((self._manifest_text is None) and
130                            util.keep_locator_pattern.match(
131                 self._manifest_locator))
132         if ((self._manifest_text is None) and
133             util.signed_locator_pattern.match(self._manifest_locator)):
134             error_via_keep = self._populate_from_keep()
135         if self._manifest_text is None:
136             error_via_api = self._populate_from_api_server()
137             if error_via_api is not None and not should_try_keep:
138                 raise error_via_api
139         if ((self._manifest_text is None) and
140             not error_via_keep and
141             should_try_keep):
142             # Looks like a keep locator, and we didn't already try keep above
143             error_via_keep = self._populate_from_keep()
144         if self._manifest_text is None:
145             # Nothing worked!
146             raise arvados.errors.NotFoundError(
147                 ("Failed to retrieve collection '{}' " +
148                  "from either API server ({}) or Keep ({})."
149                  ).format(
150                     self._manifest_locator,
151                     error_via_api,
152                     error_via_keep))
153         self._streams = [sline.split()
154                          for sline in self._manifest_text.split("\n")
155                          if sline]
156
157     def _populate_first(orig_func):
158         # Decorator for methods that read actual Collection data.
159         @functools.wraps(orig_func)
160         def wrapper(self, *args, **kwargs):
161             if self._streams is None:
162                 self._populate()
163             return orig_func(self, *args, **kwargs)
164         return wrapper
165
166     @_populate_first
167     def api_response(self):
168         """api_response() -> dict or None
169
170         Returns information about this Collection fetched from the API server.
171         If the Collection exists in Keep but not the API server, currently
172         returns None.  Future versions may provide a synthetic response.
173         """
174         return self._api_response
175
176     @_populate_first
177     def normalize(self):
178         # Rearrange streams
179         streams = {}
180         for s in self.all_streams():
181             for f in s.all_files():
182                 streamname, filename = split(s.name() + "/" + f.name())
183                 if streamname not in streams:
184                     streams[streamname] = {}
185                 if filename not in streams[streamname]:
186                     streams[streamname][filename] = []
187                 for r in f.segments:
188                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
189
190         self._streams = [normalize_stream(s, streams[s])
191                          for s in sorted(streams)]
192
193         # Regenerate the manifest text based on the normalized streams
194         self._manifest_text = ''.join(
195             [StreamReader(stream, keep=self._my_keep()).manifest_text()
196              for stream in self._streams])
197
198     @_populate_first
199     def open(self, streampath, filename=None):
200         """open(streampath[, filename]) -> file-like object
201
202         Pass in the path of a file to read from the Collection, either as a
203         single string or as two separate stream name and file name arguments.
204         This method returns a file-like object to read that file.
205         """
206         if filename is None:
207             streampath, filename = split(streampath)
208         keep_client = self._my_keep()
209         for stream_s in self._streams:
210             stream = StreamReader(stream_s, keep_client,
211                                   num_retries=self.num_retries)
212             if stream.name() == streampath:
213                 break
214         else:
215             raise ValueError("stream '{}' not found in Collection".
216                              format(streampath))
217         try:
218             return stream.files()[filename]
219         except KeyError:
220             raise ValueError("file '{}' not found in Collection stream '{}'".
221                              format(filename, streampath))
222
223     @_populate_first
224     def all_streams(self):
225         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
226                 for s in self._streams]
227
228     def all_files(self):
229         for s in self.all_streams():
230             for f in s.all_files():
231                 yield f
232
233     @_populate_first
234     def manifest_text(self, strip=False, normalize=False):
235         if normalize:
236             cr = CollectionReader(self.manifest_text())
237             cr.normalize()
238             return cr.manifest_text(strip=strip, normalize=False)
239         elif strip:
240             return self.stripped_manifest()
241         else:
242             return self._manifest_text
243
244
245 class _WriterFile(ArvadosFileBase):
246     def __init__(self, coll_writer, name):
247         super(_WriterFile, self).__init__(name, 'wb')
248         self.dest = coll_writer
249
250     def close(self):
251         super(_WriterFile, self).close()
252         self.dest.finish_current_file()
253
254     @ArvadosFileBase._before_close
255     def write(self, data):
256         self.dest.write(data)
257
258     @ArvadosFileBase._before_close
259     def writelines(self, seq):
260         for data in seq:
261             self.write(data)
262
263     @ArvadosFileBase._before_close
264     def flush(self):
265         self.dest.flush_data()
266
267
268 class CollectionWriter(CollectionBase):
269     def __init__(self, api_client=None, num_retries=0):
270         """Instantiate a CollectionWriter.
271
272         CollectionWriter lets you build a new Arvados Collection from scratch.
273         Write files to it.  The CollectionWriter will upload data to Keep as
274         appropriate, and provide you with the Collection manifest text when
275         you're finished.
276
277         Arguments:
278         * api_client: The API client to use to look up Collections.  If not
279           provided, CollectionReader will build one from available Arvados
280           configuration.
281         * num_retries: The default number of times to retry failed
282           service requests.  Default 0.  You may change this value
283           after instantiation, but note those changes may not
284           propagate to related objects like the Keep client.
285         """
286         self._api_client = api_client
287         self.num_retries = num_retries
288         self._keep_client = None
289         self._data_buffer = []
290         self._data_buffer_len = 0
291         self._current_stream_files = []
292         self._current_stream_length = 0
293         self._current_stream_locators = []
294         self._current_stream_name = '.'
295         self._current_file_name = None
296         self._current_file_pos = 0
297         self._finished_streams = []
298         self._close_file = None
299         self._queued_file = None
300         self._queued_dirents = deque()
301         self._queued_trees = deque()
302         self._last_open = None
303
304     def __exit__(self, exc_type, exc_value, traceback):
305         if exc_type is None:
306             self.finish()
307
308     def do_queued_work(self):
309         # The work queue consists of three pieces:
310         # * _queued_file: The file object we're currently writing to the
311         #   Collection.
312         # * _queued_dirents: Entries under the current directory
313         #   (_queued_trees[0]) that we want to write or recurse through.
314         #   This may contain files from subdirectories if
315         #   max_manifest_depth == 0 for this directory.
316         # * _queued_trees: Directories that should be written as separate
317         #   streams to the Collection.
318         # This function handles the smallest piece of work currently queued
319         # (current file, then current directory, then next directory) until
320         # no work remains.  The _work_THING methods each do a unit of work on
321         # THING.  _queue_THING methods add a THING to the work queue.
322         while True:
323             if self._queued_file:
324                 self._work_file()
325             elif self._queued_dirents:
326                 self._work_dirents()
327             elif self._queued_trees:
328                 self._work_trees()
329             else:
330                 break
331
332     def _work_file(self):
333         while True:
334             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
335             if not buf:
336                 break
337             self.write(buf)
338         self.finish_current_file()
339         if self._close_file:
340             self._queued_file.close()
341         self._close_file = None
342         self._queued_file = None
343
344     def _work_dirents(self):
345         path, stream_name, max_manifest_depth = self._queued_trees[0]
346         if stream_name != self.current_stream_name():
347             self.start_new_stream(stream_name)
348         while self._queued_dirents:
349             dirent = self._queued_dirents.popleft()
350             target = os.path.join(path, dirent)
351             if os.path.isdir(target):
352                 self._queue_tree(target,
353                                  os.path.join(stream_name, dirent),
354                                  max_manifest_depth - 1)
355             else:
356                 self._queue_file(target, dirent)
357                 break
358         if not self._queued_dirents:
359             self._queued_trees.popleft()
360
361     def _work_trees(self):
362         path, stream_name, max_manifest_depth = self._queued_trees[0]
363         d = util.listdir_recursive(
364             path, max_depth = (None if max_manifest_depth == 0 else 0))
365         if d:
366             self._queue_dirents(stream_name, d)
367         else:
368             self._queued_trees.popleft()
369
370     def _queue_file(self, source, filename=None):
371         assert (self._queued_file is None), "tried to queue more than one file"
372         if not hasattr(source, 'read'):
373             source = open(source, 'rb')
374             self._close_file = True
375         else:
376             self._close_file = False
377         if filename is None:
378             filename = os.path.basename(source.name)
379         self.start_new_file(filename)
380         self._queued_file = source
381
382     def _queue_dirents(self, stream_name, dirents):
383         assert (not self._queued_dirents), "tried to queue more than one tree"
384         self._queued_dirents = deque(sorted(dirents))
385
386     def _queue_tree(self, path, stream_name, max_manifest_depth):
387         self._queued_trees.append((path, stream_name, max_manifest_depth))
388
389     def write_file(self, source, filename=None):
390         self._queue_file(source, filename)
391         self.do_queued_work()
392
393     def write_directory_tree(self,
394                              path, stream_name='.', max_manifest_depth=-1):
395         self._queue_tree(path, stream_name, max_manifest_depth)
396         self.do_queued_work()
397
398     def write(self, newdata):
399         if hasattr(newdata, '__iter__'):
400             for s in newdata:
401                 self.write(s)
402             return
403         self._data_buffer.append(newdata)
404         self._data_buffer_len += len(newdata)
405         self._current_stream_length += len(newdata)
406         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
407             self.flush_data()
408
409     def open(self, streampath, filename=None):
410         """open(streampath[, filename]) -> file-like object
411
412         Pass in the path of a file to write to the Collection, either as a
413         single string or as two separate stream name and file name arguments.
414         This method returns a file-like object you can write to add it to the
415         Collection.
416
417         You may only have one file object from the Collection open at a time,
418         so be sure to close the object when you're done.  Using the object in
419         a with statement makes that easy::
420
421           with cwriter.open('./doc/page1.txt') as outfile:
422               outfile.write(page1_data)
423           with cwriter.open('./doc/page2.txt') as outfile:
424               outfile.write(page2_data)
425         """
426         if filename is None:
427             streampath, filename = split(streampath)
428         if self._last_open and not self._last_open.closed:
429             raise errors.AssertionError(
430                 "can't open '{}' when '{}' is still open".format(
431                     filename, self._last_open.name))
432         if streampath != self.current_stream_name():
433             self.start_new_stream(streampath)
434         self.set_current_file_name(filename)
435         self._last_open = _WriterFile(self, filename)
436         return self._last_open
437
438     def flush_data(self):
439         data_buffer = ''.join(self._data_buffer)
440         if data_buffer:
441             self._current_stream_locators.append(
442                 self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
443             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
444             self._data_buffer_len = len(self._data_buffer[0])
445
446     def start_new_file(self, newfilename=None):
447         self.finish_current_file()
448         self.set_current_file_name(newfilename)
449
450     def set_current_file_name(self, newfilename):
451         if re.search(r'[\t\n]', newfilename):
452             raise errors.AssertionError(
453                 "Manifest filenames cannot contain whitespace: %s" %
454                 newfilename)
455         elif re.search(r'\x00', newfilename):
456             raise errors.AssertionError(
457                 "Manifest filenames cannot contain NUL characters: %s" %
458                 newfilename)
459         self._current_file_name = newfilename
460
461     def current_file_name(self):
462         return self._current_file_name
463
464     def finish_current_file(self):
465         if self._current_file_name is None:
466             if self._current_file_pos == self._current_stream_length:
467                 return
468             raise errors.AssertionError(
469                 "Cannot finish an unnamed file " +
470                 "(%d bytes at offset %d in '%s' stream)" %
471                 (self._current_stream_length - self._current_file_pos,
472                  self._current_file_pos,
473                  self._current_stream_name))
474         self._current_stream_files.append([
475                 self._current_file_pos,
476                 self._current_stream_length - self._current_file_pos,
477                 self._current_file_name])
478         self._current_file_pos = self._current_stream_length
479         self._current_file_name = None
480
481     def start_new_stream(self, newstreamname='.'):
482         self.finish_current_stream()
483         self.set_current_stream_name(newstreamname)
484
485     def set_current_stream_name(self, newstreamname):
486         if re.search(r'[\t\n]', newstreamname):
487             raise errors.AssertionError(
488                 "Manifest stream names cannot contain whitespace")
489         self._current_stream_name = '.' if newstreamname=='' else newstreamname
490
491     def current_stream_name(self):
492         return self._current_stream_name
493
494     def finish_current_stream(self):
495         self.finish_current_file()
496         self.flush_data()
497         if not self._current_stream_files:
498             pass
499         elif self._current_stream_name is None:
500             raise errors.AssertionError(
501                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
502                 (self._current_stream_length, len(self._current_stream_files)))
503         else:
504             if not self._current_stream_locators:
505                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
506             self._finished_streams.append([self._current_stream_name,
507                                            self._current_stream_locators,
508                                            self._current_stream_files])
509         self._current_stream_files = []
510         self._current_stream_length = 0
511         self._current_stream_locators = []
512         self._current_stream_name = None
513         self._current_file_pos = 0
514         self._current_file_name = None
515
516     def finish(self):
517         # Store the manifest in Keep and return its locator.
518         return self._my_keep().put(self.manifest_text())
519
520     def portable_data_hash(self):
521         stripped = self.stripped_manifest()
522         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
523
524     def manifest_text(self):
525         self.finish_current_stream()
526         manifest = ''
527
528         for stream in self._finished_streams:
529             if not re.search(r'^\.(/.*)?$', stream[0]):
530                 manifest += './'
531             manifest += stream[0].replace(' ', '\\040')
532             manifest += ' ' + ' '.join(stream[1])
533             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
534             manifest += "\n"
535
536         return manifest
537
538     def data_locators(self):
539         ret = []
540         for name, locators, files in self._finished_streams:
541             ret += locators
542         return ret
543
544
545 class ResumableCollectionWriter(CollectionWriter):
546     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
547                    '_current_stream_locators', '_current_stream_name',
548                    '_current_file_name', '_current_file_pos', '_close_file',
549                    '_data_buffer', '_dependencies', '_finished_streams',
550                    '_queued_dirents', '_queued_trees']
551
552     def __init__(self, api_client=None, num_retries=0):
553         self._dependencies = {}
554         super(ResumableCollectionWriter, self).__init__(
555             api_client, num_retries=num_retries)
556
557     @classmethod
558     def from_state(cls, state, *init_args, **init_kwargs):
559         # Try to build a new writer from scratch with the given state.
560         # If the state is not suitable to resume (because files have changed,
561         # been deleted, aren't predictable, etc.), raise a
562         # StaleWriterStateError.  Otherwise, return the initialized writer.
563         # The caller is responsible for calling writer.do_queued_work()
564         # appropriately after it's returned.
565         writer = cls(*init_args, **init_kwargs)
566         for attr_name in cls.STATE_PROPS:
567             attr_value = state[attr_name]
568             attr_class = getattr(writer, attr_name).__class__
569             # Coerce the value into the same type as the initial value, if
570             # needed.
571             if attr_class not in (type(None), attr_value.__class__):
572                 attr_value = attr_class(attr_value)
573             setattr(writer, attr_name, attr_value)
574         # Check dependencies before we try to resume anything.
575         if any(KeepLocator(ls).permission_expired()
576                for ls in writer._current_stream_locators):
577             raise errors.StaleWriterStateError(
578                 "locators include expired permission hint")
579         writer.check_dependencies()
580         if state['_current_file'] is not None:
581             path, pos = state['_current_file']
582             try:
583                 writer._queued_file = open(path, 'rb')
584                 writer._queued_file.seek(pos)
585             except IOError as error:
586                 raise errors.StaleWriterStateError(
587                     "failed to reopen active file {}: {}".format(path, error))
588         return writer
589
590     def check_dependencies(self):
591         for path, orig_stat in self._dependencies.items():
592             if not S_ISREG(orig_stat[ST_MODE]):
593                 raise errors.StaleWriterStateError("{} not file".format(path))
594             try:
595                 now_stat = tuple(os.stat(path))
596             except OSError as error:
597                 raise errors.StaleWriterStateError(
598                     "failed to stat {}: {}".format(path, error))
599             if ((not S_ISREG(now_stat[ST_MODE])) or
600                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
601                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
602                 raise errors.StaleWriterStateError("{} changed".format(path))
603
604     def dump_state(self, copy_func=lambda x: x):
605         state = {attr: copy_func(getattr(self, attr))
606                  for attr in self.STATE_PROPS}
607         if self._queued_file is None:
608             state['_current_file'] = None
609         else:
610             state['_current_file'] = (os.path.realpath(self._queued_file.name),
611                                       self._queued_file.tell())
612         return state
613
614     def _queue_file(self, source, filename=None):
615         try:
616             src_path = os.path.realpath(source)
617         except Exception:
618             raise errors.AssertionError("{} not a file path".format(source))
619         try:
620             path_stat = os.stat(src_path)
621         except OSError as stat_error:
622             path_stat = None
623         super(ResumableCollectionWriter, self)._queue_file(source, filename)
624         fd_stat = os.fstat(self._queued_file.fileno())
625         if not S_ISREG(fd_stat.st_mode):
626             # We won't be able to resume from this cache anyway, so don't
627             # worry about further checks.
628             self._dependencies[source] = tuple(fd_stat)
629         elif path_stat is None:
630             raise errors.AssertionError(
631                 "could not stat {}: {}".format(source, stat_error))
632         elif path_stat.st_ino != fd_stat.st_ino:
633             raise errors.AssertionError(
634                 "{} changed between open and stat calls".format(source))
635         else:
636             self._dependencies[src_path] = tuple(fd_stat)
637
638     def write(self, data):
639         if self._queued_file is None:
640             raise errors.AssertionError(
641                 "resumable writer can't accept unsourced data")
642         return super(ResumableCollectionWriter, self).write(data)
643
644 ADD = "add"
645 DEL = "del"
646
647 class SynchronizedCollectionBase(CollectionBase):
648     def __init__(self, parent=None):
649         self.parent = parent
650         self._items = {}
651
652     def _my_api(self):
653         raise NotImplementedError()
654
655     def _my_keep(self):
656         raise NotImplementedError()
657
658     def _my_block_manager(self):
659         raise NotImplementedError()
660
661     def _root_lock(self):
662         raise NotImplementedError()
663
664     def _populate(self):
665         raise NotImplementedError()
666
667     def sync_mode(self):
668         raise NotImplementedError()
669
670     def notify(self, collection, event, name, item):
671         raise NotImplementedError()
672
673     @_synchronized
674     def find(self, path, create=False, create_collection=False):
675         """Recursively search the specified file path.  May return either a Collection
676         or ArvadosFile.
677
678         :create:
679           If true, create path components (i.e. Collections) that are
680           missing.  If "create" is False, return None if a path component is
681           not found.
682
683         :create_collection:
684           If the path is not found, "create" is True, and
685           "create_collection" is False, then create and return a new
686           ArvadosFile for the last path component.  If "create_collection" is
687           True, then create and return a new Collection for the last path
688           component.
689
690         """
691         if create and self.sync_mode() == SYNC_READONLY:
692             raise IOError((errno.EROFS, "Collection is read only"))
693
694         p = path.split("/")
695         if p[0] == '.':
696             del p[0]
697
698         if len(p) > 0:
699             item = self._items.get(p[0])
700             if len(p) == 1:
701                 # item must be a file
702                 if item is None and create:
703                     # create new file
704                     if create_collection:
705                         item = Subcollection(self)
706                     else:
707                         item = ArvadosFile(self)
708                     self._items[p[0]] = item
709                     self.notify(self, ADD, p[0], item)
710                 return item
711             else:
712                 if item is None and create:
713                     # create new collection
714                     item = Subcollection(self)
715                     self._items[p[0]] = item
716                     self.notify(self, ADD, p[0], item)
717                 del p[0]
718                 return item.find("/".join(p), create=create)
719         else:
720             return self
721
722     def open(self, path, mode):
723         """Open a file-like object for access.
724
725         :path:
726           path to a file in the collection
727         :mode:
728           one of "r", "r+", "w", "w+", "a", "a+"
729           :"r":
730             opens for reading
731           :"r+":
732             opens for reading and writing.  Reads/writes share a file pointer.
733           :"w", "w+":
734             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
735           :"a", "a+":
736             opens for reading and writing.  All writes are appended to
737             the end of the file.  Writing does not affect the file pointer for
738             reading.
739         """
740         mode = mode.replace("b", "")
741         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
742             raise ArgumentError("Bad mode '%s'" % mode)
743         create = (mode != "r")
744
745         if create and self.sync_mode() == SYNC_READONLY:
746             raise IOError((errno.EROFS, "Collection is read only"))
747
748         f = self.find(path, create=create)
749
750         if f is None:
751             raise IOError((errno.ENOENT, "File not found"))
752         if not isinstance(f, ArvadosFile):
753             raise IOError((errno.EISDIR, "Path must refer to a file."))
754
755         if mode[0] == "w":
756             f.truncate(0)
757
758         if mode == "r":
759             return ArvadosFileReader(f, path, mode, num_retries=self.num_retries)
760         else:
761             return ArvadosFileWriter(f, path, mode, num_retries=self.num_retries)
762
763     @_synchronized
764     def modified(self):
765         """Test if the collection (or any subcollection or file) has been modified
766         since it was created."""
767         for k,v in self._items.items():
768             if v.modified():
769                 return True
770         return False
771
772     @_synchronized
773     def set_unmodified(self):
774         """Recursively clear modified flag"""
775         for k,v in self._items.items():
776             v.set_unmodified()
777
778     @_synchronized
779     def __iter__(self):
780         """Iterate over names of files and collections contained in this collection."""
781         return self._items.keys()
782
783     @_synchronized
784     def iterkeys(self):
785         """Iterate over names of files and collections directly contained in this collection."""
786         return self._items.keys()
787
788     @_synchronized
789     def __getitem__(self, k):
790         """Get a file or collection that is directly contained by this collection.  If
791         you want to search a path, use `find()` instead.
792         """
793         return self._items[k]
794
795     @_synchronized
796     def __contains__(self, k):
797         """If there is a file or collection a directly contained by this collection
798         with name "k"."""
799         return k in self._items
800
801     @_synchronized
802     def __len__(self):
803         """Get the number of items directly contained in this collection"""
804         return len(self._items)
805
806     @_must_be_writable
807     @_synchronized
808     def __delitem__(self, p):
809         """Delete an item by name which is directly contained by this collection."""
810         del self._items[p]
811         self.notify(self, DEL, p, None)
812
813     @_synchronized
814     def keys(self):
815         """Get a list of names of files and collections directly contained in this collection."""
816         return self._items.keys()
817
818     @_synchronized
819     def values(self):
820         """Get a list of files and collection objects directly contained in this collection."""
821         return self._items.values()
822
823     @_synchronized
824     def items(self):
825         """Get a list of (name, object) tuples directly contained in this collection."""
826         return self._items.items()
827
828     def exists(self, path):
829         """Test if there is a file or collection at "path" """
830         return self.find(path) != None
831
832     @_must_be_writable
833     @_synchronized
834     def remove(self, path, rm_r=False):
835         """Remove the file or subcollection (directory) at `path`.
836         :rm_r:
837           Specify whether to remove non-empty subcollections (True), or raise an error (False).
838         """
839         p = path.split("/")
840         if p[0] == '.':
841             # Remove '.' from the front of the path
842             del p[0]
843
844         if len(p) > 0:
845             item = self._items.get(p[0])
846             if item is None:
847                 raise IOError((errno.ENOENT, "File not found"))
848             if len(p) == 1:
849                 if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not rm_r:
850                     raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
851                 del self._items[p[0]]
852                 self.notify(self, DEL, p[0], None)
853             else:
854                 del p[0]
855                 item.remove("/".join(p))
856         else:
857             raise IOError((errno.ENOENT, "File not found"))
858
859     def _cloneinto(self, target):
860         for k,v in self._items:
861             target._items[k] = v.clone(new_parent=target)
862
863     def clone(self):
864         raise NotImplementedError()
865
866     @_must_be_writable
867     @_synchronized
868     def copyto(self, target_path, source_path, source_collection=None, overwrite=False):
869         """
870         copyto('/foo', '/bar') will overwrite 'foo' if it exists.
871         copyto('/foo/', '/bar') will place 'bar' in subcollection 'foo'
872         """
873         if source_collection is None:
874             source_collection = self
875
876         # Find the object to copy
877         sp = source_path.split("/")
878         source_obj = source_collection.find(source_path)
879         if source_obj is None:
880             raise IOError((errno.ENOENT, "File not found"))
881
882         # Find parent collection the target path
883         tp = target_path.split("/")
884         target_dir = self.find(tp[0:-1].join("/"), create=True, create_collection=True)
885
886         # Determine the name to use.
887         target_name = tp[-1] if tp[-1] else sp[-1]
888
889         if target_name in target_dir and not overwrite:
890             raise IOError((errno.EEXIST, "File already exists"))
891
892         # Actually make the copy.
893         dup = source_obj.clone(target_dir)
894         with target_dir.lock:
895             target_dir._items[target_name] = dup
896
897         self.notify(target_dir, ADD, target_name, dup)
898
899
900     @_synchronized
901     def manifest_text(self, strip=False, normalize=False):
902         """Get the manifest text for this collection, sub collections and files.
903
904         :strip:
905           If True, remove signing tokens from block locators if present.
906           If False, block locators are left unchanged.
907
908         :normalize:
909           If True, always export the manifest text in normalized form
910           even if the Collection is not modified.  If False and the collection
911           is not modified, return the original manifest text even if it is not
912           in normalized form.
913
914         """
915         if self.modified() or self._manifest_text is None or normalize:
916             return export_manifest(self, stream_name=".", portable_locators=strip)
917         else:
918             if strip:
919                 return self.stripped_manifest()
920             else:
921                 return self._manifest_text
922
923     @_must_be_writable
924     @_synchronized
925     def merge(self, other):
926         for k in other.keys():
927             if k in self:
928                 if isinstance(self[k], Subcollection) and isinstance(other[k], Subcollection):
929                     self[k].merge(other[k])
930                 else:
931                     if self[k] != other[k]:
932                         name = "%s~conflict-%s~" % (k, time.strftime("%Y-%m-%d~%H:%M%:%S",
933                                                                      time.gmtime()))
934                         self[name] = other[k].clone(self)
935                         self.notify(self, name, ADD, self[name])
936             else:
937                 self[k] = other[k].clone(self)
938                 self.notify(self, k, ADD, self[k])
939
940     def portable_data_hash(self):
941         """Get the portable data hash for this collection's manifest."""
942         stripped = self.manifest_text(strip=True)
943         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
944
945
946 class Collection(SynchronizedCollectionBase):
947     """Store an Arvados collection, consisting of a set of files and
948     sub-collections.
949     """
950
951     def __init__(self, manifest_locator_or_text=None,
952                  parent=None,
953                  config=None,
954                  api_client=None,
955                  keep_client=None,
956                  num_retries=None,
957                  block_manager=None,
958                  sync=SYNC_READONLY):
959         """:manifest_locator_or_text:
960           One of Arvados collection UUID, block locator of
961           a manifest, raw manifest text, or None (to create an empty collection).
962         :parent:
963           the parent Collection, may be None.
964         :config:
965           the arvados configuration to get the hostname and api token.
966           Prefer this over supplying your own api_client and keep_client (except in testing).
967           Will use default config settings if not specified.
968         :api_client:
969           The API client object to use for requests.  If not specified, create one using `config`.
970         :keep_client:
971           the Keep client to use for requests.  If not specified, create one using `config`.
972         :num_retries:
973           the number of retries for API and Keep requests.
974         :block_manager:
975           the block manager to use.  If not specified, create one.
976         :sync:
977           Set synchronization policy with API server collection record.
978           :SYNC_READONLY:
979             Collection is read only.  No synchronization.  This mode will
980             also forego locking, which gives better performance.
981           :SYNC_EXPLICIT:
982             Synchronize on explicit request via `update()` or `save()`
983           :SYNC_LIVE:
984             Synchronize with server in response to background websocket events,
985             on block write, or on file close.
986
987         """
988         super(Collection, self).__init__(parent)
989         self._api_client = api_client
990         self._keep_client = keep_client
991         self._block_manager = block_manager
992         self._config = config
993         self.num_retries = num_retries
994         self._manifest_locator = None
995         self._manifest_text = None
996         self._api_response = None
997         self._sync = sync
998         self.lock = threading.RLock()
999         self.callbacks = []
1000         self.events = None
1001
1002         if manifest_locator_or_text:
1003             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1004                 self._manifest_locator = manifest_locator_or_text
1005             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1006                 self._manifest_locator = manifest_locator_or_text
1007             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1008                 self._manifest_text = manifest_locator_or_text
1009             else:
1010                 raise errors.ArgumentError(
1011                     "Argument to CollectionReader must be a manifest or a collection UUID")
1012
1013             self._populate()
1014
1015             if self._sync == SYNC_LIVE:
1016                 if not self._manifest_locator or not re.match(util.collection_uuid_pattern, self._manifest_locator):
1017                     raise errors.ArgumentError("Cannot SYNC_LIVE unless a collection uuid is specified")
1018                 self.events = events.subscribe(arvados.api(), [["object_uuid", "=", self._manifest_locator]], self.on_message)
1019
1020     @staticmethod
1021     def create(name, owner_uuid=None, sync=SYNC_EXPLICIT):
1022         c = Collection(sync=sync)
1023         c.save_as(name, owner_uuid=owner_uuid, ensure_unique_name=True)
1024         return c
1025
1026     def _root_lock(self):
1027         return self.lock
1028
1029     def sync_mode(self):
1030         return self._sync
1031
1032     def on_message(self):
1033         self.update()
1034
1035     @_synchronized
1036     def update(self):
1037         n = self._my_api().collections().get(uuid=self._manifest_locator, select=["manifest_text"]).execute()
1038         other = import_collection(n["manifest_text"])
1039         self.merge(other)
1040
1041     @_synchronized
1042     def _my_api(self):
1043         if self._api_client is None:
1044             self._api_client = arvados.api.SafeApi(self._config)
1045             self._keep_client = self._api_client.keep
1046         return self._api_client
1047
1048     @_synchronized
1049     def _my_keep(self):
1050         if self._keep_client is None:
1051             if self._api_client is None:
1052                 self._my_api()
1053             else:
1054                 self._keep_client = KeepClient(api=self._api_client)
1055         return self._keep_client
1056
1057     @_synchronized
1058     def _my_block_manager(self):
1059         if self._block_manager is None:
1060             self._block_manager = BlockManager(self._my_keep())
1061         return self._block_manager
1062
1063     def _populate_from_api_server(self):
1064         # As in KeepClient itself, we must wait until the last
1065         # possible moment to instantiate an API client, in order to
1066         # avoid tripping up clients that don't have access to an API
1067         # server.  If we do build one, make sure our Keep client uses
1068         # it.  If instantiation fails, we'll fall back to the except
1069         # clause, just like any other Collection lookup
1070         # failure. Return an exception, or None if successful.
1071         try:
1072             self._api_response = self._my_api().collections().get(
1073                 uuid=self._manifest_locator).execute(
1074                     num_retries=self.num_retries)
1075             self._manifest_text = self._api_response['manifest_text']
1076             return None
1077         except Exception as e:
1078             return e
1079
1080     def _populate_from_keep(self):
1081         # Retrieve a manifest directly from Keep. This has a chance of
1082         # working if [a] the locator includes a permission signature
1083         # or [b] the Keep services are operating in world-readable
1084         # mode. Return an exception, or None if successful.
1085         try:
1086             self._manifest_text = self._my_keep().get(
1087                 self._manifest_locator, num_retries=self.num_retries)
1088         except Exception as e:
1089             return e
1090
1091     def _populate(self):
1092         if self._manifest_locator is None and self._manifest_text is None:
1093             return
1094         error_via_api = None
1095         error_via_keep = None
1096         should_try_keep = ((self._manifest_text is None) and
1097                            util.keep_locator_pattern.match(
1098                                self._manifest_locator))
1099         if ((self._manifest_text is None) and
1100             util.signed_locator_pattern.match(self._manifest_locator)):
1101             error_via_keep = self._populate_from_keep()
1102         if self._manifest_text is None:
1103             error_via_api = self._populate_from_api_server()
1104             if error_via_api is not None and not should_try_keep:
1105                 raise error_via_api
1106         if ((self._manifest_text is None) and
1107             not error_via_keep and
1108             should_try_keep):
1109             # Looks like a keep locator, and we didn't already try keep above
1110             error_via_keep = self._populate_from_keep()
1111         if self._manifest_text is None:
1112             # Nothing worked!
1113             raise arvados.errors.NotFoundError(
1114                 ("Failed to retrieve collection '{}' " +
1115                  "from either API server ({}) or Keep ({})."
1116                  ).format(
1117                     self._manifest_locator,
1118                     error_via_api,
1119                     error_via_keep))
1120         # populate
1121         import_manifest(self._manifest_text, self)
1122
1123         if self._sync == SYNC_READONLY:
1124             # Now that we're populated, knowing that this will be readonly,
1125             # forego any further locking.
1126             self.lock = NoopLock()
1127
1128     def __enter__(self):
1129         return self
1130
1131     def __exit__(self, exc_type, exc_value, traceback):
1132         """Support scoped auto-commit in a with: block"""
1133         if self._sync != SYNC_READONLY:
1134             self.save(allow_no_locator=True)
1135         if self._block_manager is not None:
1136             self._block_manager.stop_threads()
1137
1138     @_synchronized
1139     def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
1140         if new_config is None:
1141             new_config = self.config
1142         c = Collection(parent=new_parent, config=new_config, sync=new_sync)
1143         if new_sync == SYNC_READONLY:
1144             c.lock = NoopLock()
1145         c._items = {}
1146         self._cloneinto(c)
1147         return c
1148
1149     @_synchronized
1150     def api_response(self):
1151         """
1152         api_response() -> dict or None
1153
1154         Returns information about this Collection fetched from the API server.
1155         If the Collection exists in Keep but not the API server, currently
1156         returns None.  Future versions may provide a synthetic response.
1157         """
1158         return self._api_response
1159
1160     @_must_be_writable
1161     @_synchronized
1162     def save(self, allow_no_locator=False):
1163         """Commit pending buffer blocks to Keep, write the manifest to Keep, and
1164         update the collection record to Keep.
1165
1166         :allow_no_locator:
1167           If there is no collection uuid associated with this
1168           Collection and `allow_no_locator` is False, raise an error.  If True,
1169           do not raise an error.
1170         """
1171         if self.modified():
1172             self._my_block_manager().commit_all()
1173             self._my_keep().put(self.manifest_text(strip=True))
1174             if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
1175                 self._api_response = self._my_api().collections().update(
1176                     uuid=self._manifest_locator,
1177                     body={'manifest_text': self.manifest_text(strip=False)}
1178                     ).execute(
1179                         num_retries=self.num_retries)
1180             elif not allow_no_locator:
1181                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
1182             self.set_unmodified()
1183
1184     @_must_be_writable
1185     @_synchronized
1186     def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
1187         """Save a new collection record.
1188
1189         :name:
1190           The collection name.
1191
1192         :owner_uuid:
1193           the user, or project uuid that will own this collection.
1194           If None, defaults to the current user.
1195
1196         :ensure_unique_name:
1197           If True, ask the API server to rename the collection
1198           if it conflicts with a collection with the same name and owner.  If
1199           False, a name conflict will result in an error.
1200
1201         """
1202         self._my_block_manager().commit_all()
1203         self._my_keep().put(self.manifest_text(strip=True))
1204         body = {"manifest_text": self.manifest_text(strip=False),
1205                 "name": name}
1206         if owner_uuid:
1207             body["owner_uuid"] = owner_uuid
1208         self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)
1209
1210         if self.events:
1211             self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
1212
1213         self._manifest_locator = self._api_response["uuid"]
1214
1215         if self.events:
1216             self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
1217
1218         self.set_unmodified()
1219
1220     @_synchronized
1221     def subscribe(self, callback):
1222         self.callbacks.append(callback)
1223
1224     @_synchronized
1225     def unsubscribe(self, callback):
1226         self.callbacks.remove(callback)
1227
1228     @_synchronized
1229     def notify(self, collection, event, name, item):
1230         for c in self.callbacks:
1231             c(collection, event, name, item)
1232
1233 class Subcollection(SynchronizedCollectionBase):
1234     """This is a subdirectory within a collection that doesn't have its own API
1235     server record.  It falls under the umbrella of the root collection."""
1236
1237     def __init__(self, parent):
1238         super(Subcollection, self).__init__(parent)
1239         self.lock = parent._root_lock()
1240
1241     def _root_lock(self):
1242         return self.parent._root_lock()
1243
1244     def sync_mode(self):
1245         return self.parent.sync_mode()
1246
1247     def _my_api(self):
1248         return self.parent._my_api()
1249
1250     def _my_keep(self):
1251         return self.parent._my_keep()
1252
1253     def _my_block_manager(self):
1254         return self.parent._my_block_manager()
1255
1256     def _populate(self):
1257         self.parent._populate()
1258
1259     def notify(self, collection, event, name, item):
1260         self.parent.notify(collection, event, name, item)
1261
1262     @_synchronized
1263     def clone(self, new_parent):
1264         c = Subcollection(new_parent)
1265         c._items = {}
1266         self._cloneinto(c)
1267         return c
1268
1269 def import_manifest(manifest_text,
1270                     into_collection=None,
1271                     api_client=None,
1272                     keep=None,
1273                     num_retries=None,
1274                     sync=SYNC_READONLY):
1275     """Import a manifest into a `Collection`.
1276
1277     :manifest_text:
1278       The manifest text to import from.
1279
1280     :into_collection:
1281       The `Collection` that will be initialized (must be empty).
1282       If None, create a new `Collection` object.
1283
1284     :api_client:
1285       The API client object that will be used when creating a new `Collection` object.
1286
1287     :keep:
1288       The keep client object that will be used when creating a new `Collection` object.
1289
1290     :num_retries:
1291       the default number of api client and keep retries on error.
1292
1293     :sync:
1294       Collection sync mode (only if into_collection is None)
1295     """
1296     if into_collection is not None:
1297         if len(into_collection) > 0:
1298             raise ArgumentError("Can only import manifest into an empty collection")
1299         c = into_collection
1300     else:
1301         c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)
1302
1303     save_sync = c.sync_mode()
1304     c._sync = None
1305
1306     STREAM_NAME = 0
1307     BLOCKS = 1
1308     SEGMENTS = 2
1309
1310     stream_name = None
1311     state = STREAM_NAME
1312
1313     for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1314         tok = n.group(1)
1315         sep = n.group(2)
1316
1317         if state == STREAM_NAME:
1318             # starting a new stream
1319             stream_name = tok.replace('\\040', ' ')
1320             blocks = []
1321             segments = []
1322             streamoffset = 0L
1323             state = BLOCKS
1324             continue
1325
1326         if state == BLOCKS:
1327             s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1328             if s:
1329                 blocksize = long(s.group(1))
1330                 blocks.append(Range(tok, streamoffset, blocksize))
1331                 streamoffset += blocksize
1332             else:
1333                 state = SEGMENTS
1334
1335         if state == SEGMENTS:
1336             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
1337             if s:
1338                 pos = long(s.group(1))
1339                 size = long(s.group(2))
1340                 name = s.group(3).replace('\\040', ' ')
1341                 f = c.find("%s/%s" % (stream_name, name), create=True)
1342                 f.add_segment(blocks, pos, size)
1343             else:
1344                 # error!
1345                 raise errors.SyntaxError("Invalid manifest format")
1346
1347         if sep == "\n":
1348             stream_name = None
1349             state = STREAM_NAME
1350
1351     c.set_unmodified()
1352     c._sync = save_sync
1353     return c
1354
1355 def export_manifest(item, stream_name=".", portable_locators=False):
1356     """
1357     :item:
1358       Create a manifest for `item` (must be a `Collection` or `ArvadosFile`).  If
1359       `item` is a is a `Collection`, this will also export subcollections.
1360
1361     :stream_name:
1362       the name of the stream when exporting `item`.
1363
1364     :portable_locators:
1365       If True, strip any permission hints on block locators.
1366       If False, use block locators as-is.
1367     """
1368     buf = ""
1369     if isinstance(item, SynchronizedCollectionBase):
1370         stream = {}
1371         sorted_keys = sorted(item.keys())
1372         for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
1373             v = item[k]
1374             st = []
1375             for s in v.segments():
1376                 loc = s.locator
1377                 if loc.startswith("bufferblock"):
1378                     loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
1379                 if portable_locators:
1380                     loc = KeepLocator(loc).stripped()
1381                 st.append(LocatorAndRange(loc, locator_block_size(loc),
1382                                      s.segment_offset, s.range_size))
1383             stream[k] = st
1384         if stream:
1385             buf += ' '.join(normalize_stream(stream_name, stream))
1386             buf += "\n"
1387         for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
1388             buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
1389     elif isinstance(item, ArvadosFile):
1390         st = []
1391         for s in item.segments:
1392             loc = s.locator
1393             if loc.startswith("bufferblock"):
1394                 loc = item._bufferblocks[loc].calculate_locator()
1395             if portable_locators:
1396                 loc = KeepLocator(loc).stripped()
1397             st.append(LocatorAndRange(loc, locator_block_size(loc),
1398                                  s.segment_offset, s.range_size))
1399         stream[stream_name] = st
1400         buf += ' '.join(normalize_stream(stream_name, stream))
1401         buf += "\n"
1402     return buf