Fixing things up
[arvados.git] / sdk / python / arvados / stream.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20
21 from keep import *
22 import config
23 import errors
24
25 LOCATOR = 0
26 BLOCKSIZE = 1
27 OFFSET = 2
28 SEGMENTSIZE = 3
29
30 def locators_and_ranges(data_locators, range_start, range_size):
31     '''returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range'''
32     resp = []
33     range_start = long(range_start)
34     range_size = long(range_size)
35     range_end = range_start + range_size
36     block_start = 0L
37     for locator, block_size, block_start in data_locators:
38         block_end = block_start + block_size
39         if range_end < block_start:
40             # range ends before this block starts, so don't look at any more locators
41             break
42         if range_start > block_end:
43             # range starts after this block ends, so go to next block
44             next
45         elif range_start >= block_start and range_end <= block_end:
46             # range starts and ends in this block
47             resp.append([locator, block_size, range_start - block_start, range_size])
48         elif range_start >= block_start:
49             # range starts in this block
50             resp.append([locator, block_size, range_start - block_start, block_end - range_start])
51         elif range_start < block_start and range_end > block_end:
52             # range starts in a previous block and extends to further blocks
53             resp.append([locator, block_size, 0L, block_size])
54         elif range_start < block_start and range_end <= block_end:
55             # range starts in a previous block and ends in this block
56             resp.append([locator, block_size, 0L, range_end - block_start])
57         block_start = block_end
58     return resp
59
60
61 class StreamFileReader(object):
62     def __init__(self, stream, segments, name):
63         self._stream = stream
64         self.segments = segments
65         self._name = name
66         self._filepos = 0L
67
68     def name(self):
69         return self._name
70
71     def decompressed_name(self):
72         return re.sub('\.(bz2|gz)$', '', self._name)
73
74     def stream_name(self):
75         return self._stream.name()
76
77     def seek(self, pos):
78         self._filepos = min(max(pos, 0L), self.size())
79
80     def tell(self, pos):
81         return self._filepos
82
83     def size(self):
84         n = self.segments[-1]
85         return n[OFFSET] + n[BLOCKSIZE]
86
87     def read(self, size):
88         """Read up to 'size' bytes from the stream, starting at the current file position"""
89         if size == 0:
90             return ''
91
92         data = ''
93         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, self._filepos, size):
94             self._stream.seek(locator+segmentoffset)
95             data += self._stream.read(segmentsize)
96             self._filepos += len(data)
97         return data
98
99     def readall(self, size=2**20):
100         while True:
101             data = self.read(size)
102             if data == '':
103                 break
104             yield data
105
106     def bunzip2(self, size):
107         decompressor = bz2.BZ2Decompressor()
108         for segment in self.readall(size):
109             data = decompressor.decompress(segment)
110             if data and data != '':
111                 yield data
112
113     def gunzip(self, size):
114         decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
115         for segment in self.readall(size):
116             data = decompressor.decompress(decompressor.unconsumed_tail + segment)
117             if data and data != '':
118                 yield data
119
120     def readall_decompressed(self, size=2**20):
121         self.seek(0)
122         if re.search('\.bz2$', self._name):
123             return self.bunzip2(size)
124         elif re.search('\.gz$', self._name):
125             return self.gunzip(size)
126         else:
127             return self.readall(size)
128
129     def readlines(self, decompress=True):
130         if decompress:
131             datasource = self.readall_decompressed()
132         else:
133             self._stream.seek(self._pos + self._filepos)
134             datasource = self.readall()
135         data = ''
136         for newdata in datasource:
137             data += newdata
138             sol = 0
139             while True:
140                 eol = string.find(data, "\n", sol)
141                 if eol < 0:
142                     break
143                 yield data[sol:eol+1]
144                 sol = eol+1
145             data = data[sol:]
146         if data != '':
147             yield data
148
149
150 class StreamReader(object):
151     def __init__(self, tokens):
152         self._tokens = tokens
153         self._pos = 0L
154
155         self._stream_name = None
156         self.data_locators = []
157         self.files = {}
158
159         streamoffset = 0L
160
161         for tok in self._tokens:
162             if self._stream_name == None:
163                 self._stream_name = tok.replace('\\040', ' ')
164                 continue
165
166             s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
167             if s:
168                 blocksize = long(s.group(1))
169                 self.data_locators.append([tok, blocksize, streamoffset])
170                 streamoffset += blocksize
171                 continue
172
173             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
174             if s:
175                 pos = long(s.group(1))
176                 size = long(s.group(2))
177                 name = s.group(3).replace('\\040', ' ')
178                 if name not in self.files:
179                     self.files[name] = StreamFileReader(self, [[pos, size, 0]], name)
180                 else:
181                     n = self.files[name]
182                     n.segments.append([pos, size, n.size()])
183                 continue
184
185             raise errors.SyntaxError("Invalid manifest format")
186             
187     def tokens(self):
188         return self._tokens
189
190     def name(self):
191         return self._stream_name
192
193     def all_files(self):
194         return self.files.values()
195
196     def seek(self, pos):
197         """Set the position of the next read operation."""
198         self._pos = pos
199
200     def tell(self):
201         return self._pos
202
203     def size(self):
204         n = self.data_locators[-1]
205         return n[self.OFFSET] + n[self.BLOCKSIZE]
206
207     def locators_and_ranges(self, range_start, range_size):
208         return locators_and_ranges(self.data_locators, range_start, range_size)
209
210     def read(self, size):
211         """Read up to 'size' bytes from the stream, starting at the current file position"""
212         if size == 0:
213             return ''
214         data = ''
215         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.data_locators, self._pos, size):
216             data += Keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
217         self._pos += len(data)
218         return data