# Merge branch 'master' of git.clinicalfuture.com:arvados
# [arvados.git] / sdk / python / arvados / stream.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20
21 from keep import *
22 import config
23 import errors
24
class StreamFileReader(object):
    """Read-only, file-like view of one byte range within a StreamReader.

    The reader keeps its own cursor (_filepos), relative to the start of
    the file, and translates every read into an absolute seek on the
    backing stream at (_pos + _filepos).
    """

    def __init__(self, stream, pos, size, name):
        self._stream = stream    # backing StreamReader
        self._pos = pos          # absolute offset of this file within the stream
        self._size = size        # file length in bytes
        self._name = name        # file name within the stream
        self._filepos = 0        # current read position, relative to _pos

    def name(self):
        return self._name

    def decompressed_name(self):
        # Strip a trailing .bz2 or .gz extension, if present.
        return re.sub(r'\.(bz2|gz)$', '', self._name)

    def size(self):
        return self._size

    def stream_name(self):
        return self._stream.name()

    def read(self, size, **kwargs):
        """Read up to size bytes from the current position; '' at EOF."""
        self._stream.seek(self._pos + self._filepos)
        data = self._stream.read(min(size, self._size - self._filepos))
        self._filepos += len(data)
        return data

    def readall(self, size=2**20, **kwargs):
        """Yield successive chunks (at most size bytes each) until EOF."""
        while True:
            data = self.read(size, **kwargs)
            # BUGFIX: was `data == ''`, which never matches the None that
            # StreamReader.read can return past end-of-stream; `not data`
            # stops cleanly on both '' and None.
            if not data:
                break
            yield data

    def seek(self, pos):
        """Set the read position, relative to the start of this file."""
        self._filepos = pos

    def bunzip2(self, size):
        """Yield bzip2-decompressed chunks of this file's content."""
        decompressor = bz2.BZ2Decompressor()
        for chunk in self.readall(size):
            data = decompressor.decompress(chunk)
            if data:
                yield data

    def gunzip(self, size):
        """Yield gzip-decompressed chunks of this file's content."""
        # 16+MAX_WBITS tells zlib to expect and skip a gzip header.
        decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
        for chunk in self.readall(size):
            data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
            if data:
                yield data

    def readall_decompressed(self, size=2**20):
        """Yield chunks, transparently decompressing .bz2/.gz files."""
        self._stream.seek(self._pos + self._filepos)
        if self._name.endswith('.bz2'):
            return self.bunzip2(size)
        elif self._name.endswith('.gz'):
            return self.gunzip(size)
        else:
            return self.readall(size)

    def readlines(self, decompress=True):
        """Yield complete lines, keeping the trailing newline when present."""
        if decompress:
            datasource = self.readall_decompressed()
        else:
            self._stream.seek(self._pos + self._filepos)
            datasource = self.readall()
        data = ''
        for newdata in datasource:
            data += newdata
            sol = 0
            while True:
                # str.find replaces the Python-2-only string.find().
                eol = data.find("\n", sol)
                if eol < 0:
                    break
                yield data[sol:eol+1]
                sol = eol+1
            data = data[sol:]
        if data != '':
            # Final line with no trailing newline.
            yield data

    def as_manifest(self):
        """Return a one-line manifest describing just this file."""
        if self.size() == 0:
            # Zero-length files still need a (well-known empty) locator token.
            return ("%s %s 0:0:%s\n"
                    % (self._stream.name(), config.EMPTY_BLOCK_LOCATOR, self.name()))
        # str.join replaces the Python-2-only string.join().
        return " ".join(self._stream.tokens_for_range(self._pos, self._size)) + "\n"
110
class StreamReader(object):
    """Parse one manifest stream (name, data block locators, file tokens)
    and provide sequential read access across the concatenated blocks.
    """

    def __init__(self, tokens):
        self._tokens = tokens
        # Lazily-fetched content of the block at _current_datablock_index.
        self._current_datablock_data = None
        self._current_datablock_pos = 0
        self._current_datablock_index = -1
        self._pos = 0

        self._stream_name = None
        self.data_locators = []
        self.files = []    # each entry is [offset, size, name]

        for tok in self._tokens:
            if self._stream_name is None:
                # First token is the stream name; \040 encodes a space.
                self._stream_name = tok.replace('\\040', ' ')
            elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
                self.data_locators += [tok]
            elif re.search(r'^\d+:\d+:\S+', tok):
                pos, size, name = tok.split(':',2)
                self.files += [[int(pos), int(size), name.replace('\\040', ' ')]]
            else:
                raise errors.SyntaxError("Invalid manifest format")

    def tokens(self):
        return self._tokens

    def tokens_for_range(self, range_start, range_size):
        """Return manifest tokens covering [range_start, range_start+range_size).

        Locators wholly before the range are dropped and file offsets are
        shifted left accordingly.  If any locator lacks a +size hint we
        cannot compute offsets, so every remaining locator is kept as-is.
        """
        resp = [self._stream_name]
        return_all_tokens = False
        block_start = 0
        token_bytes_skipped = 0
        for locator in self.data_locators:
            sizehint = re.search(r'\+(\d+)', locator)
            if not sizehint:
                return_all_tokens = True
            if return_all_tokens:
                resp += [locator]
                # BUGFIX: was a bare `next` (a no-op expression statement);
                # without `continue`, sizehint.group() below raised
                # AttributeError for any locator lacking a +size hint.
                continue
            # group(1) is the digits alone (group(0) would include the '+').
            blocksize = int(sizehint.group(1))
            if range_start + range_size <= block_start:
                break
            if range_start < block_start + blocksize:
                resp += [locator]
            else:
                token_bytes_skipped += blocksize
            block_start += blocksize
        for f in self.files:
            # Keep files that overlap the range and are non-empty.
            if ((f[0] < range_start + range_size)
                and
                (f[0] + f[1] > range_start)
                and
                f[1] > 0):
                resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
        return resp

    def name(self):
        return self._stream_name

    def all_files(self):
        """Yield a StreamFileReader for each file token in this stream."""
        for f in self.files:
            pos, size, name = f
            yield StreamFileReader(self, pos, size, name)

    def nextdatablock(self):
        """Advance the cursor to the next data block (content not fetched)."""
        if self._current_datablock_index < 0:
            self._current_datablock_pos = 0
            self._current_datablock_index = 0
        else:
            self._current_datablock_pos += self.current_datablock_size()
            self._current_datablock_index += 1
        self._current_datablock_data = None

    def current_datablock_data(self):
        """Fetch (and cache) the current data block's content from Keep."""
        if self._current_datablock_data is None:
            self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
        return self._current_datablock_data

    def current_datablock_size(self):
        """Return the current block's size, preferring the +size locator hint."""
        if self._current_datablock_index < 0:
            self.nextdatablock()
        sizehint = re.search(r'\+(\d+)', self.data_locators[self._current_datablock_index])
        if sizehint:
            return int(sizehint.group(1))
        # No hint: fetching the block is the only way to learn its size.
        return len(self.current_datablock_data())

    def seek(self, pos):
        """Set the position of the next read operation."""
        self._pos = pos

    def really_seek(self):
        """Find and load the appropriate data block, so the byte at
        _pos is in memory.
        """
        if self._pos == self._current_datablock_pos:
            return True
        if (self._current_datablock_pos is not None and
            self._pos >= self._current_datablock_pos and
            self._pos <= self._current_datablock_pos + self.current_datablock_size()):
            return True
        if self._pos < self._current_datablock_pos:
            # Target is behind us: rewind to the first block and scan forward.
            self._current_datablock_index = -1
            self.nextdatablock()
        while (self._pos > self._current_datablock_pos and
               self._pos > self._current_datablock_pos + self.current_datablock_size()):
            self.nextdatablock()

    def read(self, size):
        """Read no more than size bytes -- but at least one byte,
        unless _pos is already at the end of the stream.
        """
        if size == 0:
            return ''
        self.really_seek()
        while self._pos >= self._current_datablock_pos + self.current_datablock_size():
            self.nextdatablock()
            if self._current_datablock_index >= len(self.data_locators):
                return None
        data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
        self._pos += len(data)
        return data