11308: Fix bytes vs. str problems.
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 9db19b05f6bc356c2b673d4983551b2dceef7122..c6cb1c91cc9469f8d8f2df6e37296104c886d705 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -1,11 +1,17 @@
+from __future__ import absolute_import
+from __future__ import division
+from future import standard_library
+standard_library.install_aliases()
+from builtins import range
+from builtins import object
 import functools
 import os
 import zlib
 import bz2
-import config
+from . import config
 import hashlib
 import threading
-import Queue
+import queue
 import copy
 import errno
 import re
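
Note on the hunk above: absolute_import is what forces the bare "import config" to become the explicit relative "from . import config", and future's standard_library.install_aliases() is what lets the Python 3 spelling "import queue" also resolve on Python 2. A minimal standalone sketch of that pattern (assumes the future package is installed; not part of this diff):

    from __future__ import absolute_import
    from future import standard_library
    standard_library.install_aliases()

    import queue  # Python 3 name; aliased to py2's Queue module by install_aliases()

    q = queue.Queue(maxsize=2)
    q.put('block')
    print(q.get())  # -> 'block'
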
@@ -76,7 +82,7 @@ class _FileLikeObjectBase(object):
 class ArvadosFileReaderBase(_FileLikeObjectBase):
     def __init__(self, name, mode, num_retries=None):
         super(ArvadosFileReaderBase, self).__init__(name, mode)
-        self._filepos = 0L
+        self._filepos = 0
         self.num_retries = num_retries
         self._readline_cache = (None, None)
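
The change above drops the Python 2 long-literal suffix: 0L is a SyntaxError on Python 3, where int is a single arbitrary-precision type. A quick illustration (not part of the diff):

    # pos = 0L      # Python 2 only; SyntaxError on Python 3
    pos = 0         # portable: py2 ints overflow into long automatically
    pos += 2 ** 64  # arbitrary precision either way
    print(pos)
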
@@ -96,7 +102,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
             pos += self._filepos
         elif whence == os.SEEK_END:
             pos += self.size()
-        self._filepos = min(max(pos, 0L), self.size())
+        self._filepos = min(max(pos, 0), self.size())

     def tell(self):
         return self._filepos
@@ -213,8 +219,8 @@ class StreamFileReader(ArvadosFileReaderBase):
         if available_chunks:
             lr = available_chunks[0]
             data = self._stream.readfrom(lr.locator+lr.segment_offset,
-                                          lr.segment_size,
-                                          num_retries=num_retries)
+                                         lr.segment_size,
+                                         num_retries=num_retries)

         self._filepos += len(data)
         return data
@@ -507,10 +513,10 @@ class _BlockManager(object):
             # blocks pending. If they are full 64 MiB blocks, that means up to
             # 256 MiB of internal buffering, which is the same size as the
             # default download block cache in KeepClient.
-            self._put_queue = Queue.Queue(maxsize=2)
+            self._put_queue = queue.Queue(maxsize=2)

             self._put_threads = []
-            for i in xrange(0, self.num_put_threads):
+            for i in range(0, self.num_put_threads):
                 thread = threading.Thread(target=self._commit_bufferblock_worker)
                 self._put_threads.append(thread)
                 thread.daemon = True
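
The two replacements above are mechanical py2-to-py3 renames (Queue -> queue, xrange -> range), but the surrounding code is worth a gloss: a bounded queue.Queue(maxsize=2) plus daemon worker threads caps how many buffer blocks can be pending upload at once. A minimal, self-contained sketch of that producer/consumer pattern (illustrative names only, not Arvados APIs):

    import queue
    import threading

    work = queue.Queue(maxsize=2)    # bounded: put() blocks once 2 items wait

    def worker():
        while True:
            item = work.get()        # blocks until an item is available
            try:
                pass                 # commit/upload the block here
            finally:
                work.task_done()     # lets join() observe completion

    for _ in range(4):
        thread = threading.Thread(target=worker)
        thread.daemon = True         # workers die with the main thread
        thread.start()

    for i in range(8):
        work.put(i)
    work.join()                      # wait for every queued item to finish
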
@@ -530,9 +536,9 @@ class _BlockManager(object):
     @synchronized
     def start_get_threads(self):
         if self._prefetch_threads is None:
-            self._prefetch_queue = Queue.Queue()
+            self._prefetch_queue = queue.Queue()
             self._prefetch_threads = []
-            for i in xrange(0, self.num_get_threads):
+            for i in range(0, self.num_get_threads):
                 thread = threading.Thread(target=self._block_prefetch_worker)
                 self._prefetch_threads.append(thread)
                 thread.daemon = True
@@ -578,7 +584,7 @@ class _BlockManager(object):
         # A WRITABLE block with its owner.closed() implies that it's
         # size is <= KEEP_BLOCK_SIZE/2.
         try:
-            small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+            small_blocks = [b for b in list(self._bufferblocks.values()) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
         except AttributeError:
             # Writable blocks without owner shouldn't exist.
             raise UnownedBlockError()
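
Wrapping the values() call in list() is the standard futurize fix: on Python 3, dict.values() returns a live view rather than a list, and iterating a view while the dict is mutated raises RuntimeError. Inside a list comprehension the wrap is largely defensive, but the behavioral difference is easy to demonstrate:

    d = {'a': 1, 'b': 2}

    snapshot = list(d.values())  # copies the current values
    view = d.values()            # live view that tracks later mutations

    d['c'] = 3
    print(snapshot)              # [1, 2]
    print(len(view))             # 3 -- the view saw the insertion
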
@@ -691,7 +697,7 @@ class _BlockManager(object):
         self.repack_small_blocks(force=True, sync=True)

         with self.lock:
-            items = self._bufferblocks.items()
+            items = list(self._bufferblocks.items())
             for k,v in items:
                 if v.state() != _BufferBlock.COMMITTED and v.owner:
@@ -823,7 +829,7 @@ class ArvadosFile(object):
         with self.lock:
             if len(self._segments) != len(othersegs):
                 return False
-            for i in xrange(0, len(othersegs)):
+            for i in range(0, len(othersegs)):
                 seg1 = self._segments[i]
                 seg2 = othersegs[i]
                 loc1 = seg1.locator
@@ -883,7 +889,7 @@ class ArvadosFile(object):
         """
         self._writers.remove(writer)

-        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
+        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
             # File writer closed, not small enough for repacking
             self.flush()
         elif self.closed():
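
This last change follows from the first hunk: once "from __future__ import division" is in effect, / is true division even on Python 2, so halving the integer block size needs the floor-division operator. A quick check (the 64 MiB constant matches the block-size comments earlier in this diff, but is an assumption here):

    from __future__ import division  # same directive the first hunk adds

    KEEP_BLOCK_SIZE = 2 ** 26        # 64 MiB

    print(KEEP_BLOCK_SIZE / 2)       # 33554432.0 -- true division, a float
    print(KEEP_BLOCK_SIZE // 2)      # 33554432   -- floor division, an int
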