Fix CollectionReader treating any collection with additional +hints as
[arvados.git] / sdk / python / arvados / collection.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20
21 from keep import *
22 from stream import *
23 import config
24 import errors
25
def normalize_stream(s, stream):
    """Build the normalized manifest tokens for a single stream.

    Arguments:
    * s -- the stream name; it is emitted unchanged as the first token.
    * stream -- dict mapping each file name to a list of segments. Every
      segment is indexed with arvados.LOCATOR, arvados.BLOCKSIZE,
      arvados.OFFSET and arvados.SEGMENTSIZE.

    Returns a list of tokens: the stream name, then each distinct block
    locator in order of first use (files are visited in sorted name order),
    then one or more "position:size:name" tokens per file.
    """
    stream_tokens = [s]
    sortedfiles = sorted(stream)

    # Give every distinct block a position in the normalized stream: the
    # running total of the sizes of the blocks emitted before it.
    blocks = {}
    streamoffset = 0
    for f in sortedfiles:
        for b in stream[f]:
            locator = b[arvados.LOCATOR]
            if locator not in blocks:
                stream_tokens.append(locator)
                blocks[locator] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    # A stream made only of zero-length files references no block at all.
    # Emit the same placeholder CollectionWriter.finish_current_stream uses
    # so the stream line still carries a locator.
    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        # Spaces separate manifest tokens, so they are escaped in file names.
        fout = f.replace(' ', '\\040')
        current_span = None
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            segmentend = segmentoffset + segment[arvados.SEGMENTSIZE]
            if current_span is None:
                current_span = [segmentoffset, segmentend]
            elif segmentoffset == current_span[1]:
                # Contiguous with the previous segment: grow the open span.
                current_span[1] += segment[arvados.SEGMENTSIZE]
            else:
                # Gap or jump backwards: close the open span, start a new one.
                stream_tokens.append("{0}:{1}:{2}".format(
                    current_span[0], current_span[1] - current_span[0], fout))
                current_span = [segmentoffset, segmentend]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(
                current_span[0], current_span[1] - current_span[0], fout))

        if len(stream[f]) == 0:
            # Zero-length files still need a token to appear in the manifest.
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens
61
def normalize(collection):
    """Return the normalized stream token lists for a collection.

    Every file of every stream in `collection` is regrouped under the stream
    name given by everything before the last "/" of "<stream name>/<file
    name>". The segments of each file are resolved with the owning stream's
    locators_and_ranges(). The result holds one normalize_stream() token
    list per regrouped stream, in sorted stream-name order.
    """
    regrouped = {}
    for stream_reader in collection.all_streams():
        for file_reader in stream_reader.all_files():
            full_path = stream_reader.name() + "/" + file_reader.name()
            # full_path always contains the "/" added above, so this split
            # always yields exactly two parts.
            stream_name, file_name = full_path.rsplit("/", 1)
            ranges = regrouped.setdefault(stream_name, {}).setdefault(file_name, [])
            for segment in file_reader.segments:
                ranges.extend(
                    stream_reader.locators_and_ranges(segment[0], segment[1]))

    return [normalize_stream(name, regrouped[name]) for name in sorted(regrouped)]
83
84
class CollectionReader(object):
    """Read a collection given either its locator or its manifest text.

    The manifest is fetched (when only a locator was given), parsed and
    normalized lazily, the first time streams, files or the manifest text
    are requested.
    """

    def __init__(self, manifest_locator_or_text):
        """Remember the locator or manifest text; nothing is fetched yet.

        Raises errors.ArgumentError when the argument looks like neither a
        collection locator (32 hex digits plus optional +hints) nor manifest
        text.
        """
        if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None

    def __enter__(self):
        # Return the reader so "with CollectionReader(x) as r" binds r to it.
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        # Nothing to release. Returning None lets any exception propagate.
        pass

    def _populate(self):
        """Load, parse and normalize the manifest once; later calls are no-ops."""
        if self._streams is not None:
            return
        if not self._manifest_text:
            try:
                c = arvados.api('v1').collections().get(
                    uuid=self._manifest_locator).execute()
                self._manifest_text = c['manifest_text']
            except Exception as e:
                # Deliberate best effort: if the API lookup fails for any
                # reason, fall back to reading the manifest from Keep.
                logging.warning("API lookup failed for collection %s (%s: %s)",
                                self._manifest_locator, type(e), str(e))
                self._manifest_text = Keep.get(self._manifest_locator)

        # self._streams must be set before normalize() runs: normalize()
        # calls all_streams(), which re-enters _populate() and relies on the
        # early return above.
        self._streams = [stream_line.split()
                         for stream_line in self._manifest_text.split("\n")
                         if stream_line != '']
        self._streams = normalize(self)

        # Regenerate the manifest text from the normalized streams.
        self._manifest_text = ''.join(
            [StreamReader(stream).manifest_text() for stream in self._streams])

    def all_streams(self):
        """Return a list with a new StreamReader for each normalized stream."""
        self._populate()
        return [StreamReader(s) for s in self._streams]

    def all_files(self):
        """Yield every file of every stream, in stream order."""
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self):
        """Return the normalized manifest text."""
        self._populate()
        return self._manifest_text
145
class CollectionWriter(object):
    """Build a collection incrementally: streams, files, then the manifest.

    Data passed to write() is buffered and stored with Keep.put() in blocks
    of at most KEEP_BLOCK_SIZE bytes. File and stream boundaries are recorded
    as they are finished, and manifest_text() renders the result.
    """

    # Maximum size of one stored block: 2**26 bytes (64 MiB).
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self):
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []

    def __enter__(self):
        # Return the writer so "with CollectionWriter() as w" binds w to it.
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        # Only store the manifest when the block completed normally; after an
        # exception the collection is incomplete, and finishing could raise
        # again and hide the original error.
        if exc_type is None:
            self.finish()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write the files under `path` into the stream `stream_name`.

        Each subdirectory found is written afterwards as its own stream named
        os.path.join(stream_name, dirent), with max_manifest_depth reduced by
        one. When max_manifest_depth is 0 the entries come from
        util.listdir_recursive(path) instead of os.listdir(path).
        """
        # NOTE(review): `util` is not imported in the visible part of this
        # file -- confirm it is in scope (e.g. via a star import).
        self.start_new_stream(stream_name)
        todo = []
        if max_manifest_depth == 0:
            dirents = sorted(util.listdir_recursive(path))
        else:
            dirents = sorted(os.listdir(path))
        for dirent in dirents:
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                # Defer subdirectories until this stream has been finished.
                todo.append([target,
                             os.path.join(stream_name, dirent),
                             max_manifest_depth - 1])
            else:
                self.start_new_file(dirent)
                with open(target, 'rb') as f:
                    while True:
                        buf = f.read(self.KEEP_BLOCK_SIZE)
                        if len(buf) == 0:
                            break
                        self.write(buf)
        self.finish_current_stream()
        # An explicit loop: map() is lazy on Python 3 and would never run the
        # recursive calls there.
        for args in todo:
            self.write_directory_tree(*args)

    def write(self, newdata):
        """Append data to the current file.

        If newdata has an __iter__ attribute, each of its items is written in
        turn. Full blocks are flushed as soon as enough data is buffered.
        """
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        """Store up to KEEP_BLOCK_SIZE bytes of buffered data; keep the rest."""
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        """Finish the current file and make `newfilename` the current one."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        """Set the current file name; None means "no name yet".

        Raises errors.AssertionError if the name contains a tab or newline.
        """
        # None is the default passed by start_new_file() and is handled by
        # finish_current_file(); only real names are validated.
        if newfilename is not None and re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        """Record the current file as spanning the data written since the last one.

        Raises errors.AssertionError if data was written but no file name was
        ever set.
        """
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                # No name and no data since the last file: nothing to record.
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                       self._current_stream_length - self._current_file_pos,
                                       self._current_file_name]]
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        """Finish the current stream and start one named `newstreamname`."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        """Set the current stream name; the empty string is stored as '.'.

        Raises errors.AssertionError if the name contains a tab or newline.
        """
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        """Finish the current file, flush buffered data and record the stream.

        A stream with no files is dropped silently. All per-stream state is
        reset afterwards, including the stream name (to None).
        """
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if len(self._current_stream_locators) == 0:
                # Only zero-length files: use the placeholder locator.
                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
            self._finished_streams += [[self._current_stream_name,
                                       self._current_stream_locators,
                                       self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        """Store the manifest text with Keep.put() and return the result."""
        return Keep.put(self.manifest_text())

    def manifest_text(self):
        """Finish the current stream and return the normalized manifest text.

        Returns '' when no stream has been finished.
        """
        self.finish_current_stream()

        lines = []
        for name, locators, files in self._finished_streams:
            # Stream names must be "." or start with "./".
            prefix = '' if re.search(r'^\.(/.*)?$', name) else './'
            file_tokens = ["%d:%d:%s" % (pos, size, fname.replace(' ', '\\040'))
                           for pos, size, fname in files]
            lines.append(' '.join([prefix + name.replace(' ', '\\040'),
                                   ' '.join(locators),
                                   ' '.join(file_tokens)]) + "\n")
        manifest = ''.join(lines)

        if not manifest:
            # CollectionReader rejects an empty string, so an empty writer
            # yields an empty manifest directly.
            return ''
        return CollectionReader(manifest).manifest_text()

    def data_locators(self):
        """Return the block locators of all finished streams, in order."""
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
296             ret += locators
297         return ret