# CollectionReader: read-side view of a collection manifest. The constructor
# accepts either the manifest text itself or a Keep locator for it.
# NOTE(review): this excerpt has lost its indentation and several original
# lines (see gaps in the leading numbers); comments describe only what the
# visible lines do.
24 class CollectionReader(object):
25 def __init__(self, manifest_locator_or_text):
# Heuristic text-vs-locator test. The argument counts as manifest text if it
# begins with a stream line: a stream name, one or more locators (32+ hex
# chars, optionally followed by "+..." hints), one or more "pos:size:name"
# file tokens, and then a newline. Anything else is treated as a locator.
# NOTE(review): without re.MULTILINE, '^' anchors only at the start of the
# string. Because '\n' is required, manifest text whose first line has no
# trailing newline would be mistaken for a locator.
26 if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
27 self._manifest_text = manifest_locator_or_text
28 self._manifest_locator = None
# Locator case: keep the locator. The text stays None until it is fetched
# with Keep.get() below.
30 self._manifest_locator = manifest_locator_or_text
31 self._manifest_text = None
# The lines below belong to a method whose 'def' line is not visible here.
# It parses the manifest into self._streams.
# NOTE(review): prefer 'is not None' over '!= None'. The body of this 'if' is
# not visible; presumably it returns early when the manifest is already parsed.
41 if self._streams != None:
# If only a locator was given, fetch the manifest text from Keep on first use.
43 if not self._manifest_text:
44 self._manifest_text = Keep.get(self._manifest_locator)
# Each "\n"-separated line is tokenized with str.split() (no argument), which
# splits on any run of whitespace. The token list is appended to _streams.
46 for stream_line in self._manifest_text.split("\n"):
48 stream_tokens = stream_line.split()
49 self._streams += [stream_tokens]
51 def all_streams(self):
# Wrap each tokenized stream line in a StreamReader. 'resp' is initialized on
# a line that is not visible in this excerpt.
54 for s in self._streams:
55 resp += [StreamReader(s)]
# Belongs to a method whose 'def' line is not visible. It visits every file
# of every stream returned by all_streams().
59 for s in self.all_streams():
60 for f in s.all_files():
63 def manifest_text(self):
# Returns the stored manifest text.
# NOTE(review): original line 64 is not visible. If it does not populate the
# text, this returns None for a reader built from a locator.
65 return self._manifest_text
# CollectionWriter: buffers written data, stores it in Keep in fixed-size
# blocks, and builds a manifest describing the resulting streams and files.
67 class CollectionWriter(object):
# Largest chunk passed to a single Keep.put() call by flush_data():
# 2**26 bytes = 64 MiB.
68 KEEP_BLOCK_SIZE = 2**26
# The assignments below initialize writer state. Their enclosing 'def' line
# (presumably __init__) is not visible in this excerpt.
# Pending data chunks not yet stored in Keep, and their total length.
71 self._data_buffer = []
72 self._data_buffer_len = 0
# Per-stream state: [pos, size, name] records for finished files, bytes
# written to the stream so far, and locators of blocks already stored.
73 self._current_stream_files = []
74 self._current_stream_length = 0
75 self._current_stream_locators = []
# The open stream's name (initially '.'), the open file's name (None means
# unnamed), and the stream offset where the open file starts.
76 self._current_stream_name = '.'
77 self._current_file_name = None
78 self._current_file_pos = 0
# One [name, locators, files] entry per stream that has been finished.
79 self._finished_streams = []
# Write the entries under 'path' into the stream 'stream_name'. Subdirectories
# are queued and then written by recursive calls whose stream name is
# stream_name/<subdir>.
# max_manifest_depth == 0 switches to a single util.listdir_recursive()
# listing. Each recursion passes depth-1, so the default of -1 never reaches
# 0, which means "no depth limit".
# NOTE(review): original lines 90, 93, 98, 101 and 104-108 are not visible.
87 def write_directory_tree(self,
88 path, stream_name='.', max_manifest_depth=-1):
89 self.start_new_stream(stream_name)
# Depth exhausted: use util.listdir_recursive(), a helper defined elsewhere
# (presumably it lists the whole subtree).
91 if max_manifest_depth == 0:
92 dirents = sorted(util.listdir_recursive(path))
# Otherwise list only the immediate entries. Both listings are sorted, so the
# output order is deterministic.
94 dirents = sorted(os.listdir(path))
95 for dirent in dirents:
96 target = os.path.join(path, dirent)
# A directory is queued for a later recursive call with stream name
# stream_name/dirent and one less depth. The first element of the queued
# entry is on a line not visible here.
97 if os.path.isdir(target):
99 os.path.join(stream_name, dirent),
100 max_manifest_depth-1]]
# Any other entry becomes a file in this stream, opened in binary mode. The
# copy loop itself is not visible here.
102 self.start_new_file(dirent)
103 with open(target, 'rb') as f:
# Finish this stream before recursing, so each subdirectory gets its own.
109 self.finish_current_stream()
# NOTE(review): map() is called only for its side effect. That works on
# Python 2, where map() is eager. On Python 3, map() returns a lazy iterator,
# so these recursive calls would never run; a plain for-loop is safer.
110 map(lambda x: self.write_directory_tree(*x), todo)
# Append newdata to the open file of the open stream. Data is buffered until
# at least one full Keep block is available.
112 def write(self, newdata):
# Iterables are handled on lines not visible in this excerpt (114-116).
# NOTE(review): on Python 3, str and bytes also define __iter__, so this test
# no longer separates plain data from other iterables.
113 if hasattr(newdata, '__iter__'):
# Buffer the chunk and advance both the buffer length and the stream length.
# The stream length is what finish_current_file() uses for file offsets and
# sizes.
117 self._data_buffer += [newdata]
118 self._data_buffer_len += len(newdata)
119 self._current_stream_length += len(newdata)
# While a full block or more is buffered, the loop body (original line 121,
# not visible; presumably flush_data()) is expected to drain it.
120 while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
123 def flush_data(self):
124 data_buffer = ''.join(self._data_buffer)
125 if data_buffer != '':
126 self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
127 self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
128 self._data_buffer_len = len(self._data_buffer[0])
130 def start_new_file(self, newfilename=None):
131 self.finish_current_file()
132 self.set_current_file_name(newfilename)
# Set the name of the open file, rejecting names that would break the
# whitespace-separated manifest format.
# Only tab and newline are rejected. Spaces are allowed because
# manifest_text() escapes them as \040.
# NOTE(review): CollectionReader tokenizes manifest lines with str.split(),
# which also splits on \r, \v and \f. Those characters pass this check and
# would corrupt the manifest; consider rejecting them too.
# NOTE(review): passing None (start_new_file's default) makes re.search
# raise TypeError here.
# Original line 138, the operand of the '%' format below, is not visible.
134 def set_current_file_name(self, newfilename):
135 if re.search(r'[\t\n]', newfilename):
136 raise errors.AssertionError(
137 "Manifest filenames cannot contain whitespace: %s" %
139 self._current_file_name = newfilename
141 def current_file_name(self):
142 return self._current_file_name
# Close the open file. It is recorded in the current stream's file list as
# [start offset, size, name], and the file start offset moves to the current
# end of the stream.
# An unnamed file is acceptable only if no bytes were written to it. The
# statement for that case (original line 147) is not visible here; it is
# presumably an early return. Otherwise an AssertionError reports the
# orphaned byte count, offset and stream name.
# NOTE(review): _current_file_name is not reset here. A second call without
# an intervening start_new_file() records another entry under the same name,
# possibly zero-length.
144 def finish_current_file(self):
145 if self._current_file_name == None:
146 if self._current_file_pos == self._current_stream_length:
148 raise errors.AssertionError(
149 "Cannot finish an unnamed file " +
150 "(%d bytes at offset %d in '%s' stream)" %
151 (self._current_stream_length - self._current_file_pos,
152 self._current_file_pos,
153 self._current_stream_name))
154 self._current_stream_files += [[self._current_file_pos,
155 self._current_stream_length - self._current_file_pos,
156 self._current_file_name]]
157 self._current_file_pos = self._current_stream_length
159 def start_new_stream(self, newstreamname='.'):
160 self.finish_current_stream()
161 self.set_current_stream_name(newstreamname)
163 def set_current_stream_name(self, newstreamname):
164 if re.search(r'[\t\n]', newstreamname):
165 raise errors.AssertionError(
166 "Manifest stream names cannot contain whitespace")
167 self._current_stream_name = '.' if newstreamname=='' else newstreamname
169 def current_stream_name(self):
170 return self._current_stream_name
# Close the current stream. Its open file is finished first. The stream is
# then appended to _finished_streams as [name, locators, files], and all
# per-stream state is reset.
# NOTE(review): original lines 174, 176 and 181 are not visible. The branch
# taken for a stream with no files, and whether buffered data is flushed
# first, cannot be confirmed from this excerpt.
172 def finish_current_stream(self):
173 self.finish_current_file()
175 if len(self._current_stream_files) == 0:
# Files cannot be recorded without a stream name.
177 elif self._current_stream_name == None:
178 raise errors.AssertionError(
179 "Cannot finish an unnamed stream (%d bytes in %d files)" %
180 (self._current_stream_length, len(self._current_stream_files)))
# A stream with files but no stored blocks gets EMPTY_BLOCK_LOCATOR (defined
# elsewhere) as its only locator.
182 if len(self._current_stream_locators) == 0:
183 self._current_stream_locators += [EMPTY_BLOCK_LOCATOR]
184 self._finished_streams += [[self._current_stream_name,
185 self._current_stream_locators,
186 self._current_stream_files]]
# Reset per-stream state. The stream name becomes None, so a name must be set
# (e.g. with start_new_stream) before another stream with files can be
# finished.
187 self._current_stream_files = []
188 self._current_stream_length = 0
189 self._current_stream_locators = []
190 self._current_stream_name = None
191 self._current_file_pos = 0
192 self._current_file_name = None
# Store the full manifest text in Keep and return whatever Keep.put() returns.
# Generating the text also finishes the current stream. The enclosing 'def'
# line (original 194) is not visible in this excerpt.
195 return Keep.put(self.manifest_text())
# Finish the open stream, then render every finished stream as
# "<stream name> <locator>... <pos>:<size>:<file name>...". Spaces in stream
# and file names are escaped as \040.
# NOTE(review): original lines 199, 202 and 208-210 are not visible. They
# presumably initialize 'manifest', rewrite stream names that are not
# '.'-rooted, end each stream's line, and return the result.
197 def manifest_text(self):
198 self.finish_current_stream()
200 for stream in self._finished_streams:
# Stream names are expected to be "." or to start with "./". The handling of
# other names (original line 202) is not visible here.
201 if not re.search(r'^\.(/.*)?$', stream[0]):
203 manifest += stream[0].replace(' ', '\\040')
204 for locator in stream[1]:
205 manifest += " %s" % locator
206 for sfile in stream[2]:
207 manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040'))
# Iterates over the finished streams, unpacking each [name, locators, files]
# entry. The rest of this method, including what it returns, lies outside
# the visible excerpt.
211 def data_locators(self):
213 for name, locators, files in self._finished_streams: