Merge branch 'master' into 2352-use-state
[arvados.git] / sdk / python / arvados / collection.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20
21 from keep import *
22 from stream import *
23 import config
24 import errors
25 import util
26
27 def normalize_stream(s, stream):
28     stream_tokens = [s]
29     sortedfiles = list(stream.keys())
30     sortedfiles.sort()
31
32     blocks = {}
33     streamoffset = 0L
34     for f in sortedfiles:
35         for b in stream[f]:
36             if b[arvados.LOCATOR] not in blocks:
37                 stream_tokens.append(b[arvados.LOCATOR])
38                 blocks[b[arvados.LOCATOR]] = streamoffset
39                 streamoffset += b[arvados.BLOCKSIZE]
40
41     for f in sortedfiles:
42         current_span = None
43         fout = f.replace(' ', '\\040')
44         for segment in stream[f]:
45             segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
46             if current_span == None:
47                 current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
48             else:
49                 if segmentoffset == current_span[1]:
50                     current_span[1] += segment[arvados.SEGMENTSIZE]
51                 else:
52                     stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
53                     current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
54
55         if current_span != None:
56             stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
57
58         if len(stream[f]) == 0:
59             stream_tokens.append("0:0:{0}".format(fout))            
60
61     return stream_tokens    
62
63 def normalize(collection):
64     streams = {}
65     for s in collection.all_streams():
66         for f in s.all_files():
67             filestream = s.name() + "/" + f.name()
68             r = filestream.rindex("/")
69             streamname = filestream[:r]
70             filename = filestream[r+1:]
71             if streamname not in streams:
72                 streams[streamname] = {}
73             if filename not in streams[streamname]:
74                 streams[streamname][filename] = []
75             for r in f.segments:
76                 streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))
77
78     normalized_streams = []
79     sortedstreams = list(streams.keys())
80     sortedstreams.sort()
81     for s in sortedstreams:
82         normalized_streams.append(normalize_stream(s, streams[s]))
83     return normalized_streams
84
85
86 class CollectionReader(object):
87     def __init__(self, manifest_locator_or_text):
88         if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
89             self._manifest_locator = manifest_locator_or_text
90             self._manifest_text = None
91         elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
92             self._manifest_text = manifest_locator_or_text
93             self._manifest_locator = None
94         else:
95             raise errors.ArgumentError(
96                 "Argument to CollectionReader must be a manifest or a collection UUID")
97         self._streams = None
98
99     def __enter__(self):
100         pass
101
102     def __exit__(self):
103         pass
104
105     def _populate(self):
106         if self._streams != None:
107             return
108         if not self._manifest_text:
109             try:
110                 c = arvados.api('v1').collections().get(
111                     uuid=self._manifest_locator).execute()
112                 self._manifest_text = c['manifest_text']
113             except Exception as e:
114                 logging.warning("API lookup failed for collection %s (%s: %s)" %
115                                 (self._manifest_locator, type(e), str(e)))
116                 self._manifest_text = Keep.get(self._manifest_locator)
117         self._streams = []
118         for stream_line in self._manifest_text.split("\n"):
119             if stream_line != '':
120                 stream_tokens = stream_line.split()
121                 self._streams += [stream_tokens]
122         self._streams = normalize(self)
123
124         # now regenerate the manifest text based on the normalized stream
125
126         #print "normalizing", self._manifest_text
127         self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
128         #print "result", self._manifest_text
129
130
131     def all_streams(self):
132         self._populate()
133         resp = []
134         for s in self._streams:
135             resp.append(StreamReader(s))
136         return resp
137
138     def all_files(self):
139         for s in self.all_streams():
140             for f in s.all_files():
141                 yield f
142
143     def manifest_text(self):
144         self._populate()
145         return self._manifest_text
146
147 class CollectionWriter(object):
148     KEEP_BLOCK_SIZE = 2**26
149
150     def __init__(self):
151         self._data_buffer = []
152         self._data_buffer_len = 0
153         self._current_stream_files = []
154         self._current_stream_length = 0
155         self._current_stream_locators = []
156         self._current_stream_name = '.'
157         self._current_file_name = None
158         self._current_file_pos = 0
159         self._finished_streams = []
160
161     def __enter__(self):
162         pass
163
164     def __exit__(self):
165         self.finish()
166
167     def write_directory_tree(self,
168                              path, stream_name='.', max_manifest_depth=-1):
169         self.start_new_stream(stream_name)
170         todo = []
171         if max_manifest_depth == 0:
172             dirents = sorted(util.listdir_recursive(path))
173         else:
174             dirents = sorted(os.listdir(path))
175         for dirent in dirents:
176             target = os.path.join(path, dirent)
177             if os.path.isdir(target):
178                 todo += [[target,
179                           os.path.join(stream_name, dirent),
180                           max_manifest_depth-1]]
181             else:
182                 self.start_new_file(dirent)
183                 with open(target, 'rb') as f:
184                     while True:
185                         buf = f.read(2**26)
186                         if len(buf) == 0:
187                             break
188                         self.write(buf)
189         self.finish_current_stream()
190         map(lambda x: self.write_directory_tree(*x), todo)
191
192     def write(self, newdata):
193         if hasattr(newdata, '__iter__'):
194             for s in newdata:
195                 self.write(s)
196             return
197         self._data_buffer += [newdata]
198         self._data_buffer_len += len(newdata)
199         self._current_stream_length += len(newdata)
200         while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
201             self.flush_data()
202
203     def flush_data(self):
204         data_buffer = ''.join(self._data_buffer)
205         if data_buffer != '':
206             self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
207             self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
208             self._data_buffer_len = len(self._data_buffer[0])
209
210     def start_new_file(self, newfilename=None):
211         self.finish_current_file()
212         self.set_current_file_name(newfilename)
213
214     def set_current_file_name(self, newfilename):
215         if re.search(r'[\t\n]', newfilename):
216             raise errors.AssertionError(
217                 "Manifest filenames cannot contain whitespace: %s" %
218                 newfilename)
219         self._current_file_name = newfilename
220
221     def current_file_name(self):
222         return self._current_file_name
223
224     def finish_current_file(self):
225         if self._current_file_name == None:
226             if self._current_file_pos == self._current_stream_length:
227                 return
228             raise errors.AssertionError(
229                 "Cannot finish an unnamed file " +
230                 "(%d bytes at offset %d in '%s' stream)" %
231                 (self._current_stream_length - self._current_file_pos,
232                  self._current_file_pos,
233                  self._current_stream_name))
234         self._current_stream_files += [[self._current_file_pos,
235                                        self._current_stream_length - self._current_file_pos,
236                                        self._current_file_name]]
237         self._current_file_pos = self._current_stream_length
238
239     def start_new_stream(self, newstreamname='.'):
240         self.finish_current_stream()
241         self.set_current_stream_name(newstreamname)
242
243     def set_current_stream_name(self, newstreamname):
244         if re.search(r'[\t\n]', newstreamname):
245             raise errors.AssertionError(
246                 "Manifest stream names cannot contain whitespace")
247         self._current_stream_name = '.' if newstreamname=='' else newstreamname
248
249     def current_stream_name(self):
250         return self._current_stream_name
251
252     def finish_current_stream(self):
253         self.finish_current_file()
254         self.flush_data()
255         if len(self._current_stream_files) == 0:
256             pass
257         elif self._current_stream_name == None:
258             raise errors.AssertionError(
259                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
260                 (self._current_stream_length, len(self._current_stream_files)))
261         else:
262             if len(self._current_stream_locators) == 0:
263                 self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
264             self._finished_streams += [[self._current_stream_name,
265                                        self._current_stream_locators,
266                                        self._current_stream_files]]
267         self._current_stream_files = []
268         self._current_stream_length = 0
269         self._current_stream_locators = []
270         self._current_stream_name = None
271         self._current_file_pos = 0
272         self._current_file_name = None
273
274     def finish(self):
275         return Keep.put(self.manifest_text())
276
277     def manifest_text(self):
278         self.finish_current_stream()
279         manifest = ''
280
281         for stream in self._finished_streams:
282             if not re.search(r'^\.(/.*)?$', stream[0]):
283                 manifest += './'
284             manifest += stream[0].replace(' ', '\\040')
285             manifest += ' ' + ' '.join(stream[1])
286             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
287             manifest += "\n"
288         
289         #print 'writer',manifest
290         #print 'after reader',CollectionReader(manifest).manifest_text()
291
292         return CollectionReader(manifest).manifest_text()
293
294     def data_locators(self):
295         ret = []
296         for name, locators, files in self._finished_streams:
297             ret += locators
298         return ret