4823: Add copyinto()
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6
7 from collections import deque
8 from stat import *
9
10 from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager
11 from keep import *
12 from .stream import StreamReader, normalize_stream, locator_block_size
13 from .ranges import Range, LocatorAndRange
14 import config
15 import errors
16 import util
17
18 _logger = logging.getLogger('arvados.collection')
19
20 class CollectionBase(object):
21     def __enter__(self):
22         return self
23
24     def __exit__(self, exc_type, exc_value, traceback):
25         pass
26
27     def _my_keep(self):
28         if self._keep_client is None:
29             self._keep_client = KeepClient(api_client=self._api_client,
30                                            num_retries=self.num_retries)
31         return self._keep_client
32
33     def stripped_manifest(self):
34         """
35         Return the manifest for the current collection with all
36         non-portable hints (i.e., permission signatures and other
37         hints other than size hints) removed from the locators.
38         """
39         raw = self.manifest_text()
40         clean = []
41         for line in raw.split("\n"):
42             fields = line.split()
43             if fields:
44                 clean_fields = fields[:1] + [
45                     (re.sub(r'\+[^\d][^\+]*', '', x)
46                      if re.match(util.keep_locator_pattern, x)
47                      else x)
48                     for x in fields[1:]]
49                 clean += [' '.join(clean_fields), "\n"]
50         return ''.join(clean)
51
52
53 class CollectionReader(CollectionBase):
54     def __init__(self, manifest_locator_or_text, api_client=None,
55                  keep_client=None, num_retries=0):
56         """Instantiate a CollectionReader.
57
58         This class parses Collection manifests to provide a simple interface
59         to read its underlying files.
60
61         Arguments:
62         * manifest_locator_or_text: One of a Collection UUID, portable data
63           hash, or full manifest text.
64         * api_client: The API client to use to look up Collections.  If not
65           provided, CollectionReader will build one from available Arvados
66           configuration.
67         * keep_client: The KeepClient to use to download Collection data.
68           If not provided, CollectionReader will build one from available
69           Arvados configuration.
70         * num_retries: The default number of times to retry failed
71           service requests.  Default 0.  You may change this value
72           after instantiation, but note those changes may not
73           propagate to related objects like the Keep client.
74         """
75         self._api_client = api_client
76         self._keep_client = keep_client
77         self.num_retries = num_retries
78         if re.match(util.keep_locator_pattern, manifest_locator_or_text):
79             self._manifest_locator = manifest_locator_or_text
80             self._manifest_text = None
81         elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
82             self._manifest_locator = manifest_locator_or_text
83             self._manifest_text = None
84         elif re.match(util.manifest_pattern, manifest_locator_or_text):
85             self._manifest_text = manifest_locator_or_text
86             self._manifest_locator = None
87         else:
88             raise errors.ArgumentError(
89                 "Argument to CollectionReader must be a manifest or a collection UUID")
90         self._api_response = None
91         self._streams = None
92
93     def _populate_from_api_server(self):
94         # As in KeepClient itself, we must wait until the last
95         # possible moment to instantiate an API client, in order to
96         # avoid tripping up clients that don't have access to an API
97         # server.  If we do build one, make sure our Keep client uses
98         # it.  If instantiation fails, we'll fall back to the except
99         # clause, just like any other Collection lookup
100         # failure. Return an exception, or None if successful.
101         try:
102             if self._api_client is None:
103                 self._api_client = arvados.api('v1')
104                 self._keep_client = None  # Make a new one with the new api.
105             self._api_response = self._api_client.collections().get(
106                 uuid=self._manifest_locator).execute(
107                 num_retries=self.num_retries)
108             self._manifest_text = self._api_response['manifest_text']
109             return None
110         except Exception as e:
111             return e
112
113     def _populate_from_keep(self):
114         # Retrieve a manifest directly from Keep. This has a chance of
115         # working if [a] the locator includes a permission signature
116         # or [b] the Keep services are operating in world-readable
117         # mode. Return an exception, or None if successful.
118         try:
119             self._manifest_text = self._my_keep().get(
120                 self._manifest_locator, num_retries=self.num_retries)
121         except Exception as e:
122             return e
123
124     def _populate(self):
125         error_via_api = None
126         error_via_keep = None
127         should_try_keep = ((self._manifest_text is None) and
128                            util.keep_locator_pattern.match(
129                 self._manifest_locator))
130         if ((self._manifest_text is None) and
131             util.signed_locator_pattern.match(self._manifest_locator)):
132             error_via_keep = self._populate_from_keep()
133         if self._manifest_text is None:
134             error_via_api = self._populate_from_api_server()
135             if error_via_api is not None and not should_try_keep:
136                 raise error_via_api
137         if ((self._manifest_text is None) and
138             not error_via_keep and
139             should_try_keep):
140             # Looks like a keep locator, and we didn't already try keep above
141             error_via_keep = self._populate_from_keep()
142         if self._manifest_text is None:
143             # Nothing worked!
144             raise arvados.errors.NotFoundError(
145                 ("Failed to retrieve collection '{}' " +
146                  "from either API server ({}) or Keep ({})."
147                  ).format(
148                     self._manifest_locator,
149                     error_via_api,
150                     error_via_keep))
151         self._streams = [sline.split()
152                          for sline in self._manifest_text.split("\n")
153                          if sline]
154
155     @staticmethod
156     def _populate_first(orig_func):
157         # Decorator for methods that read actual Collection data.
158         @functools.wraps(orig_func)
159         def wrapper(self, *args, **kwargs):
160             if self._streams is None:
161                 self._populate()
162             return orig_func(self, *args, **kwargs)
163         return wrapper
164
165     @_populate_first
166     def api_response(self):
167         """api_response() -> dict or None
168
169         Returns information about this Collection fetched from the API server.
170         If the Collection exists in Keep but not the API server, currently
171         returns None.  Future versions may provide a synthetic response.
172         """
173         return self._api_response
174
175     @_populate_first
176     def normalize(self):
177         # Rearrange streams
178         streams = {}
179         for s in self.all_streams():
180             for f in s.all_files():
181                 streamname, filename = split(s.name() + "/" + f.name())
182                 if streamname not in streams:
183                     streams[streamname] = {}
184                 if filename not in streams[streamname]:
185                     streams[streamname][filename] = []
186                 for r in f.segments:
187                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
188
189         self._streams = [normalize_stream(s, streams[s])
190                          for s in sorted(streams)]
191
192         # Regenerate the manifest text based on the normalized streams
193         self._manifest_text = ''.join(
194             [StreamReader(stream, keep=self._my_keep()).manifest_text()
195              for stream in self._streams])
196
197     @_populate_first
198     def open(self, streampath, filename=None):
199         """open(streampath[, filename]) -> file-like object
200
201         Pass in the path of a file to read from the Collection, either as a
202         single string or as two separate stream name and file name arguments.
203         This method returns a file-like object to read that file.
204         """
205         if filename is None:
206             streampath, filename = split(streampath)
207         keep_client = self._my_keep()
208         for stream_s in self._streams:
209             stream = StreamReader(stream_s, keep_client,
210                                   num_retries=self.num_retries)
211             if stream.name() == streampath:
212                 break
213         else:
214             raise ValueError("stream '{}' not found in Collection".
215                              format(streampath))
216         try:
217             return stream.files()[filename]
218         except KeyError:
219             raise ValueError("file '{}' not found in Collection stream '{}'".
220                              format(filename, streampath))
221
222     @_populate_first
223     def all_streams(self):
224         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
225                 for s in self._streams]
226
227     def all_files(self):
228         for s in self.all_streams():
229             for f in s.all_files():
230                 yield f
231
232     @_populate_first
233     def manifest_text(self, strip=False, normalize=False):
234         if normalize:
235             cr = CollectionReader(self.manifest_text())
236             cr.normalize()
237             return cr.manifest_text(strip=strip, normalize=False)
238         elif strip:
239             return self.stripped_manifest()
240         else:
241             return self._manifest_text
242
243
244 class _WriterFile(ArvadosFileBase):
245     def __init__(self, coll_writer, name):
246         super(_WriterFile, self).__init__(name, 'wb')
247         self.dest = coll_writer
248
249     def close(self):
250         super(_WriterFile, self).close()
251         self.dest.finish_current_file()
252
253     @ArvadosFileBase._before_close
254     def write(self, data):
255         self.dest.write(data)
256
257     @ArvadosFileBase._before_close
258     def writelines(self, seq):
259         for data in seq:
260             self.write(data)
261
262     @ArvadosFileBase._before_close
263     def flush(self):
264         self.dest.flush_data()
265
266
267 class CollectionWriter(CollectionBase):
268     def __init__(self, api_client=None, num_retries=0):
269         """Instantiate a CollectionWriter.
270
271         CollectionWriter lets you build a new Arvados Collection from scratch.
272         Write files to it.  The CollectionWriter will upload data to Keep as
273         appropriate, and provide you with the Collection manifest text when
274         you're finished.
275
276         Arguments:
277         * api_client: The API client to use to look up Collections.  If not
278           provided, CollectionReader will build one from available Arvados
279           configuration.
280         * num_retries: The default number of times to retry failed
281           service requests.  Default 0.  You may change this value
282           after instantiation, but note those changes may not
283           propagate to related objects like the Keep client.
284         """
285         self._api_client = api_client
286         self.num_retries = num_retries
287         self._keep_client = None
288         self._data_buffer = []
289         self._data_buffer_len = 0
290         self._current_stream_files = []
291         self._current_stream_length = 0
292         self._current_stream_locators = []
293         self._current_stream_name = '.'
294         self._current_file_name = None
295         self._current_file_pos = 0
296         self._finished_streams = []
297         self._close_file = None
298         self._queued_file = None
299         self._queued_dirents = deque()
300         self._queued_trees = deque()
301         self._last_open = None
302
303     def __exit__(self, exc_type, exc_value, traceback):
304         if exc_type is None:
305             self.finish()
306
307     def do_queued_work(self):
308         # The work queue consists of three pieces:
309         # * _queued_file: The file object we're currently writing to the
310         #   Collection.
311         # * _queued_dirents: Entries under the current directory
312         #   (_queued_trees[0]) that we want to write or recurse through.
313         #   This may contain files from subdirectories if
314         #   max_manifest_depth == 0 for this directory.
315         # * _queued_trees: Directories that should be written as separate
316         #   streams to the Collection.
317         # This function handles the smallest piece of work currently queued
318         # (current file, then current directory, then next directory) until
319         # no work remains.  The _work_THING methods each do a unit of work on
320         # THING.  _queue_THING methods add a THING to the work queue.
321         while True:
322             if self._queued_file:
323                 self._work_file()
324             elif self._queued_dirents:
325                 self._work_dirents()
326             elif self._queued_trees:
327                 self._work_trees()
328             else:
329                 break
330
331     def _work_file(self):
332         while True:
333             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
334             if not buf:
335                 break
336             self.write(buf)
337         self.finish_current_file()
338         if self._close_file:
339             self._queued_file.close()
340         self._close_file = None
341         self._queued_file = None
342
343     def _work_dirents(self):
344         path, stream_name, max_manifest_depth = self._queued_trees[0]
345         if stream_name != self.current_stream_name():
346             self.start_new_stream(stream_name)
347         while self._queued_dirents:
348             dirent = self._queued_dirents.popleft()
349             target = os.path.join(path, dirent)
350             if os.path.isdir(target):
351                 self._queue_tree(target,
352                                  os.path.join(stream_name, dirent),
353                                  max_manifest_depth - 1)
354             else:
355                 self._queue_file(target, dirent)
356                 break
357         if not self._queued_dirents:
358             self._queued_trees.popleft()
359
360     def _work_trees(self):
361         path, stream_name, max_manifest_depth = self._queued_trees[0]
362         d = util.listdir_recursive(
363             path, max_depth = (None if max_manifest_depth == 0 else 0))
364         if d:
365             self._queue_dirents(stream_name, d)
366         else:
367             self._queued_trees.popleft()
368
369     def _queue_file(self, source, filename=None):
370         assert (self._queued_file is None), "tried to queue more than one file"
371         if not hasattr(source, 'read'):
372             source = open(source, 'rb')
373             self._close_file = True
374         else:
375             self._close_file = False
376         if filename is None:
377             filename = os.path.basename(source.name)
378         self.start_new_file(filename)
379         self._queued_file = source
380
381     def _queue_dirents(self, stream_name, dirents):
382         assert (not self._queued_dirents), "tried to queue more than one tree"
383         self._queued_dirents = deque(sorted(dirents))
384
385     def _queue_tree(self, path, stream_name, max_manifest_depth):
386         self._queued_trees.append((path, stream_name, max_manifest_depth))
387
388     def write_file(self, source, filename=None):
389         self._queue_file(source, filename)
390         self.do_queued_work()
391
392     def write_directory_tree(self,
393                              path, stream_name='.', max_manifest_depth=-1):
394         self._queue_tree(path, stream_name, max_manifest_depth)
395         self.do_queued_work()
396
397     def write(self, newdata):
398         if hasattr(newdata, '__iter__'):
399             for s in newdata:
400                 self.write(s)
401             return
402         self._data_buffer.append(newdata)
403         self._data_buffer_len += len(newdata)
404         self._current_stream_length += len(newdata)
405         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
406             self.flush_data()
407
408     def open(self, streampath, filename=None):
409         """open(streampath[, filename]) -> file-like object
410
411         Pass in the path of a file to write to the Collection, either as a
412         single string or as two separate stream name and file name arguments.
413         This method returns a file-like object you can write to add it to the
414         Collection.
415
416         You may only have one file object from the Collection open at a time,
417         so be sure to close the object when you're done.  Using the object in
418         a with statement makes that easy::
419
420           with cwriter.open('./doc/page1.txt') as outfile:
421               outfile.write(page1_data)
422           with cwriter.open('./doc/page2.txt') as outfile:
423               outfile.write(page2_data)
424         """
425         if filename is None:
426             streampath, filename = split(streampath)
427         if self._last_open and not self._last_open.closed:
428             raise errors.AssertionError(
429                 "can't open '{}' when '{}' is still open".format(
430                     filename, self._last_open.name))
431         if streampath != self.current_stream_name():
432             self.start_new_stream(streampath)
433         self.set_current_file_name(filename)
434         self._last_open = _WriterFile(self, filename)
435         return self._last_open
436
437     def flush_data(self):
438         data_buffer = ''.join(self._data_buffer)
439         if data_buffer:
440             self._current_stream_locators.append(
441                 self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
442             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
443             self._data_buffer_len = len(self._data_buffer[0])
444
445     def start_new_file(self, newfilename=None):
446         self.finish_current_file()
447         self.set_current_file_name(newfilename)
448
449     def set_current_file_name(self, newfilename):
450         if re.search(r'[\t\n]', newfilename):
451             raise errors.AssertionError(
452                 "Manifest filenames cannot contain whitespace: %s" %
453                 newfilename)
454         elif re.search(r'\x00', newfilename):
455             raise errors.AssertionError(
456                 "Manifest filenames cannot contain NUL characters: %s" %
457                 newfilename)
458         self._current_file_name = newfilename
459
460     def current_file_name(self):
461         return self._current_file_name
462
463     def finish_current_file(self):
464         if self._current_file_name is None:
465             if self._current_file_pos == self._current_stream_length:
466                 return
467             raise errors.AssertionError(
468                 "Cannot finish an unnamed file " +
469                 "(%d bytes at offset %d in '%s' stream)" %
470                 (self._current_stream_length - self._current_file_pos,
471                  self._current_file_pos,
472                  self._current_stream_name))
473         self._current_stream_files.append([
474                 self._current_file_pos,
475                 self._current_stream_length - self._current_file_pos,
476                 self._current_file_name])
477         self._current_file_pos = self._current_stream_length
478         self._current_file_name = None
479
480     def start_new_stream(self, newstreamname='.'):
481         self.finish_current_stream()
482         self.set_current_stream_name(newstreamname)
483
484     def set_current_stream_name(self, newstreamname):
485         if re.search(r'[\t\n]', newstreamname):
486             raise errors.AssertionError(
487                 "Manifest stream names cannot contain whitespace")
488         self._current_stream_name = '.' if newstreamname=='' else newstreamname
489
490     def current_stream_name(self):
491         return self._current_stream_name
492
493     def finish_current_stream(self):
494         self.finish_current_file()
495         self.flush_data()
496         if not self._current_stream_files:
497             pass
498         elif self._current_stream_name is None:
499             raise errors.AssertionError(
500                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
501                 (self._current_stream_length, len(self._current_stream_files)))
502         else:
503             if not self._current_stream_locators:
504                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
505             self._finished_streams.append([self._current_stream_name,
506                                            self._current_stream_locators,
507                                            self._current_stream_files])
508         self._current_stream_files = []
509         self._current_stream_length = 0
510         self._current_stream_locators = []
511         self._current_stream_name = None
512         self._current_file_pos = 0
513         self._current_file_name = None
514
515     def finish(self):
516         # Store the manifest in Keep and return its locator.
517         return self._my_keep().put(self.manifest_text())
518
519     def portable_data_hash(self):
520         stripped = self.stripped_manifest()
521         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
522
523     def manifest_text(self):
524         self.finish_current_stream()
525         manifest = ''
526
527         for stream in self._finished_streams:
528             if not re.search(r'^\.(/.*)?$', stream[0]):
529                 manifest += './'
530             manifest += stream[0].replace(' ', '\\040')
531             manifest += ' ' + ' '.join(stream[1])
532             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
533             manifest += "\n"
534
535         return manifest
536
537     def data_locators(self):
538         ret = []
539         for name, locators, files in self._finished_streams:
540             ret += locators
541         return ret
542
543
544 class ResumableCollectionWriter(CollectionWriter):
545     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
546                    '_current_stream_locators', '_current_stream_name',
547                    '_current_file_name', '_current_file_pos', '_close_file',
548                    '_data_buffer', '_dependencies', '_finished_streams',
549                    '_queued_dirents', '_queued_trees']
550
551     def __init__(self, api_client=None, num_retries=0):
552         self._dependencies = {}
553         super(ResumableCollectionWriter, self).__init__(
554             api_client, num_retries=num_retries)
555
556     @classmethod
557     def from_state(cls, state, *init_args, **init_kwargs):
558         # Try to build a new writer from scratch with the given state.
559         # If the state is not suitable to resume (because files have changed,
560         # been deleted, aren't predictable, etc.), raise a
561         # StaleWriterStateError.  Otherwise, return the initialized writer.
562         # The caller is responsible for calling writer.do_queued_work()
563         # appropriately after it's returned.
564         writer = cls(*init_args, **init_kwargs)
565         for attr_name in cls.STATE_PROPS:
566             attr_value = state[attr_name]
567             attr_class = getattr(writer, attr_name).__class__
568             # Coerce the value into the same type as the initial value, if
569             # needed.
570             if attr_class not in (type(None), attr_value.__class__):
571                 attr_value = attr_class(attr_value)
572             setattr(writer, attr_name, attr_value)
573         # Check dependencies before we try to resume anything.
574         if any(KeepLocator(ls).permission_expired()
575                for ls in writer._current_stream_locators):
576             raise errors.StaleWriterStateError(
577                 "locators include expired permission hint")
578         writer.check_dependencies()
579         if state['_current_file'] is not None:
580             path, pos = state['_current_file']
581             try:
582                 writer._queued_file = open(path, 'rb')
583                 writer._queued_file.seek(pos)
584             except IOError as error:
585                 raise errors.StaleWriterStateError(
586                     "failed to reopen active file {}: {}".format(path, error))
587         return writer
588
589     def check_dependencies(self):
590         for path, orig_stat in self._dependencies.items():
591             if not S_ISREG(orig_stat[ST_MODE]):
592                 raise errors.StaleWriterStateError("{} not file".format(path))
593             try:
594                 now_stat = tuple(os.stat(path))
595             except OSError as error:
596                 raise errors.StaleWriterStateError(
597                     "failed to stat {}: {}".format(path, error))
598             if ((not S_ISREG(now_stat[ST_MODE])) or
599                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
600                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
601                 raise errors.StaleWriterStateError("{} changed".format(path))
602
603     def dump_state(self, copy_func=lambda x: x):
604         state = {attr: copy_func(getattr(self, attr))
605                  for attr in self.STATE_PROPS}
606         if self._queued_file is None:
607             state['_current_file'] = None
608         else:
609             state['_current_file'] = (os.path.realpath(self._queued_file.name),
610                                       self._queued_file.tell())
611         return state
612
613     def _queue_file(self, source, filename=None):
614         try:
615             src_path = os.path.realpath(source)
616         except Exception:
617             raise errors.AssertionError("{} not a file path".format(source))
618         try:
619             path_stat = os.stat(src_path)
620         except OSError as stat_error:
621             path_stat = None
622         super(ResumableCollectionWriter, self)._queue_file(source, filename)
623         fd_stat = os.fstat(self._queued_file.fileno())
624         if not S_ISREG(fd_stat.st_mode):
625             # We won't be able to resume from this cache anyway, so don't
626             # worry about further checks.
627             self._dependencies[source] = tuple(fd_stat)
628         elif path_stat is None:
629             raise errors.AssertionError(
630                 "could not stat {}: {}".format(source, stat_error))
631         elif path_stat.st_ino != fd_stat.st_ino:
632             raise errors.AssertionError(
633                 "{} changed between open and stat calls".format(source))
634         else:
635             self._dependencies[src_path] = tuple(fd_stat)
636
637     def write(self, data):
638         if self._queued_file is None:
639             raise errors.AssertionError(
640                 "resumable writer can't accept unsourced data")
641         return super(ResumableCollectionWriter, self).write(data)
642
643
644 class SynchronizedCollectionBase(CollectionBase):
645     SYNC_READONLY = 1
646     SYNC_EXPLICIT = 2
647     SYNC_LIVE = 3
648
649     def __init__(self, parent=None):
650         self.parent = parent
651         self._items = None
652
653     def _my_api(self):
654         raise NotImplementedError()
655
656     def _my_keep(self):
657         raise NotImplementedError()
658
659     def _my_block_manager(self):
660         raise NotImplementedError()
661
662     def _root_lock(self):
663         raise NotImplementedError()
664
665     def _populate(self):
666         raise NotImplementedError()
667
668     def _sync_mode(self):
669         raise NotImplementedError()
670
671     @staticmethod
672     def _populate_first(orig_func):
673         # Decorator for methods that read actual Collection data.
674         @functools.wraps(orig_func)
675         def wrapper(self, *args, **kwargs):
676             if self._items is None:
677                 self._populate()
678             return orig_func(self, *args, **kwargs)
679         return wrapper
680
681     @arvfile._synchronized
682     @_populate_first
683     def find(self, path, create=False, create_collection=False):
684         """Recursively search the specified file path.  May return either a Collection
685         or ArvadosFile.
686
687         :create:
688           If true, create path components (i.e. Collections) that are
689           missing.  If "create" is False, return None if a path component is
690           not found.
691
692         :create_collection:
693           If the path is not found, "create" is True, and
694           "create_collection" is False, then create and return a new
695           ArvadosFile for the last path component.  If "create_collection" is
696           True, then create and return a new Collection for the last path
697           component.
698
699         """
700         if create and self._sync_mode() == SynchronizedCollectionBase.SYNC_READONLY:
701             raise IOError((errno.EROFS, "Collection is read only"))
702
703         p = path.split("/")
704         if p[0] == '.':
705             del p[0]
706
707         if len(p) > 0:
708             item = self._items.get(p[0])
709             if len(p) == 1:
710                 # item must be a file
711                 if item is None and create:
712                     # create new file
713                     if create_collection:
714                         item = Subcollection(self)
715                     else:
716                         item = ArvadosFile(self)
717                     self._items[p[0]] = item
718                 return item
719             else:
720                 if item is None and create:
721                     # create new collection
722                     item = Subcollection(self)
723                     self._items[p[0]] = item
724                 del p[0]
725                 return item.find("/".join(p), create=create)
726         else:
727             return self
728
729     def open(self, path, mode):
730         """Open a file-like object for access.
731
732         :path:
733           path to a file in the collection
734         :mode:
735           one of "r", "r+", "w", "w+", "a", "a+"
736           :"r":
737             opens for reading
738           :"r+":
739             opens for reading and writing.  Reads/writes share a file pointer.
740           :"w", "w+":
741             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
742           :"a", "a+":
743             opens for reading and writing.  All writes are appended to
744             the end of the file.  Writing does not affect the file pointer for
745             reading.
746         """
747         mode = mode.replace("b", "")
748         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
749             raise ArgumentError("Bad mode '%s'" % mode)
750         create = (mode != "r")
751
752         if create and self._sync_mode() == SynchronizedCollectionBase.SYNC_READONLY:
753             raise IOError((errno.EROFS, "Collection is read only"))
754
755         f = self.find(path, create=create)
756         if f is None:
757             raise IOError((errno.ENOENT, "File not found"))
758         if not isinstance(f, ArvadosFile):
759             raise IOError((errno.EISDIR, "Path must refer to a file."))
760
761         if mode[0] == "w":
762             f.truncate(0)
763
764         if mode == "r":
765             return ArvadosFileReader(f, path, mode)
766         else:
767             return ArvadosFileWriter(f, path, mode)
768
769     @arvfile._synchronized
770     @_populate_first
771     def modified(self):
772         """Test if the collection (or any subcollection or file) has been modified
773         since it was created."""
774         for k,v in self._items.items():
775             if v.modified():
776                 return True
777         return False
778
779     @arvfile._synchronized
780     @_populate_first
781     def set_unmodified(self):
782         """Recursively clear modified flag"""
783         for k,v in self._items.items():
784             v.set_unmodified()
785
786     @arvfile._synchronized
787     @_populate_first
788     def __iter__(self):
789         """Iterate over names of files and collections contained in this collection."""
790         return self._items.keys()
791
792     @arvfile._synchronized
793     @_populate_first
794     def iterkeys(self):
795         """Iterate over names of files and collections directly contained in this collection."""
796         return self._items.keys()
797
798     @arvfile._synchronized
799     @_populate_first
800     def __getitem__(self, k):
801         """Get a file or collection that is directly contained by this collection.  If
802         you want to search a path, use `find()` instead.
803         """
804         return self._items[k]
805
806     @arvfile._synchronized
807     @_populate_first
808     def __contains__(self, k):
809         """If there is a file or collection a directly contained by this collection
810         with name "k"."""
811         return k in self._items
812
813     @arvfile._synchronized
814     @_populate_first
815     def __len__(self):
816         """Get the number of items directly contained in this collection"""
817         return len(self._items)
818
819     @_must_be_writable
820     @arvfile._synchronized
821     @_populate_first
822     def __delitem__(self, p):
823         """Delete an item by name which is directly contained by this collection."""
824         del self._items[p]
825
826     @arvfile._synchronized
827     @_populate_first
828     def keys(self):
829         """Get a list of names of files and collections directly contained in this collection."""
830         return self._items.keys()
831
832     @arvfile._synchronized
833     @_populate_first
834     def values(self):
835         """Get a list of files and collection objects directly contained in this collection."""
836         return self._items.values()
837
838     @arvfile._synchronized
839     @_populate_first
840     def items(self):
841         """Get a list of (name, object) tuples directly contained in this collection."""
842         return self._items.items()
843
844     def exists(self, path):
845         """Test if there is a file or collection at "path" """
846         return self.find(path) != None
847
848     @_must_be_writable
849     @arvfile._synchronized
850     @_populate_first
851     def remove(self, path, rm_r=False):
852         """Remove the file or subcollection (directory) at `path`.
853         :rm_r:
854           Specify whether to remove non-empty subcollections (True), or raise an error (False).
855         """
856         p = path.split("/")
857         if p[0] == '.':
858             # Remove '.' from the front of the path
859             del p[0]
860
861         if len(p) > 0:
862             item = self._items.get(p[0])
863             if item is None:
864                 raise IOError((errno.ENOENT, "File not found"))
865             if len(p) == 1:
866                 if isinstance(SynchronizedCollection, self._items[p[0]]) and len(self._items[p[0]]) > 0 and not rm_r:
867                     raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
868                 del self._items[p[0]]
869             else:
870                 del p[0]
871                 item.remove("/".join(p))
872         else:
873             raise IOError((errno.ENOENT, "File not found"))
874
875     def _cloneinto(self, target):
876         for k,v in self._items:
877             target._items[k] = v.clone(new_parent=target)
878
879     def clone(self):
880         raise NotImplementedError()
881
882     @_must_be_writable
883     @arvfile._synchronized
884     @_populate_first
885     def copyto(self, target_path, source_path, source_collection=None, overwrite=False):
886         """
887         copyto('/foo', '/bar') will overwrite 'foo' if it exists.
888         copyto('/foo/', '/bar') will place 'bar' in subcollection 'foo'
889         """
890         if source_collection is None:
891             source_collection = self
892
893         # Find the object to copy
894         sp = source_path.split("/")
895         source_obj = source_collection.find(source_path)
896         if source_obj is None:
897             raise IOError((errno.ENOENT, "File not found"))
898
899         # Find parent collection the target path
900         tp = target_path.split("/")
901         target_dir = self.find(tp[0:-1].join("/"), create=True, create_collection=True)
902
903         # Determine the name to use.
904         target_name = tp[-1] if tp[-1] else sp[-1]
905
906         if target_name in target_dir and not overwrite:
907             raise IOError((errno.EEXIST, "File already exists"))
908
909         # Actually make the copy.
910         target_dir[target_name]._items = source_obj.clone(target_dir)
911
912     @arvfile._synchronized
913     @_populate_first
914     def manifest_text(self, strip=False, normalize=False):
915         """Get the manifest text for this collection, sub collections and files.
916
917         :strip:
918           If True, remove signing tokens from block locators if present.
919           If False, block locators are left unchanged.
920
921         :normalize:
922           If True, always export the manifest text in normalized form
923           even if the Collection is not modified.  If False and the collection
924           is not modified, return the original manifest text even if it is not
925           in normalized form.
926
927         """
928         if self.modified() or self._manifest_text is None or normalize:
929             return export_manifest(self, stream_name=".", portable_locators=strip)
930         else:
931             if strip:
932                 return self.stripped_manifest()
933             else:
934                 return self._manifest_text
935
936     def portable_data_hash(self):
937         """Get the portable data hash for this collection's manifest."""
938         stripped = self.manifest_text(strip=True)
939         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
940
941
942 class Collection(SynchronizedCollectionBase):
943     """Store an Arvados collection, consisting of a set of files and
944     sub-collections.
945     """
946
947     def __init__(self, manifest_locator_or_text=None,
948                  parent=None,
949                  config=None,
950                  api_client=None,
951                  keep_client=None,
952                  num_retries=0,
953                  block_manager=None,
954                  sync=Collection.SYNC_READONLY):
955         """:manifest_locator_or_text:
956           One of Arvados collection UUID, block locator of
957           a manifest, raw manifest text, or None (to create an empty collection).
958         :parent:
959           the parent Collection, may be None.
960         :config:
961           the arvados configuration to get the hostname and api token.
962           Prefer this over supplying your own api_client and keep_client (except in testing).
963           Will use default config settings if not specified.
964         :api_client:
965           The API client object to use for requests.  If not specified, create one using `config`.
966         :keep_client:
967           the Keep client to use for requests.  If not specified, create one using `config`.
968         :num_retries:
969           the number of retries for API and Keep requests.
970         :block_manager:
971           the block manager to use.  If not specified, create one.
972         :sync:
973           Set synchronization policy with API server collection record.
974           :SYNC_READONLY:
975             Collection is read only.  No synchronization.  This mode will
976             also forego locking, which gives better performance.
977           :SYNC_EXPLICIT:
978             Synchronize on explicit request via `merge()` or `save()`
979           :SYNC_LIVE:
980             Synchronize with server in response to background websocket events,
981             on block write, or on file close.
982
983         """
984
985         self.parent = parent
986         self._items = None
987         self._api_client = api_client
988         self._keep_client = keep_client
989         self._block_manager = block_manager
990         self._config = config
991         self.num_retries = num_retries
992         self._manifest_locator = None
993         self._manifest_text = None
994         self._api_response = None
995         self._sync = sync
996         self.lock = threading.RLock()
997
998         if manifest_locator_or_text:
999             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1000                 self._manifest_locator = manifest_locator_or_text
1001             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1002                 self._manifest_locator = manifest_locator_or_text
1003             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1004                 self._manifest_text = manifest_locator_or_text
1005             else:
1006                 raise errors.ArgumentError(
1007                     "Argument to CollectionReader must be a manifest or a collection UUID")
1008
1009     def _root_lock(self):
1010         return self.lock
1011
1012     def sync_mode(self):
1013         return self._sync
1014
1015     @arvfile._synchronized
1016     def _my_api(self):
1017         if self._api_client is None:
1018             self._api_client = arvados.api.SafeApi(self._config)
1019             self._keep_client = self._api_client.keep
1020         return self._api_client
1021
1022     @arvfile._synchronized
1023     def _my_keep(self):
1024         if self._keep_client is None:
1025             if self._api_client is None:
1026                 self._my_api()
1027             else:
1028                 self._keep_client = KeepClient(api=self._api_client)
1029         return self._keep_client
1030
1031     @arvfile._synchronized
1032     def _my_block_manager(self):
1033         if self._block_manager is None:
1034             self._block_manager = BlockManager(self._my_keep())
1035         return self._block_manager
1036
1037     def _populate_from_api_server(self):
1038         # As in KeepClient itself, we must wait until the last
1039         # possible moment to instantiate an API client, in order to
1040         # avoid tripping up clients that don't have access to an API
1041         # server.  If we do build one, make sure our Keep client uses
1042         # it.  If instantiation fails, we'll fall back to the except
1043         # clause, just like any other Collection lookup
1044         # failure. Return an exception, or None if successful.
1045         try:
1046             self._api_response = self._my_api().collections().get(
1047                 uuid=self._manifest_locator).execute(
1048                     num_retries=self.num_retries)
1049             self._manifest_text = self._api_response['manifest_text']
1050             return None
1051         except Exception as e:
1052             return e
1053
1054     def _populate_from_keep(self):
1055         # Retrieve a manifest directly from Keep. This has a chance of
1056         # working if [a] the locator includes a permission signature
1057         # or [b] the Keep services are operating in world-readable
1058         # mode. Return an exception, or None if successful.
1059         try:
1060             self._manifest_text = self._my_keep().get(
1061                 self._manifest_locator, num_retries=self.num_retries)
1062         except Exception as e:
1063             return e
1064
1065     def _populate(self):
1066         self._items = {}
1067         if self._manifest_locator is None and self._manifest_text is None:
1068             return
1069         error_via_api = None
1070         error_via_keep = None
1071         should_try_keep = ((self._manifest_text is None) and
1072                            util.keep_locator_pattern.match(
1073                 self._manifest_locator))
1074         if ((self._manifest_text is None) and
1075             util.signed_locator_pattern.match(self._manifest_locator)):
1076             error_via_keep = self._populate_from_keep()
1077         if self._manifest_text is None:
1078             error_via_api = self._populate_from_api_server()
1079             if error_via_api is not None and not should_try_keep:
1080                 raise error_via_api
1081         if ((self._manifest_text is None) and
1082             not error_via_keep and
1083             should_try_keep):
1084             # Looks like a keep locator, and we didn't already try keep above
1085             error_via_keep = self._populate_from_keep()
1086         if self._manifest_text is None:
1087             # Nothing worked!
1088             raise arvados.errors.NotFoundError(
1089                 ("Failed to retrieve collection '{}' " +
1090                  "from either API server ({}) or Keep ({})."
1091                  ).format(
1092                     self._manifest_locator,
1093                     error_via_api,
1094                     error_via_keep))
1095         # populate
1096         import_manifest(self._manifest_text, self)
1097
1098         if self._sync == SYNC_READONLY:
1099             # Now that we're populated, knowing that this will be readonly,
1100             # forego any further locking.
1101             self.lock = NoopLock()
1102
1103     def __enter__(self):
1104         return self
1105
1106     def __exit__(self, exc_type, exc_value, traceback):
1107         """Support scoped auto-commit in a with: block"""
1108         self.save(allow_no_locator=True)
1109         if self._block_manager is not None:
1110             self._block_manager.stop_threads()
1111
1112     @arvfile._synchronized
1113     @_populate_first
1114     def clone(self, new_parent=None, new_sync=Collection.SYNC_READONLY, new_config=self.config):
1115         c = Collection(parent=new_parent, config=new_config, sync=new_sync)
1116         if new_sync == Collection.SYNC_READONLY:
1117             c.lock = NoopLock()
1118         c._items = {}
1119         self._cloneinto(c)
1120         return c
1121
1122     @arvfile._synchronized
1123     @_populate_first
1124     def api_response(self):
1125         """
1126         api_response() -> dict or None
1127
1128         Returns information about this Collection fetched from the API server.
1129         If the Collection exists in Keep but not the API server, currently
1130         returns None.  Future versions may provide a synthetic response.
1131         """
1132         return self._api_response
1133
1134     @_must_be_writable
1135     @arvfile._synchronized
1136     @_populate_first
1137     def save(self, allow_no_locator=False):
1138         """Commit pending buffer blocks to Keep, write the manifest to Keep, and
1139         update the collection record to Keep.
1140
1141         :allow_no_locator:
1142           If there is no collection uuid associated with this
1143           Collection and `allow_no_locator` is False, raise an error.  If True,
1144           do not raise an error.
1145         """
1146         if self.modified():
1147             self._my_block_manager().commit_all()
1148             self._my_keep().put(self.manifest_text(strip=True))
1149             if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
1150                 self._api_response = self._my_api().collections().update(
1151                     uuid=self._manifest_locator,
1152                     body={'manifest_text': self.manifest_text(strip=False)}
1153                     ).execute(
1154                         num_retries=self.num_retries)
1155             elif not allow_no_locator:
1156                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
1157             self.set_unmodified()
1158
1159     @_must_be_writable
1160     @arvfile._synchronized
1161     @_populate_first
1162     def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
1163         """Save a new collection record.
1164
1165         :name:
1166           The collection name.
1167
1168         :owner_uuid:
1169           the user, or project uuid that will own this collection.
1170           If None, defaults to the current user.
1171
1172         :ensure_unique_name:
1173           If True, ask the API server to rename the collection
1174           if it conflicts with a collection with the same name and owner.  If
1175           False, a name conflict will result in an error.
1176
1177         """
1178         self._my_block_manager().commit_all()
1179         self._my_keep().put(self.manifest_text(strip=True))
1180         body = {"manifest_text": self.manifest_text(strip=False),
1181                 "name": name}
1182         if owner_uuid:
1183             body["owner_uuid"] = owner_uuid
1184         self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)
1185         self._manifest_locator = self._api_response["uuid"]
1186         self.set_unmodified()
1187
1188
1189 class Subcollection(SynchronizedCollectionBase):
1190     """This is a subdirectory within a collection that doesn't have its own API
1191     server record.  It falls under the umbrella of the root collection."""
1192
1193     def __init__(self, parent):
1194         super(Subcollection, self).__init__(parent)
1195         self.lock = parent._root_lock()
1196
1197     def _root_lock():
1198         return self.parent._root_lock()
1199
1200     def sync_mode(self):
1201         return self.parent.sync_mode()
1202
1203     def _my_api(self):
1204         return self.parent._my_api()
1205
1206     def _my_keep(self):
1207         return self.parent._my_keep()
1208
1209     def _my_block_manager(self):
1210         return self.parent._my_block_manager()
1211
1212     def _populate(self):
1213         self.parent._populate()
1214
1215     @arvfile._synchronized
1216     @_populate_first
1217     def clone(self, new_parent):
1218         c = Subcollection(new_parent)
1219         c._items = {}
1220         self._cloneinto(c)
1221         return c
1222
1223 def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
1224     """Import a manifest into a `Collection`.
1225
1226     :manifest_text:
1227       The manifest text to import from.
1228
1229     :into_collection:
1230       The `Collection` that will be initialized (must be empty).
1231       If None, create a new `Collection` object.
1232
1233     :api_client:
1234       The API client object that will be used when creating a new `Collection` object.
1235
1236     :keep:
1237       The keep client object that will be used when creating a new `Collection` object.
1238
1239     num_retries
1240       the default number of api client and keep retries on error.
1241     """
1242     if into_collection is not None:
1243         if len(into_collection) > 0:
1244             raise ArgumentError("Can only import manifest into an empty collection")
1245         c = into_collection
1246     else:
1247         c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries)
1248
1249     STREAM_NAME = 0
1250     BLOCKS = 1
1251     SEGMENTS = 2
1252
1253     stream_name = None
1254     state = STREAM_NAME
1255
1256     for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1257         tok = n.group(1)
1258         sep = n.group(2)
1259
1260         if state == STREAM_NAME:
1261             # starting a new stream
1262             stream_name = tok.replace('\\040', ' ')
1263             blocks = []
1264             segments = []
1265             streamoffset = 0L
1266             state = BLOCKS
1267             continue
1268
1269         if state == BLOCKS:
1270             s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1271             if s:
1272                 blocksize = long(s.group(1))
1273                 blocks.append(Range(tok, streamoffset, blocksize))
1274                 streamoffset += blocksize
1275             else:
1276                 state = SEGMENTS
1277
1278         if state == SEGMENTS:
1279             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
1280             if s:
1281                 pos = long(s.group(1))
1282                 size = long(s.group(2))
1283                 name = s.group(3).replace('\\040', ' ')
1284                 f = c.find("%s/%s" % (stream_name, name), create=True)
1285                 f.add_segment(blocks, pos, size)
1286             else:
1287                 # error!
1288                 raise errors.SyntaxError("Invalid manifest format")
1289
1290         if sep == "\n":
1291             stream_name = None
1292             state = STREAM_NAME
1293
1294     c.set_unmodified()
1295     return c
1296
1297 def export_manifest(item, stream_name=".", portable_locators=False):
1298     """
1299     :item:
1300       Create a manifest for `item` (must be a `Collection` or `ArvadosFile`).  If
1301       `item` is a is a `Collection`, this will also export subcollections.
1302
1303     :stream_name:
1304       the name of the stream when exporting `item`.
1305
1306     :portable_locators:
1307       If True, strip any permission hints on block locators.
1308       If False, use block locators as-is.
1309     """
1310     buf = ""
1311     if isinstance(item, SynchronizedCollectionBase):
1312         stream = {}
1313         sorted_keys = sorted(item.keys())
1314         for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
1315             v = item[k]
1316             st = []
1317             for s in v.segments:
1318                 loc = s.locator
1319                 if loc.startswith("bufferblock"):
1320                     loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
1321                 if portable_locators:
1322                     loc = KeepLocator(loc).stripped()
1323                 st.append(LocatorAndRange(loc, locator_block_size(loc),
1324                                      s.segment_offset, s.range_size))
1325             stream[k] = st
1326         if stream:
1327             buf += ' '.join(normalize_stream(stream_name, stream))
1328             buf += "\n"
1329         for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
1330             buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
1331     elif isinstance(item, ArvadosFile):
1332         st = []
1333         for s in item.segments:
1334             loc = s.locator
1335             if loc.startswith("bufferblock"):
1336                 loc = item._bufferblocks[loc].calculate_locator()
1337             if portable_locators:
1338                 loc = KeepLocator(loc).stripped()
1339             st.append(LocatorAndRange(loc, locator_block_size(loc),
1340                                  s.segment_offset, s.range_size))
1341         stream[stream_name] = st
1342         buf += ' '.join(normalize_stream(stream_name, stream))
1343         buf += "\n"
1344     return buf