3706: Do not leave zero-length file segment at end of stream after finish_current_file().
sdk/python/arvados/collection.py
import gflags
import httplib
import httplib2
import logging
import os
import pprint
import sys
import types
import subprocess
import json
import UserDict
import re
import hashlib
import string
import bz2
import zlib
import fcntl
import time
import threading

from collections import deque
from stat import *

from keep import *
from stream import *
import config
import errors
import util

_logger = logging.getLogger('arvados.collection')

def normalize_stream(s, stream):
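    # Given a stream name `s` and a dict mapping each filename in that stream
    # to its list of segments (indexed with arvados.LOCATOR, BLOCKSIZE,
    # OFFSET and SEGMENTSIZE), build the token list for one normalized
    # manifest stream: the stream name, each block locator exactly once,
    # then one "offset:length:filename" token per contiguous file span,
    # with filenames sorted and spaces escaped as \040.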
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0L
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            else:
                if segmentoffset == current_span[1]:
                    current_span[1] += segment[arvados.SEGMENTSIZE]
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if len(stream[f]) == 0:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens

def normalize(collection):
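    # Regroup every file in the collection by its stream (directory) name,
    # resolve each file's segments into locator/range tuples, and return one
    # normalized token list per stream (via normalize_stream), sorted by
    # stream name.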
    streams = {}
    for s in collection.all_streams():
        for f in s.all_files():
            filestream = s.name() + "/" + f.name()
            r = filestream.rindex("/")
            streamname = filestream[:r]
            filename = filestream[r+1:]
            if streamname not in streams:
                streams[streamname] = {}
            if filename not in streams[streamname]:
                streams[streamname][filename] = []
            for r in f.segments:
                streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))

    normalized_streams = []
    sortedstreams = list(streams.keys())
    sortedstreams.sort()
    for s in sortedstreams:
        normalized_streams.append(normalize_stream(s, streams[s]))
    return normalized_streams


class CollectionBase(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client


class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        for reading the collection's underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
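
        Example (a sketch only; the UUID below is a made-up placeholder):

            reader = CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz')
            for f in reader.all_files():
                print f.name(), f.size()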
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}$', manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', manifest_locator_or_text, re.MULTILINE):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None

    def _populate(self):
        if self._streams is not None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = (not self._manifest_text and
                           util.keep_locator_pattern.match(
                self._manifest_locator))
        if (not self._manifest_text and
            util.signed_locator_pattern.match(self._manifest_locator)):
            try:
                self._populate_from_keep()
            except Exception as e:
                error_via_keep = e
        if not self._manifest_text:
            try:
                self._populate_from_api_server()
            except Exception as e:
                if not should_try_keep:
                    raise
                error_via_api = e
        if (not self._manifest_text and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            try:
                self._populate_from_keep()
            except Exception as e:
                error_via_keep = e
        if not self._manifest_text:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]
        self._streams = normalize(self)

        # now regenerate the manifest text based on the normalized streams

        #print "normalizing", self._manifest_text
        self._manifest_text = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text() for stream in self._streams])
        #print "result", self._manifest_text


    def all_streams(self):
        self._populate()
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False):
        self._populate()
        if strip:
            m = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text(strip=True) for stream in self._streams])
            return m
        else:
            return self._manifest_text


class CollectionWriter(CollectionBase):
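    # Data is buffered in memory and flushed to Keep in blocks of this
    # size: 2**26 bytes (64 MiB).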
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it, and CollectionWriter will upload the data to Keep
        as appropriate and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
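
        Example (a sketch only; the file path below is hypothetical):

            cw = CollectionWriter()
            cw.write_file('/tmp/example.txt', 'example.txt')
            manifest_locator = cw.finish()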
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
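        # For example, write_directory_tree() queues one tree and then calls
        # this method; each pass through the loop below opens at most one
        # file at a time, so the queues drain without holding extra file
        # handles open.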
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        d = make_dirents(path)
        if len(d) > 0:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
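        # Send the first KEEP_BLOCK_SIZE bytes of the buffer (or all of it,
        # if less is buffered) to Keep as a single block; any remainder
        # stays buffered for the next flush.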
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
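        # If no file is open and no data has been written since the last
        # file ended, there is nothing to record; returning early here
        # avoids leaving a zero-length unnamed segment at the end of the
        # stream.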
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
                self._current_file_pos,
                self._current_stream_length - self._current_file_pos,
                self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all permission
        hints removed from the locators in the manifest.
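
        For example, a block locator like (hypothetical hash and hint)

            acbd18db4cc2f85cedef654fccc4a4d8+3+Afictitioushint@12345678

        is rewritten as

            acbd18db4cc2f85cedef654fccc4a4d8+3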
        """
        raw = self.manifest_text()
        clean = ''
        for line in raw.split("\n"):
            fields = line.split()
            if len(fields) > 0:
                locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
                             for x in fields[1:-1] ]
                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
        return clean

    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        if manifest:
            return CollectionReader(manifest, self._api_client).manifest_text()
        else:
            return ""

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
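        # Typical use (a sketch; assumes `state` came from dump_state() on a
        # previous writer for the same files):
        #
        #   writer = ResumableCollectionWriter.from_state(state)
        #   writer.do_queued_work()
        #   print writer.manifest_text()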
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)