Merge remote-tracking branch 'origin/master' into 2042-new-collection-from-selected...
[arvados.git] / sdk / python / arvados / collection.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20
21 from keep import *
22 from stream import *
23 import config
24 import errors
25
def normalize_stream(s, stream):
    """Return the normalized manifest tokens for one stream.

    :param s: the stream name; used unchanged as the first token.
    :param stream: dict mapping each file name to its list of segments.
        Each segment is indexed with arvados.LOCATOR (the block locator),
        arvados.BLOCKSIZE (added to the running stream offset, i.e. the size
        the block occupies in the stream), arvados.OFFSET (added to the
        block's stream offset) and arvados.SEGMENTSIZE (segment length).
    :returns: a list ``[s, locator, ..., "pos:len:name", ...]``.  Files are
        visited in sorted order, spaces in file names are written as
        ``\\040``, and a file with no segments gets the token ``0:0:name``.
    """
    # NOTE(review): `arvados` is not imported by name in this module;
    # presumably it arrives via the star imports at the top of the file.
    sortedfiles = sorted(stream)

    # First pass: list each distinct block once, in order of first use, and
    # remember where it starts within the stream's concatenated data.
    stream_tokens = [s]
    blocks = {}
    streamoffset = 0
    for f in sortedfiles:
        for b in stream[f]:
            locator = b[arvados.LOCATOR]
            if locator not in blocks:
                stream_tokens.append(locator)
                blocks[locator] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    # Second pass: map each file's segments to stream positions, coalescing
    # segments that are contiguous in the stream into a single span.
    for f in sortedfiles:
        fout = f.replace(' ', '\\040')
        segments = stream[f]
        if not segments:
            stream_tokens.append("0:0:{0}".format(fout))
            continue

        spans = []  # [start, end) offsets within the stream
        for segment in segments:
            start = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            end = start + segment[arvados.SEGMENTSIZE]
            if spans and spans[-1][1] == start:
                spans[-1][1] = end
            else:
                spans.append([start, end])

        stream_tokens.extend("{0}:{1}:{2}".format(start, end - start, fout)
                             for start, end in spans)

    return stream_tokens
61
def normalize(collection):
    """Return normalized token lists for every stream of *collection*.

    Each file's full path is rebuilt as ``<stream name>/<file name>`` and
    split at its last "/", so a file name that itself contains "/" is filed
    under the corresponding sub-stream.  The segments of every file are
    resolved through the owning stream's ``locators_and_ranges`` (called
    with the first two fields of each entry in ``f.segments``).

    :param collection: object providing ``all_streams()``; each stream
        provides ``name()``, ``all_files()`` and ``locators_and_ranges()``,
        and each file provides ``name()`` and ``segments``.
    :returns: one ``normalize_stream`` token list per stream, ordered by
        stream name.
    """
    streams = {}
    for s in collection.all_streams():
        for f in s.all_files():
            streamname, _, filename = (s.name() + "/" + f.name()).rpartition("/")
            file_segments = streams.setdefault(streamname, {}).setdefault(filename, [])
            for seg in f.segments:
                file_segments.extend(s.locators_and_ranges(seg[0], seg[1]))

    return [normalize_stream(name, streams[name]) for name in sorted(streams)]
83
84
class CollectionReader(object):
    """Read a collection from either its Keep locator or its manifest text.

    The argument is treated as a locator when it matches ``_LOCATOR_RE``;
    otherwise it is taken to be the manifest text itself.  Nothing is
    fetched or parsed until ``all_streams()``/``all_files()``/
    ``manifest_text()`` is first called.
    """

    # 32 hex digits, "+<size>", then zero or more "+<hint>" suffixes.  Hints
    # can be several characters long (e.g. "+K@qr1hi"), hence "\S+".
    _LOCATOR_RE = re.compile(r'^[a-f0-9]{32}\+\d+(\+\S+)*$')

    def __init__(self, manifest_locator_or_text):
        if self._LOCATOR_RE.search(manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        else:
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        self._streams = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        # Nothing to release.  Returning None never suppresses an exception;
        # the defaults keep direct ``__exit__()`` calls working.
        pass

    def _populate(self):
        """Fetch (if needed), parse and normalize the manifest, once.

        When only a locator is known, the manifest is requested from the API
        server; on any failure a warning is logged and the text is read from
        Keep instead.  Afterwards ``self._streams`` holds the normalized
        stream token lists and ``self._manifest_text`` is regenerated from
        them.
        """
        # NOTE(review): `arvados`, `Keep` and `StreamReader` are not imported
        # by name here; presumably they arrive via the module's star imports.
        if self._streams is not None:
            return
        # Only a missing text triggers a lookup; an empty manifest ('') is a
        # valid, empty collection.
        if self._manifest_text is None:
            try:
                c = arvados.api('v1').collections().get(
                    uuid=self._manifest_locator).execute()
                self._manifest_text = c['manifest_text']
            except Exception as e:
                logging.warning("API lookup failed for collection %s (%s: %s)",
                                self._manifest_locator, type(e), str(e))
                self._manifest_text = Keep.get(self._manifest_locator)

        # normalize() reads the raw streams back through all_streams(), so
        # the raw token lists must be stored before calling it.  Blank or
        # whitespace-only lines carry no tokens and are skipped.
        raw_streams = []
        for stream_line in self._manifest_text.split("\n"):
            stream_tokens = stream_line.split()
            if stream_tokens:
                raw_streams.append(stream_tokens)
        self._streams = raw_streams
        self._streams = normalize(self)

        # Regenerate the manifest text from the normalized streams.
        self._manifest_text = ''.join(
            StreamReader(stream).manifest_text() for stream in self._streams)

    def all_streams(self):
        """Return a new StreamReader for each stream, in stored order."""
        self._populate()
        return [StreamReader(s) for s in self._streams]

    def all_files(self):
        """Yield every file of every stream."""
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self):
        """Return the normalized manifest text."""
        self._populate()
        return self._manifest_text
142
class CollectionWriter(object):
    """Incrementally build a collection, storing its data in Keep.

    Callers group data into streams (``start_new_stream``) and files
    (``start_new_file``) and feed it with ``write``.  Written data is
    buffered and stored with ``Keep.put`` in chunks of at most
    ``KEEP_BLOCK_SIZE`` bytes.  For every finished stream the writer keeps
    ``[name, block locators, files]``, where each file is
    ``[offset, length, name]`` relative to the stream's concatenated data.
    """

    # Maximum number of bytes passed to a single Keep.put call.
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self):
        self._data_buffer = []              # chunks not yet sent to Keep
        self._data_buffer_len = 0           # total length of _data_buffer
        self._current_stream_files = []     # [offset, length, name] per file
        self._current_stream_length = 0     # bytes written to current stream
        self._current_stream_locators = []  # Keep.put results for this stream
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0          # stream offset of current file
        self._finished_streams = []         # [name, locators, files] entries

    def __enter__(self):
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Finish the collection when the ``with`` body completes normally.

        If the body raised, the partial collection is left unfinished and
        the exception propagates (this method returns None, so it never
        suppresses it).  The defaults keep direct ``__exit__()`` calls
        working as before.
        """
        if exc_type is None:
            self.finish()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write every file under *path*, starting a new stream *stream_name*.

        With ``max_manifest_depth == 0`` all entries returned by
        ``util.listdir_recursive(path)`` go into this one stream (presumably
        that helper lists files below *path* recursively -- confirm).
        Otherwise only the files directly in *path* go into this stream, and
        each subdirectory is written afterwards as its own stream named
        ``os.path.join(stream_name, dirent)`` with the depth decremented; a
        negative depth (the default) never reaches 0, i.e. no limit.
        """
        self.start_new_stream(stream_name)
        todo = []
        if max_manifest_depth == 0:
            dirents = sorted(util.listdir_recursive(path))
        else:
            dirents = sorted(os.listdir(path))
        for dirent in dirents:
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                todo.append((target,
                             os.path.join(stream_name, dirent),
                             max_manifest_depth - 1))
            else:
                self.start_new_file(dirent)
                with open(target, 'rb') as f:
                    while True:
                        buf = f.read(self.KEEP_BLOCK_SIZE)
                        if not buf:
                            break
                        self.write(buf)
        self.finish_current_stream()
        # Explicit loop, not map(): map() is lazy on Python 3 and would
        # silently skip every subdirectory.
        for subdir_args in todo:
            self.write_directory_tree(*subdir_args)

    def write(self, newdata):
        """Append *newdata* (a string, or an iterable of them) to the stream.

        Whenever at least KEEP_BLOCK_SIZE bytes are buffered, full blocks are
        flushed to Keep.
        """
        # Strings are data even though Python 3 gives str/bytes __iter__;
        # iterating a str would recurse forever.
        if not isinstance(newdata, (bytes, str)) and hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        """Store up to KEEP_BLOCK_SIZE buffered bytes in Keep; keep the rest."""
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        """Close the current file and start one named *newfilename*."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        """Name the current file; tabs and newlines are rejected.

        None (start_new_file's default) leaves the file unnamed.
        """
        if newfilename is not None and re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        """Record the bytes written since the last file boundary as a file.

        An unnamed file is only allowed if no bytes were written to it.
        """
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        """Close the current stream and start one named *newstreamname*."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        """Name the current stream ('' means '.'); tabs/newlines rejected."""
        if newstreamname is not None and re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        """Flush the current stream and record it if it contains any files.

        Afterwards the writer has no current stream name; call
        start_new_stream before writing more files.
        """
        self.finish_current_file()
        self.flush_data()
        if self._current_stream_files:
            if self._current_stream_name is None:
                raise errors.AssertionError(
                    "Cannot finish an unnamed stream (%d bytes in %d files)" %
                    (self._current_stream_length, len(self._current_stream_files)))
            # A stream holding only empty files still needs one locator.
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        """Store the manifest text in Keep and return Keep.put's result."""
        return Keep.put(self.manifest_text())

    def manifest_text(self):
        """Return the normalized manifest text for all finished streams.

        Finishes the current stream first.  Stream names not already of the
        form "." or "./..." get a "./" prefix; spaces in stream and file
        names are written as ``\\040``.  The assembled text is passed through
        CollectionReader, and its manifest_text() is returned.
        """
        self.finish_current_stream()
        lines = []
        for name, locators, files in self._finished_streams:
            prefix = '' if re.search(r'^\.(/.*)?$', name) else './'
            tokens = [prefix + name.replace(' ', '\\040')]
            tokens.extend(locators)
            tokens.extend("%d:%d:%s" % (pos, size, fname.replace(' ', '\\040'))
                          for pos, size, fname in files)
            lines.append(' '.join(tokens) + "\n")
        return CollectionReader(''.join(lines)).manifest_text()

    def data_locators(self):
        """Return the block locators of all finished streams, in order."""
        return [locator
                for _, locators, _ in self._finished_streams
                for locator in locators]