import hashlib
import logging
import os
import re

from collections import deque
from stat import *

from .arvfile import ArvadosFileBase
from keep import *
from .stream import StreamReader, split
import arvados
import config
import errors
import util
_logger = logging.getLogger('arvados.collection')

def normalize_stream(s, stream):
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0L
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            else:
                if segmentoffset == current_span[1]:
                    current_span[1] += segment[arvados.SEGMENTSIZE]
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if not stream[f]:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens
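
# A hedged usage sketch for normalize_stream().  'segs' below maps each
# file name to its segment tuples, assuming the (LOCATOR, BLOCKSIZE,
# OFFSET, SEGMENTSIZE) tuple layout used by the arvados package; the
# locator shown is the well-known empty-block locator:
#
#   segs = {'foo.txt': [['d41d8cd98f00b204e9800998ecf8427e+0', 0, 0, 0]]}
#   tokens = normalize_stream('.', segs)
#   # tokens is a list like ['.', '<locator>', '0:0:foo.txt'];
#   # ' '.join(tokens) yields one normalized manifest stream line.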


class CollectionBase(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)
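
    # For example (illustrative), stripped_manifest() rewrites a signed
    # manifest line like
    #   . acbd18db4cc2f85cedef654fccc4a4d8+3+Asignature@expiry 0:3:foo.txt
    # as
    #   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt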


class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        for reading the files they reference.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
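
        Example (an illustrative sketch; the UUID below is a made-up
        placeholder):

            reader = CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz')
            for f in reader.all_files():
                print f.name()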
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest, "
                "portable data hash, or collection UUID")
        self._streams = None

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            c = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = c['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e

    def _populate(self):
        if self._streams is not None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def normalize(self):
        """Regenerate the manifest for this collection in normalized form.

        Sorts streams and file names, merges adjacent file segments, and
        rebuilds self._streams and self._manifest_text accordingly.
        """
        self._populate()

        # Rearrange streams
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.
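
        Example (sketch; assumes the Collection contains './foo/bar.txt'):

            f = reader.open('./foo/bar.txt')
            data = f.read(1024)  # read up to 1024 bytes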
        """
        self._populate()
        if filename is None:
            streampath, filename = split(streampath)
        keep_client = self._my_keep()
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
                break
        else:
            raise ValueError("stream '{}' not found in Collection".
                             format(streampath))
        try:
            return stream.files()[filename]
        except KeyError:
            raise ValueError("file '{}' not found in Collection stream '{}'".
                             format(filename, streampath))

    def all_streams(self):
        self._populate()
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False, normalize=False):
        if normalize:
            cr = CollectionReader(self.manifest_text())
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            self._populate()
            return self._manifest_text
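
    # For example (sketch), to get portable manifest text, with signatures
    # stripped and streams normalized:
    #
    #   text = reader.manifest_text(strip=True, normalize=True)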


class _WriterFile(ArvadosFileBase):
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):
        for data in seq:
            self.write(data)

    @ArvadosFileBase._before_close
    def flush(self):
        self.dest.flush_data()


class CollectionWriter(CollectionBase):
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
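
        Example (an illustrative sketch):

            writer = CollectionWriter()
            with writer.open('./hello.txt') as out:
                out.write('hello')
            manifest = writer.manifest_text()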
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth=(None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()
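
    # For example (sketch), to write a local directory, with each
    # subdirectory becoming its own stream in the manifest:
    #
    #   writer.write_directory_tree('/tmp/mydata', stream_name='.')
    #
    # Passing max_manifest_depth=0 would instead flatten everything
    # under '/tmp/mydata' into a single stream.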

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()
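
    # write() accepts either a string or an iterable of strings, e.g.
    # (sketch):
    #
    #   writer.write('chunk one')
    #   writer.write(['chunk two', 'chunk three'])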

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

          with cwriter.open('./doc/page1.txt') as outfile:
              outfile.write(page1_data)
          with cwriter.open('./doc/page2.txt') as outfile:
              outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
                self._current_file_pos,
                self._current_stream_length - self._current_file_pos,
                self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
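
    # The returned hash looks like (illustrative)
    # 'd41d8cd98f00b204e9800998ecf8427e+0': the md5 hexdigest of the
    # stripped manifest text, then '+', then the manifest's length in
    # bytes.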

    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer
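
    # A hedged resume sketch: checkpoint the writer's state, persist it
    # somewhere, and rebuild the writer from it later:
    #
    #   state = writer.dump_state()
    #   ...
    #   writer = ResumableCollectionWriter.from_state(state)
    #   writer.do_queued_work()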

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)