[arvados.git] / sdk / python / arvados / collection.py
import gflags
import httplib
import httplib2
import logging
import os
import pprint
import sys
import types
import subprocess
import json
import UserDict
import re
import hashlib
import string
import bz2
import zlib
import fcntl
import time
import threading

from collections import deque
from stat import *

from keep import *
from stream import *
import arvados   # make the arvados.api() and manifest token constant references below explicit
import config
import errors
import util

_logger = logging.getLogger('arvados.collection')

def normalize_stream(s, stream):
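    """Return the manifest tokens for a single normalized stream.

    's' is the stream name; 'stream' maps each file name in the stream to
    a list of segment descriptors (as returned by locators_and_ranges()),
    indexed with the arvados.LOCATOR, arvados.BLOCKSIZE, arvados.OFFSET
    and arvados.SEGMENTSIZE constants.  The result is a list of tokens:
    the stream name, the block locators in order of first use (or the
    empty-block locator if there are none), and one "offset:size:filename"
    token per contiguous file segment, with "0:0:filename" for empty files.
    """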
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0L
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            else:
                if segmentoffset == current_span[1]:
                    current_span[1] += segment[arvados.SEGMENTSIZE]
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if len(stream[f]) == 0:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens

def normalize(collection):
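    """Return the collection as a list of normalized streams.

    Files are regrouped by stream (directory) name, their data is
    re-expressed as ranges over the stream's blocks via
    locators_and_ranges(), and streams and files are processed in sorted
    order.  Each element of the returned list is the token list built by
    normalize_stream() for one stream.
    """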
    streams = {}
    for s in collection.all_streams():
        for f in s.all_files():
            filestream = s.name() + "/" + f.name()
            r = filestream.rindex("/")
            streamname = filestream[:r]
            filename = filestream[r+1:]
            if streamname not in streams:
                streams[streamname] = {}
            if filename not in streams[streamname]:
                streams[streamname][filename] = []
            for r in f.segments:
                streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))

    normalized_streams = []
    sortedstreams = list(streams.keys())
    sortedstreams.sort()
    for s in sortedstreams:
        normalized_streams.append(normalize_stream(s, streams[s]))
    return normalized_streams


class CollectionReader(object):
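    """Read an existing collection.

    The constructor accepts a Keep locator (32-digit md5 hash, optionally
    followed by a size and other hints), an Arvados collection UUID, or
    literal manifest text.  The manifest is fetched lazily (from the API
    server, falling back to Keep) the first time stream or file data is
    requested.

    A minimal usage sketch, assuming a configured API host and token:

        reader = CollectionReader(manifest_locator_or_text)
        for f in reader.all_files():
            print f.name()
    """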
    def __init__(self, manifest_locator_or_text):
        if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}$', manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(r'(\S+)( [a-f0-9]{32}(\+\d+)(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _populate(self):
        if self._streams is not None:
            return
        if not self._manifest_text:
            try:
                c = arvados.api('v1').collections().get(
                    uuid=self._manifest_locator).execute()
                self._manifest_text = c['manifest_text']
            except Exception as e:
                _logger.warning("API lookup failed for collection %s (%s: %s)",
                                self._manifest_locator, type(e), str(e))
                self._manifest_text = Keep.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
        self._streams = normalize(self)

        # now regenerate the manifest text based on the normalized stream

        #print "normalizing", self._manifest_text
        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
        #print "result", self._manifest_text


    def all_streams(self):
        self._populate()
        resp = []
        for s in self._streams:
            resp.append(StreamReader(s))
        return resp

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False):
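        """Return the manifest text for this collection.

        By default this is the normalized text built by _populate().  With
        strip=True, the text is regenerated with
        StreamReader.manifest_text(strip=True), which is intended to omit
        locator hints such as permission signatures.
        """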
        self._populate()
        if strip:
            m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
            return m
        else:
            return self._manifest_text

class CollectionWriter(object):
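    """Build a new collection and write its data to Keep.

    Written data is buffered and flushed to Keep in blocks of up to
    KEEP_BLOCK_SIZE (64 MiB) bytes.  Files and streams are named
    explicitly with start_new_file()/start_new_stream(), or implied by
    write_file() and write_directory_tree().

    A minimal usage sketch (the file name is illustrative):

        writer = CollectionWriter()
        writer.start_new_file('hello.txt')
        writer.write('hello world')
        locator = writer.finish()   # stores the manifest in Keep
    """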
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self):
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        d = make_dirents(path)
        if len(d) > 0:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
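        """Write one file to the current stream.

        'source' may be a path or a file-like object with a read() method;
        'filename' defaults to the basename of the source's name.
        """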
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
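        """Write a directory tree to the collection.

        Each directory becomes a stream named after its path below
        'stream_name'.  'max_manifest_depth' limits how many levels of
        subdirectories get their own streams; once the depth reaches 0,
        the remaining tree is flattened into a single stream.
        """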
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if len(self._current_stream_locators) == 0:
                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
            self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
                                        self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return Keep.put(self.manifest_text())

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all permission
        hints removed from the locators in the manifest.
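
        For example, a signed locator of the form
        "acbd18db4cc2f85cedef654fccc4a4d8+3+A<signature>@<expiry>" is
        reduced to "acbd18db4cc2f85cedef654fccc4a4d8+3".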
        """
        raw = self.manifest_text()
        clean = ''
        for line in raw.split("\n"):
            fields = line.split()
            if len(fields) > 0:
                locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
                             for x in fields[1:-1] ]
                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
        return clean

    def manifest_text(self):
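        """Return the manifest text for all data written so far.

        The current stream is finished first; the assembled text is then
        normalized by round-tripping it through CollectionReader.
        """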
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        if len(manifest) > 0:
            return CollectionReader(manifest).manifest_text()
        else:
            return ""

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
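    """A CollectionWriter whose progress can be checkpointed and resumed.

    dump_state() captures the writer's buffers, queued work, and the
    position within the file currently being written; from_state()
    rebuilds a writer from that snapshot, raising StaleWriterStateError
    if the source files have changed in the meantime.

    A rough checkpoint/resume sketch (how 'state' is saved between runs,
    e.g. as JSON, is up to the caller):

        writer = ResumableCollectionWriter()
        # ... while writing, periodically snapshot progress:
        state = writer.dump_state()
        # ... in a later run, rebuild the writer and finish the job:
        resumed = ResumableCollectionWriter.from_state(state)
        resumed.do_queued_work()
        locator = resumed.finish()
    """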
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__()

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)