from collections import deque
from stat import *
-from .arvfile import ArvadosFileBase
+from .arvfile import ArvadosFileBase, split, ArvadosFile
from keep import *
-from .stream import StreamReader, split
+from .stream import StreamReader, normalize_stream
import config
import errors
import util
_logger = logging.getLogger('arvados.collection')
-def normalize_stream(s, stream):
- stream_tokens = [s]
- sortedfiles = list(stream.keys())
- sortedfiles.sort()
-
- blocks = {}
- streamoffset = 0L
- for f in sortedfiles:
- for b in stream[f]:
- if b[arvados.LOCATOR] not in blocks:
- stream_tokens.append(b[arvados.LOCATOR])
- blocks[b[arvados.LOCATOR]] = streamoffset
- streamoffset += b[arvados.BLOCKSIZE]
-
- if len(stream_tokens) == 1:
- stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
-
- for f in sortedfiles:
- current_span = None
- fout = f.replace(' ', '\\040')
- for segment in stream[f]:
- segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
- if current_span is None:
- current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
- else:
- if segmentoffset == current_span[1]:
- current_span[1] += segment[arvados.SEGMENTSIZE]
- else:
- stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
- current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
-
- if current_span is not None:
- stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
-
- if not stream[f]:
- stream_tokens.append("0:0:{0}".format(fout))
-
- return stream_tokens
-
-
class CollectionBase(object):
def __enter__(self):
return self
class CollectionWriter(CollectionBase):
- KEEP_BLOCK_SIZE = 2**26
-
def __init__(self, api_client=None, num_retries=0):
"""Instantiate a CollectionWriter.
def _work_file(self):
while True:
- buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
+ buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
if not buf:
break
self.write(buf)
self._data_buffer.append(newdata)
self._data_buffer_len += len(newdata)
self._current_stream_length += len(newdata)
- while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
+ while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
self.flush_data()
def open(self, streampath, filename=None):
data_buffer = ''.join(self._data_buffer)
if data_buffer:
self._current_stream_locators.append(
- self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
- self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
+ self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
+ self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
self._data_buffer_len = len(self._data_buffer[0])
def start_new_file(self, newfilename=None):
raise errors.AssertionError(
"resumable writer can't accept unsourced data")
return super(ResumableCollectionWriter, self).write(data)
+
+
+class Collection(object):
+    """In-memory tree representation of a collection.
+
+    `items` maps a single path component to either an ArvadosFile (leaf)
+    or a nested Collection (subdirectory).
+    """
+
+    def __init__(self):
+        # path component -> ArvadosFile or sub-Collection
+        self.items = {}
+
+    def find_or_create(self, path):
+        """Return the item at `path`, creating missing pieces on the way.
+
+        A leading '.' component (the manifest name for the root stream) is
+        ignored.  An empty remaining path returns this collection itself.
+        Intermediate components become nested Collections; the final
+        component becomes an ArvadosFile if not already present.
+
+        NOTE(review): a trailing '/' leaves an empty final component and
+        would create a file named '' — confirm callers never pass one.
+        """
+        p = path.split("/")
+        if p[0] == '.':
+            del p[0]
+
+        if len(p) > 0:
+            item = self.items.get(p[0])
+            if len(p) == 1:
+                # item must be a file
+                if item is None:
+                    # create new file
+                    item = ArvadosFile(p[0], 'wb', [], [])
+                    self.items[p[0]] = item
+                return item
+            else:
+                if item is None:
+                    # create new collection
+                    item = Collection()
+                    self.items[p[0]] = item
+                del p[0]
+                # Recurse with the remaining path components.
+                return item.find_or_create("/".join(p))
+        else:
+            return self
+
+
+def import_manifest(manifest_text):
+    """Parse Keep manifest text into a Collection tree.
+
+    The parser is a small state machine over whitespace-separated tokens:
+    each manifest line is a stream name, then one or more block locators,
+    then one or more file segments ("pos:size:name").  A newline separator
+    resets the machine for the next stream.
+
+    Raises errors.SyntaxError when a token is neither a locator nor a
+    valid segment.
+    """
+    c = Collection()
+
+    # Parser states.
+    STREAM_NAME = 0
+    BLOCKS = 1
+    SEGMENTS = 2
+
+    stream_name = None
+    state = STREAM_NAME
+
+    for n in re.finditer(r'([^ \n]+)([ \n])', manifest_text):
+        tok = n.group(1)
+        sep = n.group(2)
+        if state == STREAM_NAME:
+            # starting a new stream
+            stream_name = tok.replace('\\040', ' ')
+            blocks = []
+            streamoffset = 0L
+            state = BLOCKS
+            continue
+
+        if state == BLOCKS:
+            # Block locator: md5+size[+hints...].  The size must be
+            # captured as a whole group, (\d+): the previous (\d)+ form
+            # kept only the LAST digit, so "+512" parsed as blocksize 2.
+            s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
+            if s:
+                blocksize = long(s.group(1))
+                blocks.append([tok, blocksize, streamoffset])
+                streamoffset += blocksize
+            else:
+                # First non-locator token switches to segment parsing;
+                # fall through so this token is handled below.
+                state = SEGMENTS
+
+        if state == SEGMENTS:
+            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
+            if s:
+                pos = long(s.group(1))
+                size = long(s.group(2))
+                name = s.group(3).replace('\\040', ' ')
+                f = c.find_or_create("%s/%s" % (stream_name, name))
+                f.add_segment(blocks, pos, size)
+            else:
+                # Neither a locator nor a segment: malformed manifest.
+                raise errors.SyntaxError("Invalid manifest format")
+
+        if sep == "\n":
+            stream_name = None
+            state = STREAM_NAME
+
+    return c
+
+def export_manifest(item, stream_name="."):
+    """Serialize a Collection tree back into manifest-style text.
+
+    Collections are walked recursively; nested collections extend the
+    stream name with their entry name (mirroring how import_manifest
+    splits stream paths).  Leaves emit "stream_name <segments>".
+
+    Fixes over the first draft: iterating `item.items.values()` with
+    two-target unpacking crashed (each value is a single ArvadosFile or
+    Collection); concatenating the raw `item.segments` list onto a str
+    raised TypeError; a leftover debug `print` is removed.
+    """
+    buf = ""
+    if isinstance(item, Collection):
+        for name, entry in item.items.items():
+            if isinstance(entry, Collection):
+                # Subdirectory: extend the stream path with its name.
+                buf += export_manifest(entry, stream_name + "/" + name)
+            else:
+                buf += export_manifest(entry, stream_name)
+    else:
+        # Leaf (ArvadosFile): emit the stream name and its segment list.
+        # NOTE(review): str(segments) is a placeholder rendering — the
+        # locator/offset token emission is not implemented yet.
+        buf += stream_name
+        buf += " "
+        buf += str(item.segments)
+    return buf