Merge branch 'master' into 4823-python-sdk-writable-collection-api
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import time
7
8 from collections import deque
9 from stat import *
10
11 from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, _synchronized, _must_be_writable
12 from keep import *
13 from .stream import StreamReader, normalize_stream, locator_block_size
14 from .ranges import Range, LocatorAndRange
15 import config
16 import errors
17 import util
18 import events
19
20 _logger = logging.getLogger('arvados.collection')
21
22 class CollectionBase(object):
23     def __enter__(self):
24         return self
25
26     def __exit__(self, exc_type, exc_value, traceback):
27         pass
28
29     def _my_keep(self):
30         if self._keep_client is None:
31             self._keep_client = KeepClient(api_client=self._api_client,
32                                            num_retries=self.num_retries)
33         return self._keep_client
34
35     def stripped_manifest(self):
36         """
37         Return the manifest for the current collection with all
38         non-portable hints (i.e., permission signatures and other
39         hints other than size hints) removed from the locators.
40         """
41         raw = self.manifest_text()
42         clean = []
43         for line in raw.split("\n"):
44             fields = line.split()
45             if fields:
46                 clean_fields = fields[:1] + [
47                     (re.sub(r'\+[^\d][^\+]*', '', x)
48                      if re.match(util.keep_locator_pattern, x)
49                      else x)
50                     for x in fields[1:]]
51                 clean += [' '.join(clean_fields), "\n"]
52         return ''.join(clean)
53
54
55 class CollectionReader(CollectionBase):
56     def __init__(self, manifest_locator_or_text, api_client=None,
57                  keep_client=None, num_retries=0):
58         """Instantiate a CollectionReader.
59
60         This class parses Collection manifests to provide a simple interface
61         to read its underlying files.
62
63         Arguments:
64         * manifest_locator_or_text: One of a Collection UUID, portable data
65           hash, or full manifest text.
66         * api_client: The API client to use to look up Collections.  If not
67           provided, CollectionReader will build one from available Arvados
68           configuration.
69         * keep_client: The KeepClient to use to download Collection data.
70           If not provided, CollectionReader will build one from available
71           Arvados configuration.
72         * num_retries: The default number of times to retry failed
73           service requests.  Default 0.  You may change this value
74           after instantiation, but note those changes may not
75           propagate to related objects like the Keep client.
76         """
77         self._api_client = api_client
78         self._keep_client = keep_client
79         self.num_retries = num_retries
80         if re.match(util.keep_locator_pattern, manifest_locator_or_text):
81             self._manifest_locator = manifest_locator_or_text
82             self._manifest_text = None
83         elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
84             self._manifest_locator = manifest_locator_or_text
85             self._manifest_text = None
86         elif re.match(util.manifest_pattern, manifest_locator_or_text):
87             self._manifest_text = manifest_locator_or_text
88             self._manifest_locator = None
89         else:
90             raise errors.ArgumentError(
91                 "Argument to CollectionReader must be a manifest or a collection UUID")
92         self._api_response = None
93         self._streams = None
94
95     def _populate_from_api_server(self):
96         # As in KeepClient itself, we must wait until the last
97         # possible moment to instantiate an API client, in order to
98         # avoid tripping up clients that don't have access to an API
99         # server.  If we do build one, make sure our Keep client uses
100         # it.  If instantiation fails, we'll fall back to the except
101         # clause, just like any other Collection lookup
102         # failure. Return an exception, or None if successful.
103         try:
104             if self._api_client is None:
105                 self._api_client = arvados.api('v1')
106                 self._keep_client = None  # Make a new one with the new api.
107             self._api_response = self._api_client.collections().get(
108                 uuid=self._manifest_locator).execute(
109                 num_retries=self.num_retries)
110             self._manifest_text = self._api_response['manifest_text']
111             return None
112         except Exception as e:
113             return e
114
115     def _populate_from_keep(self):
116         # Retrieve a manifest directly from Keep. This has a chance of
117         # working if [a] the locator includes a permission signature
118         # or [b] the Keep services are operating in world-readable
119         # mode. Return an exception, or None if successful.
120         try:
121             self._manifest_text = self._my_keep().get(
122                 self._manifest_locator, num_retries=self.num_retries)
123         except Exception as e:
124             return e
125
126     def _populate(self):
127         error_via_api = None
128         error_via_keep = None
129         should_try_keep = ((self._manifest_text is None) and
130                            util.keep_locator_pattern.match(
131                 self._manifest_locator))
132         if ((self._manifest_text is None) and
133             util.signed_locator_pattern.match(self._manifest_locator)):
134             error_via_keep = self._populate_from_keep()
135         if self._manifest_text is None:
136             error_via_api = self._populate_from_api_server()
137             if error_via_api is not None and not should_try_keep:
138                 raise error_via_api
139         if ((self._manifest_text is None) and
140             not error_via_keep and
141             should_try_keep):
142             # Looks like a keep locator, and we didn't already try keep above
143             error_via_keep = self._populate_from_keep()
144         if self._manifest_text is None:
145             # Nothing worked!
146             raise arvados.errors.NotFoundError(
147                 ("Failed to retrieve collection '{}' " +
148                  "from either API server ({}) or Keep ({})."
149                  ).format(
150                     self._manifest_locator,
151                     error_via_api,
152                     error_via_keep))
153         self._streams = [sline.split()
154                          for sline in self._manifest_text.split("\n")
155                          if sline]
156
157     @staticmethod
158     def _populate_first(orig_func):
159         # Decorator for methods that read actual Collection data.
160         @functools.wraps(orig_func)
161         def wrapper(self, *args, **kwargs):
162             if self._streams is None:
163                 self._populate()
164             return orig_func(self, *args, **kwargs)
165         return wrapper
166
167     @_populate_first
168     def api_response(self):
169         """api_response() -> dict or None
170
171         Returns information about this Collection fetched from the API server.
172         If the Collection exists in Keep but not the API server, currently
173         returns None.  Future versions may provide a synthetic response.
174         """
175         return self._api_response
176
177     @_populate_first
178     def normalize(self):
179         # Rearrange streams
180         streams = {}
181         for s in self.all_streams():
182             for f in s.all_files():
183                 streamname, filename = split(s.name() + "/" + f.name())
184                 if streamname not in streams:
185                     streams[streamname] = {}
186                 if filename not in streams[streamname]:
187                     streams[streamname][filename] = []
188                 for r in f.segments:
189                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
190
191         self._streams = [normalize_stream(s, streams[s])
192                          for s in sorted(streams)]
193
194         # Regenerate the manifest text based on the normalized streams
195         self._manifest_text = ''.join(
196             [StreamReader(stream, keep=self._my_keep()).manifest_text()
197              for stream in self._streams])
198
199     @_populate_first
200     def open(self, streampath, filename=None):
201         """open(streampath[, filename]) -> file-like object
202
203         Pass in the path of a file to read from the Collection, either as a
204         single string or as two separate stream name and file name arguments.
205         This method returns a file-like object to read that file.
206         """
207         if filename is None:
208             streampath, filename = split(streampath)
209         keep_client = self._my_keep()
210         for stream_s in self._streams:
211             stream = StreamReader(stream_s, keep_client,
212                                   num_retries=self.num_retries)
213             if stream.name() == streampath:
214                 break
215         else:
216             raise ValueError("stream '{}' not found in Collection".
217                              format(streampath))
218         try:
219             return stream.files()[filename]
220         except KeyError:
221             raise ValueError("file '{}' not found in Collection stream '{}'".
222                              format(filename, streampath))
223
224     @_populate_first
225     def all_streams(self):
226         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
227                 for s in self._streams]
228
229     def all_files(self):
230         for s in self.all_streams():
231             for f in s.all_files():
232                 yield f
233
234     @_populate_first
235     def manifest_text(self, strip=False, normalize=False):
236         if normalize:
237             cr = CollectionReader(self.manifest_text())
238             cr.normalize()
239             return cr.manifest_text(strip=strip, normalize=False)
240         elif strip:
241             return self.stripped_manifest()
242         else:
243             return self._manifest_text
244
245
246 class _WriterFile(ArvadosFileBase):
247     def __init__(self, coll_writer, name):
248         super(_WriterFile, self).__init__(name, 'wb')
249         self.dest = coll_writer
250
251     def close(self):
252         super(_WriterFile, self).close()
253         self.dest.finish_current_file()
254
255     @ArvadosFileBase._before_close
256     def write(self, data):
257         self.dest.write(data)
258
259     @ArvadosFileBase._before_close
260     def writelines(self, seq):
261         for data in seq:
262             self.write(data)
263
264     @ArvadosFileBase._before_close
265     def flush(self):
266         self.dest.flush_data()
267
268
269 class CollectionWriter(CollectionBase):
270     def __init__(self, api_client=None, num_retries=0):
271         """Instantiate a CollectionWriter.
272
273         CollectionWriter lets you build a new Arvados Collection from scratch.
274         Write files to it.  The CollectionWriter will upload data to Keep as
275         appropriate, and provide you with the Collection manifest text when
276         you're finished.
277
278         Arguments:
279         * api_client: The API client to use to look up Collections.  If not
280           provided, CollectionReader will build one from available Arvados
281           configuration.
282         * num_retries: The default number of times to retry failed
283           service requests.  Default 0.  You may change this value
284           after instantiation, but note those changes may not
285           propagate to related objects like the Keep client.
286         """
287         self._api_client = api_client
288         self.num_retries = num_retries
289         self._keep_client = None
290         self._data_buffer = []
291         self._data_buffer_len = 0
292         self._current_stream_files = []
293         self._current_stream_length = 0
294         self._current_stream_locators = []
295         self._current_stream_name = '.'
296         self._current_file_name = None
297         self._current_file_pos = 0
298         self._finished_streams = []
299         self._close_file = None
300         self._queued_file = None
301         self._queued_dirents = deque()
302         self._queued_trees = deque()
303         self._last_open = None
304
305     def __exit__(self, exc_type, exc_value, traceback):
306         if exc_type is None:
307             self.finish()
308
309     def do_queued_work(self):
310         # The work queue consists of three pieces:
311         # * _queued_file: The file object we're currently writing to the
312         #   Collection.
313         # * _queued_dirents: Entries under the current directory
314         #   (_queued_trees[0]) that we want to write or recurse through.
315         #   This may contain files from subdirectories if
316         #   max_manifest_depth == 0 for this directory.
317         # * _queued_trees: Directories that should be written as separate
318         #   streams to the Collection.
319         # This function handles the smallest piece of work currently queued
320         # (current file, then current directory, then next directory) until
321         # no work remains.  The _work_THING methods each do a unit of work on
322         # THING.  _queue_THING methods add a THING to the work queue.
323         while True:
324             if self._queued_file:
325                 self._work_file()
326             elif self._queued_dirents:
327                 self._work_dirents()
328             elif self._queued_trees:
329                 self._work_trees()
330             else:
331                 break
332
333     def _work_file(self):
334         while True:
335             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
336             if not buf:
337                 break
338             self.write(buf)
339         self.finish_current_file()
340         if self._close_file:
341             self._queued_file.close()
342         self._close_file = None
343         self._queued_file = None
344
345     def _work_dirents(self):
346         path, stream_name, max_manifest_depth = self._queued_trees[0]
347         if stream_name != self.current_stream_name():
348             self.start_new_stream(stream_name)
349         while self._queued_dirents:
350             dirent = self._queued_dirents.popleft()
351             target = os.path.join(path, dirent)
352             if os.path.isdir(target):
353                 self._queue_tree(target,
354                                  os.path.join(stream_name, dirent),
355                                  max_manifest_depth - 1)
356             else:
357                 self._queue_file(target, dirent)
358                 break
359         if not self._queued_dirents:
360             self._queued_trees.popleft()
361
362     def _work_trees(self):
363         path, stream_name, max_manifest_depth = self._queued_trees[0]
364         d = util.listdir_recursive(
365             path, max_depth = (None if max_manifest_depth == 0 else 0))
366         if d:
367             self._queue_dirents(stream_name, d)
368         else:
369             self._queued_trees.popleft()
370
371     def _queue_file(self, source, filename=None):
372         assert (self._queued_file is None), "tried to queue more than one file"
373         if not hasattr(source, 'read'):
374             source = open(source, 'rb')
375             self._close_file = True
376         else:
377             self._close_file = False
378         if filename is None:
379             filename = os.path.basename(source.name)
380         self.start_new_file(filename)
381         self._queued_file = source
382
383     def _queue_dirents(self, stream_name, dirents):
384         assert (not self._queued_dirents), "tried to queue more than one tree"
385         self._queued_dirents = deque(sorted(dirents))
386
387     def _queue_tree(self, path, stream_name, max_manifest_depth):
388         self._queued_trees.append((path, stream_name, max_manifest_depth))
389
390     def write_file(self, source, filename=None):
391         self._queue_file(source, filename)
392         self.do_queued_work()
393
394     def write_directory_tree(self,
395                              path, stream_name='.', max_manifest_depth=-1):
396         self._queue_tree(path, stream_name, max_manifest_depth)
397         self.do_queued_work()
398
399     def write(self, newdata):
400         if hasattr(newdata, '__iter__'):
401             for s in newdata:
402                 self.write(s)
403             return
404         self._data_buffer.append(newdata)
405         self._data_buffer_len += len(newdata)
406         self._current_stream_length += len(newdata)
407         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
408             self.flush_data()
409
410     def open(self, streampath, filename=None):
411         """open(streampath[, filename]) -> file-like object
412
413         Pass in the path of a file to write to the Collection, either as a
414         single string or as two separate stream name and file name arguments.
415         This method returns a file-like object you can write to add it to the
416         Collection.
417
418         You may only have one file object from the Collection open at a time,
419         so be sure to close the object when you're done.  Using the object in
420         a with statement makes that easy::
421
422           with cwriter.open('./doc/page1.txt') as outfile:
423               outfile.write(page1_data)
424           with cwriter.open('./doc/page2.txt') as outfile:
425               outfile.write(page2_data)
426         """
427         if filename is None:
428             streampath, filename = split(streampath)
429         if self._last_open and not self._last_open.closed:
430             raise errors.AssertionError(
431                 "can't open '{}' when '{}' is still open".format(
432                     filename, self._last_open.name))
433         if streampath != self.current_stream_name():
434             self.start_new_stream(streampath)
435         self.set_current_file_name(filename)
436         self._last_open = _WriterFile(self, filename)
437         return self._last_open
438
439     def flush_data(self):
440         data_buffer = ''.join(self._data_buffer)
441         if data_buffer:
442             self._current_stream_locators.append(
443                 self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
444             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
445             self._data_buffer_len = len(self._data_buffer[0])
446
447     def start_new_file(self, newfilename=None):
448         self.finish_current_file()
449         self.set_current_file_name(newfilename)
450
451     def set_current_file_name(self, newfilename):
452         if re.search(r'[\t\n]', newfilename):
453             raise errors.AssertionError(
454                 "Manifest filenames cannot contain whitespace: %s" %
455                 newfilename)
456         elif re.search(r'\x00', newfilename):
457             raise errors.AssertionError(
458                 "Manifest filenames cannot contain NUL characters: %s" %
459                 newfilename)
460         self._current_file_name = newfilename
461
462     def current_file_name(self):
463         return self._current_file_name
464
465     def finish_current_file(self):
466         if self._current_file_name is None:
467             if self._current_file_pos == self._current_stream_length:
468                 return
469             raise errors.AssertionError(
470                 "Cannot finish an unnamed file " +
471                 "(%d bytes at offset %d in '%s' stream)" %
472                 (self._current_stream_length - self._current_file_pos,
473                  self._current_file_pos,
474                  self._current_stream_name))
475         self._current_stream_files.append([
476                 self._current_file_pos,
477                 self._current_stream_length - self._current_file_pos,
478                 self._current_file_name])
479         self._current_file_pos = self._current_stream_length
480         self._current_file_name = None
481
482     def start_new_stream(self, newstreamname='.'):
483         self.finish_current_stream()
484         self.set_current_stream_name(newstreamname)
485
486     def set_current_stream_name(self, newstreamname):
487         if re.search(r'[\t\n]', newstreamname):
488             raise errors.AssertionError(
489                 "Manifest stream names cannot contain whitespace")
490         self._current_stream_name = '.' if newstreamname=='' else newstreamname
491
492     def current_stream_name(self):
493         return self._current_stream_name
494
495     def finish_current_stream(self):
496         self.finish_current_file()
497         self.flush_data()
498         if not self._current_stream_files:
499             pass
500         elif self._current_stream_name is None:
501             raise errors.AssertionError(
502                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
503                 (self._current_stream_length, len(self._current_stream_files)))
504         else:
505             if not self._current_stream_locators:
506                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
507             self._finished_streams.append([self._current_stream_name,
508                                            self._current_stream_locators,
509                                            self._current_stream_files])
510         self._current_stream_files = []
511         self._current_stream_length = 0
512         self._current_stream_locators = []
513         self._current_stream_name = None
514         self._current_file_pos = 0
515         self._current_file_name = None
516
517     def finish(self):
518         # Store the manifest in Keep and return its locator.
519         return self._my_keep().put(self.manifest_text())
520
521     def portable_data_hash(self):
522         stripped = self.stripped_manifest()
523         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
524
525     def manifest_text(self):
526         self.finish_current_stream()
527         manifest = ''
528
529         for stream in self._finished_streams:
530             if not re.search(r'^\.(/.*)?$', stream[0]):
531                 manifest += './'
532             manifest += stream[0].replace(' ', '\\040')
533             manifest += ' ' + ' '.join(stream[1])
534             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
535             manifest += "\n"
536
537         return manifest
538
539     def data_locators(self):
540         ret = []
541         for name, locators, files in self._finished_streams:
542             ret += locators
543         return ret
544
545
546 class ResumableCollectionWriter(CollectionWriter):
547     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
548                    '_current_stream_locators', '_current_stream_name',
549                    '_current_file_name', '_current_file_pos', '_close_file',
550                    '_data_buffer', '_dependencies', '_finished_streams',
551                    '_queued_dirents', '_queued_trees']
552
553     def __init__(self, api_client=None, num_retries=0):
554         self._dependencies = {}
555         super(ResumableCollectionWriter, self).__init__(
556             api_client, num_retries=num_retries)
557
558     @classmethod
559     def from_state(cls, state, *init_args, **init_kwargs):
560         # Try to build a new writer from scratch with the given state.
561         # If the state is not suitable to resume (because files have changed,
562         # been deleted, aren't predictable, etc.), raise a
563         # StaleWriterStateError.  Otherwise, return the initialized writer.
564         # The caller is responsible for calling writer.do_queued_work()
565         # appropriately after it's returned.
566         writer = cls(*init_args, **init_kwargs)
567         for attr_name in cls.STATE_PROPS:
568             attr_value = state[attr_name]
569             attr_class = getattr(writer, attr_name).__class__
570             # Coerce the value into the same type as the initial value, if
571             # needed.
572             if attr_class not in (type(None), attr_value.__class__):
573                 attr_value = attr_class(attr_value)
574             setattr(writer, attr_name, attr_value)
575         # Check dependencies before we try to resume anything.
576         if any(KeepLocator(ls).permission_expired()
577                for ls in writer._current_stream_locators):
578             raise errors.StaleWriterStateError(
579                 "locators include expired permission hint")
580         writer.check_dependencies()
581         if state['_current_file'] is not None:
582             path, pos = state['_current_file']
583             try:
584                 writer._queued_file = open(path, 'rb')
585                 writer._queued_file.seek(pos)
586             except IOError as error:
587                 raise errors.StaleWriterStateError(
588                     "failed to reopen active file {}: {}".format(path, error))
589         return writer
590
591     def check_dependencies(self):
592         for path, orig_stat in self._dependencies.items():
593             if not S_ISREG(orig_stat[ST_MODE]):
594                 raise errors.StaleWriterStateError("{} not file".format(path))
595             try:
596                 now_stat = tuple(os.stat(path))
597             except OSError as error:
598                 raise errors.StaleWriterStateError(
599                     "failed to stat {}: {}".format(path, error))
600             if ((not S_ISREG(now_stat[ST_MODE])) or
601                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
602                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
603                 raise errors.StaleWriterStateError("{} changed".format(path))
604
605     def dump_state(self, copy_func=lambda x: x):
606         state = {attr: copy_func(getattr(self, attr))
607                  for attr in self.STATE_PROPS}
608         if self._queued_file is None:
609             state['_current_file'] = None
610         else:
611             state['_current_file'] = (os.path.realpath(self._queued_file.name),
612                                       self._queued_file.tell())
613         return state
614
615     def _queue_file(self, source, filename=None):
616         try:
617             src_path = os.path.realpath(source)
618         except Exception:
619             raise errors.AssertionError("{} not a file path".format(source))
620         try:
621             path_stat = os.stat(src_path)
622         except OSError as stat_error:
623             path_stat = None
624         super(ResumableCollectionWriter, self)._queue_file(source, filename)
625         fd_stat = os.fstat(self._queued_file.fileno())
626         if not S_ISREG(fd_stat.st_mode):
627             # We won't be able to resume from this cache anyway, so don't
628             # worry about further checks.
629             self._dependencies[source] = tuple(fd_stat)
630         elif path_stat is None:
631             raise errors.AssertionError(
632                 "could not stat {}: {}".format(source, stat_error))
633         elif path_stat.st_ino != fd_stat.st_ino:
634             raise errors.AssertionError(
635                 "{} changed between open and stat calls".format(source))
636         else:
637             self._dependencies[src_path] = tuple(fd_stat)
638
639     def write(self, data):
640         if self._queued_file is None:
641             raise errors.AssertionError(
642                 "resumable writer can't accept unsourced data")
643         return super(ResumableCollectionWriter, self).write(data)
644
645
646 class SynchronizedCollectionBase(CollectionBase):
647     SYNC_READONLY = 1
648     SYNC_EXPLICIT = 2
649     SYNC_LIVE = 3
650
651     ADD = "add"
652     DEL = "del"
653
654     def __init__(self, parent=None):
655         self.parent = parent
656         self._items = None
657
658     def _my_api(self):
659         raise NotImplementedError()
660
661     def _my_keep(self):
662         raise NotImplementedError()
663
664     def _my_block_manager(self):
665         raise NotImplementedError()
666
667     def _root_lock(self):
668         raise NotImplementedError()
669
670     def _populate(self):
671         raise NotImplementedError()
672
673     def sync_mode(self):
674         raise NotImplementedError()
675
676     def notify(self, collection, event, name, item):
677         raise NotImplementedError()
678
679     @_synchronized
680     def find(self, path, create=False, create_collection=False):
681         """Recursively search the specified file path.  May return either a Collection
682         or ArvadosFile.
683
684         :create:
685           If true, create path components (i.e. Collections) that are
686           missing.  If "create" is False, return None if a path component is
687           not found.
688
689         :create_collection:
690           If the path is not found, "create" is True, and
691           "create_collection" is False, then create and return a new
692           ArvadosFile for the last path component.  If "create_collection" is
693           True, then create and return a new Collection for the last path
694           component.
695
696         """
697         if create and self._sync_mode() == SynchronizedCollectionBase.SYNC_READONLY:
698             raise IOError((errno.EROFS, "Collection is read only"))
699
700         p = path.split("/")
701         if p[0] == '.':
702             del p[0]
703
704         if len(p) > 0:
705             item = self._items.get(p[0])
706             if len(p) == 1:
707                 # item must be a file
708                 if item is None and create:
709                     # create new file
710                     if create_collection:
711                         item = Subcollection(self)
712                     else:
713                         item = ArvadosFile(self)
714                     self._items[p[0]] = item
715                     self.notify(self, ADD, p[0], item)
716                 return item
717             else:
718                 if item is None and create:
719                     # create new collection
720                     item = Subcollection(self)
721                     self._items[p[0]] = item
722                     self.notify(self, ADD, p[0], item)
723                 del p[0]
724                 return item.find("/".join(p), create=create)
725         else:
726             return self
727
728     def open(self, path, mode):
729         """Open a file-like object for access.
730
731         :path:
732           path to a file in the collection
733         :mode:
734           one of "r", "r+", "w", "w+", "a", "a+"
735           :"r":
736             opens for reading
737           :"r+":
738             opens for reading and writing.  Reads/writes share a file pointer.
739           :"w", "w+":
740             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
741           :"a", "a+":
742             opens for reading and writing.  All writes are appended to
743             the end of the file.  Writing does not affect the file pointer for
744             reading.
745         """
746         mode = mode.replace("b", "")
747         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
748             raise ArgumentError("Bad mode '%s'" % mode)
749         create = (mode != "r")
750
751         if create and self._sync_mode() == SynchronizedCollectionBase.SYNC_READONLY:
752             raise IOError((errno.EROFS, "Collection is read only"))
753
754         f = self.find(path, create=create)
755         if f is None:
756             raise IOError((errno.ENOENT, "File not found"))
757         if not isinstance(f, ArvadosFile):
758             raise IOError((errno.EISDIR, "Path must refer to a file."))
759
760         if mode[0] == "w":
761             f.truncate(0)
762
763         if mode == "r":
764             return ArvadosFileReader(f, path, mode)
765         else:
766             return ArvadosFileWriter(f, path, mode)
767
768     @_synchronized
769     def modified(self):
770         """Test if the collection (or any subcollection or file) has been modified
771         since it was created."""
772         for k,v in self._items.items():
773             if v.modified():
774                 return True
775         return False
776
777     @_synchronized
778     def set_unmodified(self):
779         """Recursively clear modified flag"""
780         for k,v in self._items.items():
781             v.set_unmodified()
782
783     @_synchronized
784     def __iter__(self):
785         """Iterate over names of files and collections contained in this collection."""
786         return self._items.keys()
787
788     @_synchronized
789     def iterkeys(self):
790         """Iterate over names of files and collections directly contained in this collection."""
791         return self._items.keys()
792
793     @_synchronized
794     def __getitem__(self, k):
795         """Get a file or collection that is directly contained by this collection.  If
796         you want to search a path, use `find()` instead.
797         """
798         return self._items[k]
799
800     @_synchronized
801     def __contains__(self, k):
802         """If there is a file or collection a directly contained by this collection
803         with name "k"."""
804         return k in self._items
805
806     @_synchronized
807     def __len__(self):
808         """Get the number of items directly contained in this collection"""
809         return len(self._items)
810
811     @_must_be_writable
812     @_synchronized
813     def __delitem__(self, p):
814         """Delete an item by name which is directly contained by this collection."""
815         del self._items[p]
816         self.notify(self, DEL, p, None)
817
818     @_synchronized
819     def keys(self):
820         """Get a list of names of files and collections directly contained in this collection."""
821         return self._items.keys()
822
823     @_synchronized
824     def values(self):
825         """Get a list of files and collection objects directly contained in this collection."""
826         return self._items.values()
827
828     @_synchronized
829     def items(self):
830         """Get a list of (name, object) tuples directly contained in this collection."""
831         return self._items.items()
832
833     def exists(self, path):
834         """Test if there is a file or collection at "path" """
835         return self.find(path) != None
836
837     @_must_be_writable
838     @_synchronized
839     def remove(self, path, rm_r=False):
840         """Remove the file or subcollection (directory) at `path`.
841         :rm_r:
842           Specify whether to remove non-empty subcollections (True), or raise an error (False).
843         """
844         p = path.split("/")
845         if p[0] == '.':
846             # Remove '.' from the front of the path
847             del p[0]
848
849         if len(p) > 0:
850             item = self._items.get(p[0])
851             if item is None:
852                 raise IOError((errno.ENOENT, "File not found"))
853             if len(p) == 1:
854                 if isinstance(SynchronizedCollection, self._items[p[0]]) and len(self._items[p[0]]) > 0 and not rm_r:
855                     raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
856                 del self._items[p[0]]
857                 self.notify(self, DEL, p[0], None)
858             else:
859                 del p[0]
860                 item.remove("/".join(p))
861         else:
862             raise IOError((errno.ENOENT, "File not found"))
863
864     def _cloneinto(self, target):
865         for k,v in self._items:
866             target._items[k] = v.clone(new_parent=target)
867
868     def clone(self):
869         raise NotImplementedError()
870
871     @_must_be_writable
872     @_synchronized
873     def copyto(self, target_path, source_path, source_collection=None, overwrite=False):
874         """
875         copyto('/foo', '/bar') will overwrite 'foo' if it exists.
876         copyto('/foo/', '/bar') will place 'bar' in subcollection 'foo'
877         """
878         if source_collection is None:
879             source_collection = self
880
881         # Find the object to copy
882         sp = source_path.split("/")
883         source_obj = source_collection.find(source_path)
884         if source_obj is None:
885             raise IOError((errno.ENOENT, "File not found"))
886
887         # Find parent collection the target path
888         tp = target_path.split("/")
889         target_dir = self.find(tp[0:-1].join("/"), create=True, create_collection=True)
890
891         # Determine the name to use.
892         target_name = tp[-1] if tp[-1] else sp[-1]
893
894         if target_name in target_dir and not overwrite:
895             raise IOError((errno.EEXIST, "File already exists"))
896
897         # Actually make the copy.
898         dup = source_obj.clone(target_dir)
899         with target_dir.lock:
900             target_dir._items[target_name] = dup
901
902         self.notify(target_dir, ADD, target_name, dup)
903
904
905     @_synchronized
906     def manifest_text(self, strip=False, normalize=False):
907         """Get the manifest text for this collection, sub collections and files.
908
909         :strip:
910           If True, remove signing tokens from block locators if present.
911           If False, block locators are left unchanged.
912
913         :normalize:
914           If True, always export the manifest text in normalized form
915           even if the Collection is not modified.  If False and the collection
916           is not modified, return the original manifest text even if it is not
917           in normalized form.
918
919         """
920         if self.modified() or self._manifest_text is None or normalize:
921             return export_manifest(self, stream_name=".", portable_locators=strip)
922         else:
923             if strip:
924                 return self.stripped_manifest()
925             else:
926                 return self._manifest_text
927
928     @_must_be_writable
929     @_synchronized
930     def merge(self, other):
931         for k in other.keys():
932             if k in self:
933                 if isinstance(self[k], Subcollection) and isinstance(other[k], Subcollection):
934                     self[k].merge(other[k])
935                 else:
936                     if self[k] != other[k]:
937                         name = "%s~conflict-%s~" % (k, time.strftime("%Y-%m-%d~%H:%M%:%S",
938                                                                      time.gmtime()))
939                         self[name] = other[k].clone(self)
940                         self.notify(self, name, ADD, self[name])
941             else:
942                 self[k] = other[k].clone(self)
943                 self.notify(self, k, ADD, self[k])
944
945     def portable_data_hash(self):
946         """Get the portable data hash for this collection's manifest."""
947         stripped = self.manifest_text(strip=True)
948         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
949
950
951 class Collection(SynchronizedCollectionBase):
952     """Store an Arvados collection, consisting of a set of files and
953     sub-collections.
954     """
955
956     def __init__(self, manifest_locator_or_text=None,
957                  parent=None,
958                  config=None,
959                  api_client=None,
960                  keep_client=None,
961                  num_retries=0,
962                  block_manager=None,
963                  sync=Collection.SYNC_READONLY):
964         """:manifest_locator_or_text:
965           One of Arvados collection UUID, block locator of
966           a manifest, raw manifest text, or None (to create an empty collection).
967         :parent:
968           the parent Collection, may be None.
969         :config:
970           the arvados configuration to get the hostname and api token.
971           Prefer this over supplying your own api_client and keep_client (except in testing).
972           Will use default config settings if not specified.
973         :api_client:
974           The API client object to use for requests.  If not specified, create one using `config`.
975         :keep_client:
976           the Keep client to use for requests.  If not specified, create one using `config`.
977         :num_retries:
978           the number of retries for API and Keep requests.
979         :block_manager:
980           the block manager to use.  If not specified, create one.
981         :sync:
982           Set synchronization policy with API server collection record.
983           :SYNC_READONLY:
984             Collection is read only.  No synchronization.  This mode will
985             also forego locking, which gives better performance.
986           :SYNC_EXPLICIT:
987             Synchronize on explicit request via `merge()` or `save()`
988           :SYNC_LIVE:
989             Synchronize with server in response to background websocket events,
990             on block write, or on file close.
991
992         """
993
994         self.parent = parent
995         self._items = None
996         self._api_client = api_client
997         self._keep_client = keep_client
998         self._block_manager = block_manager
999         self._config = config
1000         self.num_retries = num_retries
1001         self._manifest_locator = None
1002         self._manifest_text = None
1003         self._api_response = None
1004         self._sync = sync
1005         self.lock = threading.RLock()
1006         self.callbacks = []
1007
1008         if manifest_locator_or_text:
1009             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1010                 self._manifest_locator = manifest_locator_or_text
1011             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1012                 self._manifest_locator = manifest_locator_or_text
1013             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1014                 self._manifest_text = manifest_locator_or_text
1015             else:
1016                 raise errors.ArgumentError(
1017                     "Argument to CollectionReader must be a manifest or a collection UUID")
1018
1019             self._populate()
1020
1021             if self._sync == SYNC_LIVE:
1022                 if not self._manifest_locator or not re.match(util.collection_uuid_pattern, self._manifest_locator):
1023                     raise errors.ArgumentError("Cannot SYNC_LIVE unless a collection uuid is specified")
1024                 self.events = events.subscribe(arvados.api(), filters=[["object_uuid", "=", self._manifest_locator]], self.on_message)
1025
1026     @staticmethod
1027     def create(name, owner_uuid=None, sync=SYNC_EXPLICIT):
1028         c = Collection(sync=SYNC_EXPLICIT)
1029         c.save_as(name, owner_uuid=owner_uuid, ensure_unique_name=True)
1030         return c
1031
1032     def _root_lock(self):
1033         return self.lock
1034
1035     def sync_mode(self):
1036         return self._sync
1037
1038     @_synchronized
1039     def on_message():
1040         n = self._my_api().collections().get(uuid=self._manifest_locator, select=[["manifest_text"])).execute()
1041         other = import_collection(n["manifest_text"])
1042         self.merge(other)
1043
1044     @_synchronized
1045     def _my_api(self):
1046         if self._api_client is None:
1047             self._api_client = arvados.api.SafeApi(self._config)
1048             self._keep_client = self._api_client.keep
1049         return self._api_client
1050
1051     @_synchronized
1052     def _my_keep(self):
1053         if self._keep_client is None:
1054             if self._api_client is None:
1055                 self._my_api()
1056             else:
1057                 self._keep_client = KeepClient(api=self._api_client)
1058         return self._keep_client
1059
1060     @_synchronized
1061     def _my_block_manager(self):
1062         if self._block_manager is None:
1063             self._block_manager = BlockManager(self._my_keep())
1064         return self._block_manager
1065
1066     def _populate_from_api_server(self):
1067         # As in KeepClient itself, we must wait until the last
1068         # possible moment to instantiate an API client, in order to
1069         # avoid tripping up clients that don't have access to an API
1070         # server.  If we do build one, make sure our Keep client uses
1071         # it.  If instantiation fails, we'll fall back to the except
1072         # clause, just like any other Collection lookup
1073         # failure. Return an exception, or None if successful.
1074         try:
1075             self._api_response = self._my_api().collections().get(
1076                 uuid=self._manifest_locator).execute(
1077                     num_retries=self.num_retries)
1078             self._manifest_text = self._api_response['manifest_text']
1079             return None
1080         except Exception as e:
1081             return e
1082
1083     def _populate_from_keep(self):
1084         # Retrieve a manifest directly from Keep. This has a chance of
1085         # working if [a] the locator includes a permission signature
1086         # or [b] the Keep services are operating in world-readable
1087         # mode. Return an exception, or None if successful.
1088         try:
1089             self._manifest_text = self._my_keep().get(
1090                 self._manifest_locator, num_retries=self.num_retries)
1091         except Exception as e:
1092             return e
1093
1094     def _populate(self):
1095         self._items = {}
1096         if self._manifest_locator is None and self._manifest_text is None:
1097             return
1098         error_via_api = None
1099         error_via_keep = None
1100         should_try_keep = ((self._manifest_text is None) and
1101                            util.keep_locator_pattern.match(
1102                                self._manifest_locator))
1103         if ((self._manifest_text is None) and
1104             util.signed_locator_pattern.match(self._manifest_locator)):
1105             error_via_keep = self._populate_from_keep()
1106         if self._manifest_text is None:
1107             error_via_api = self._populate_from_api_server()
1108             if error_via_api is not None and not should_try_keep:
1109                 raise error_via_api
1110         if ((self._manifest_text is None) and
1111             not error_via_keep and
1112             should_try_keep):
1113             # Looks like a keep locator, and we didn't already try keep above
1114             error_via_keep = self._populate_from_keep()
1115         if self._manifest_text is None:
1116             # Nothing worked!
1117             raise arvados.errors.NotFoundError(
1118                 ("Failed to retrieve collection '{}' " +
1119                  "from either API server ({}) or Keep ({})."
1120                  ).format(
1121                     self._manifest_locator,
1122                     error_via_api,
1123                     error_via_keep))
1124         # populate
1125         import_manifest(self._manifest_text, self)
1126
1127         if self._sync == SYNC_READONLY:
1128             # Now that we're populated, knowing that this will be readonly,
1129             # forego any further locking.
1130             self.lock = NoopLock()
1131
1132     def __enter__(self):
1133         return self
1134
1135     def __exit__(self, exc_type, exc_value, traceback):
1136         """Support scoped auto-commit in a with: block"""
1137         self.save(allow_no_locator=True)
1138         if self._block_manager is not None:
1139             self._block_manager.stop_threads()
1140
1141     @_synchronized
1142     def clone(self, new_parent=None, new_sync=Collection.SYNC_READONLY, new_config=self.config):
1143         c = Collection(parent=new_parent, config=new_config, sync=new_sync)
1144         if new_sync == Collection.SYNC_READONLY:
1145             c.lock = NoopLock()
1146         c._items = {}
1147         self._cloneinto(c)
1148         return c
1149
1150     @_synchronized
1151     def api_response(self):
1152         """
1153         api_response() -> dict or None
1154
1155         Returns information about this Collection fetched from the API server.
1156         If the Collection exists in Keep but not the API server, currently
1157         returns None.  Future versions may provide a synthetic response.
1158         """
1159         return self._api_response
1160
1161     @_must_be_writable
1162     @_synchronized
1163     def save(self, allow_no_locator=False):
1164         """Commit pending buffer blocks to Keep, write the manifest to Keep, and
1165         update the collection record to Keep.
1166
1167         :allow_no_locator:
1168           If there is no collection uuid associated with this
1169           Collection and `allow_no_locator` is False, raise an error.  If True,
1170           do not raise an error.
1171         """
1172         if self.modified():
1173             self._my_block_manager().commit_all()
1174             self._my_keep().put(self.manifest_text(strip=True))
1175             if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
1176                 self._api_response = self._my_api().collections().update(
1177                     uuid=self._manifest_locator,
1178                     body={'manifest_text': self.manifest_text(strip=False)}
1179                     ).execute(
1180                         num_retries=self.num_retries)
1181             elif not allow_no_locator:
1182                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
1183             self.set_unmodified()
1184
1185     @_must_be_writable
1186     @_synchronized
1187     def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
1188         """Save a new collection record.
1189
1190         :name:
1191           The collection name.
1192
1193         :owner_uuid:
1194           the user, or project uuid that will own this collection.
1195           If None, defaults to the current user.
1196
1197         :ensure_unique_name:
1198           If True, ask the API server to rename the collection
1199           if it conflicts with a collection with the same name and owner.  If
1200           False, a name conflict will result in an error.
1201
1202         """
1203         self._my_block_manager().commit_all()
1204         self._my_keep().put(self.manifest_text(strip=True))
1205         body = {"manifest_text": self.manifest_text(strip=False),
1206                 "name": name}
1207         if owner_uuid:
1208             body["owner_uuid"] = owner_uuid
1209         self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)
1210
1211         if self.events:
1212             self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
1213
1214         self._manifest_locator = self._api_response["uuid"]
1215
1216         if self.events:
1217             self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
1218
1219         self.set_unmodified()
1220
1221     @_synchronized
1222     def subscribe(self, callback):
1223         self.callbacks.append(callback)
1224
1225     @_synchronized
1226     def unsubscribe(self, callback):
1227         self.callbacks.remove(callback)
1228
1229     @_synchronized
1230     def notify(self, event):
1231         for c in self.callbacks:
1232             c(event)
1233
1234 class Subcollection(SynchronizedCollectionBase):
1235     """This is a subdirectory within a collection that doesn't have its own API
1236     server record.  It falls under the umbrella of the root collection."""
1237
1238     def __init__(self, parent):
1239         super(Subcollection, self).__init__(parent)
1240         self.lock = parent._root_lock()
1241
1242     def _root_lock():
1243         return self.parent._root_lock()
1244
1245     def sync_mode(self):
1246         return self.parent.sync_mode()
1247
1248     def _my_api(self):
1249         return self.parent._my_api()
1250
1251     def _my_keep(self):
1252         return self.parent._my_keep()
1253
1254     def _my_block_manager(self):
1255         return self.parent._my_block_manager()
1256
1257     def _populate(self):
1258         self.parent._populate()
1259
1260     def notify(self, event):
1261         self.parent.notify(event)
1262
1263     @_synchronized
1264     def clone(self, new_parent):
1265         c = Subcollection(new_parent)
1266         c._items = {}
1267         self._cloneinto(c)
1268         return c
1269
1270 def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
1271     """Import a manifest into a `Collection`.
1272
1273     :manifest_text:
1274       The manifest text to import from.
1275
1276     :into_collection:
1277       The `Collection` that will be initialized (must be empty).
1278       If None, create a new `Collection` object.
1279
1280     :api_client:
1281       The API client object that will be used when creating a new `Collection` object.
1282
1283     :keep:
1284       The keep client object that will be used when creating a new `Collection` object.
1285
1286     num_retries
1287       the default number of api client and keep retries on error.
1288     """
1289     if into_collection is not None:
1290         if len(into_collection) > 0:
1291             raise ArgumentError("Can only import manifest into an empty collection")
1292         c = into_collection
1293     else:
1294         c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries)
1295
1296     STREAM_NAME = 0
1297     BLOCKS = 1
1298     SEGMENTS = 2
1299
1300     stream_name = None
1301     state = STREAM_NAME
1302
1303     for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1304         tok = n.group(1)
1305         sep = n.group(2)
1306
1307         if state == STREAM_NAME:
1308             # starting a new stream
1309             stream_name = tok.replace('\\040', ' ')
1310             blocks = []
1311             segments = []
1312             streamoffset = 0L
1313             state = BLOCKS
1314             continue
1315
1316         if state == BLOCKS:
1317             s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1318             if s:
1319                 blocksize = long(s.group(1))
1320                 blocks.append(Range(tok, streamoffset, blocksize))
1321                 streamoffset += blocksize
1322             else:
1323                 state = SEGMENTS
1324
1325         if state == SEGMENTS:
1326             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
1327             if s:
1328                 pos = long(s.group(1))
1329                 size = long(s.group(2))
1330                 name = s.group(3).replace('\\040', ' ')
1331                 f = c.find("%s/%s" % (stream_name, name), create=True)
1332                 f.add_segment(blocks, pos, size)
1333             else:
1334                 # error!
1335                 raise errors.SyntaxError("Invalid manifest format")
1336
1337         if sep == "\n":
1338             stream_name = None
1339             state = STREAM_NAME
1340
1341     c.set_unmodified()
1342     return c
1343
1344 def export_manifest(item, stream_name=".", portable_locators=False):
1345     """
1346     :item:
1347       Create a manifest for `item` (must be a `Collection` or `ArvadosFile`).  If
1348       `item` is a is a `Collection`, this will also export subcollections.
1349
1350     :stream_name:
1351       the name of the stream when exporting `item`.
1352
1353     :portable_locators:
1354       If True, strip any permission hints on block locators.
1355       If False, use block locators as-is.
1356     """
1357     buf = ""
1358     if isinstance(item, SynchronizedCollectionBase):
1359         stream = {}
1360         sorted_keys = sorted(item.keys())
1361         for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
1362             v = item[k]
1363             st = []
1364             for s in v.segments:
1365                 loc = s.locator
1366                 if loc.startswith("bufferblock"):
1367                     loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
1368                 if portable_locators:
1369                     loc = KeepLocator(loc).stripped()
1370                 st.append(LocatorAndRange(loc, locator_block_size(loc),
1371                                      s.segment_offset, s.range_size))
1372             stream[k] = st
1373         if stream:
1374             buf += ' '.join(normalize_stream(stream_name, stream))
1375             buf += "\n"
1376         for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
1377             buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
1378     elif isinstance(item, ArvadosFile):
1379         st = []
1380         for s in item.segments:
1381             loc = s.locator
1382             if loc.startswith("bufferblock"):
1383                 loc = item._bufferblocks[loc].calculate_locator()
1384             if portable_locators:
1385                 loc = KeepLocator(loc).stripped()
1386             st.append(LocatorAndRange(loc, locator_block_size(loc),
1387                                  s.segment_offset, s.range_size))
1388         stream[stream_name] = st
1389         buf += ' '.join(normalize_stream(stream_name, stream))
1390         buf += "\n"
1391     return buf