md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
realpaths = sorted(os.path.realpath(path) for path in paths)
self.files = {}
+ self.bytes_written = 0 # Approximate number of bytes already uploaded (partial uploaded files are counted in full)
for path in realpaths:
self._get_file_data(path)
# Only hash args paths
self._lock_file(self.cache_file)
self.filename = self.cache_file.name
self.data = self._load()
+ for f in self.data['uploaded'].values():
+ self.bytes_written += f['size']
def _load(self):
try:
"""
Atomically save
"""
- # TODO: Should be a good idea to avoid _save() spamming? when writing
- # lots of small files.
try:
new_cache_fd, new_cache_name = tempfile.mkstemp(
dir=os.path.dirname(self.filename))
self._lock_file(new_cache_fd)
new_cache = os.fdopen(new_cache_fd, 'r+')
json.dump(self.data, new_cache)
+ new_cache.flush()
+ os.fsync(new_cache)
os.rename(new_cache_name, self.filename)
except (IOError, OSError, ResumeCacheConflict) as error:
try:
self.cache_file = new_cache
def file_uploaded(self, path):
- print "About to register an uploaded file: %s" % path
if path in self.files.keys():
self.data['uploaded'][path] = self.files[path]
self._save()
- print "Already registered the uploaded file!"
def set_collection(self, loc):
self.data['col_locator'] = loc
raise
self.close()
-class ArvPutUploader(object):
- def __init__(self, paths, reporter=None):
- expected_bytes = expected_bytes_for(paths)
- self.cache = ArvPutCollectionCache(paths)
- self.paths = paths
- self.already_uploaded = False
- # if self.cache.collection() is not None:
- # self.collection = ArvPutCollection(
- # locator=self.cache.collection(),
- # cache=self.cache,
- # reporter=reporter,
- # bytes_expected=expected_bytes)
- # else:
- self.collection = ArvPutCollection(
- cache=self.cache,
- reporter=reporter,
- bytes_expected=expected_bytes)
- # self.cache.set_collection(self.collection.manifest_locator())
-
- def do_upload(self):
- if not self.already_uploaded:
- for p in paths:
- if os.path.isdir(p):
- self.collection.write_directory_tree(p)
- elif os.path.isfile(p):
- self.collection.write_file(p, os.path.basename(p))
- self.cache.destroy()
- self.already_uploaded = True
-
- def manifest(self):
- return self.collection.manifest()
-
- def bytes_written(self):
- return self.collection.bytes_written
-
class ArvPutCollection(object):
def __init__(self, cache=None, reporter=None, bytes_expected=None,
name=None, owner_uuid=None, ensure_unique_name=False,
- num_retries=None, replication=None):
+ num_retries=None, write_copies=None, replication=None,
+ should_save=True):
self.collection_flush_time = 60 # Secs
self.bytes_written = 0
+ self.bytes_skipped = 0
self.cache = cache
self.reporter = reporter
- self.num_retries=num_retries
+ self.num_retries = num_retries
+ self.write_copies = write_copies
+ self.replication = replication
self.bytes_expected = bytes_expected
+ self.should_save = should_save
locator = self.cache.collection() if self.cache else None
if locator is None:
- self.collection = arvados.collection.Collection()
- self.collection.save_new(name=name, owner_uuid=owner_uuid,
- ensure_unique_name=ensure_unique_name,
- num_retries=num_retries)
- if self.cache:
- self.cache.set_collection(self.collection.manifest_locator())
+ self.collection = arvados.collection.Collection(
+ num_write_copies=self.write_copies)
+ if self.should_save:
+ self.collection.save_new(name=name, owner_uuid=owner_uuid,
+ ensure_unique_name=ensure_unique_name,
+ num_retries=num_retries,
+ replication_desired=self.replication)
+ if self.cache:
+ self.cache.set_collection(self.collection.manifest_locator())
else:
- self.collection = arvados.collection.Collection(locator)
+ self.collection = arvados.collection.Collection(locator,
+ num_write_copies=self.write_copies)
+
+ def name(self):
+ return self.collection.api_response()['name'] if self.collection.api_response() else None
def save(self):
- self.collection.save(num_retries=self.num_retries)
+ if self.should_save:
+ self.collection.save(num_retries=self.num_retries)
def manifest_locator(self):
return self.collection.manifest_locator()
def portable_data_hash(self):
- return self.collectin.portable_data_hash()
+ return self.collection.portable_data_hash()
def manifest_text(self, stream_name=".", strip=False, normalize=False):
return self.collection.manifest_text(stream_name, strip, normalize)
output.flush() # Commit block to Keep
self.bytes_written += len(data)
# Is it time to update the collection?
- if (time.time() - start_time) > self.collection_flush_time:
+ if self.should_save and ((time.time() - start_time) > self.collection_flush_time):
self.collection.save(num_retries=self.num_retries)
start_time = time.time()
# Once a block is written on each file, mark it as uploaded on the cache
- if first_block:
+ if self.should_save and first_block:
if self.cache:
self.cache.file_uploaded(source_fd.name)
self.collection.save(num_retries=self.num_retries)
- print "FLUSHED COLLECTION!!!"
first_block = False
self.report_progress()
output = c.open(filename, 'w')
self._write(sys.stdin, output)
output.close()
- self.collection.save()
+ if self.should_save:
+ self.collection.save()
def write_file(self, source, filename):
if self.cache and source in self.cache.dirty_files():
- print "DIRTY: Removing file %s from collection to be uploaded again" % source
self.collection.remove(filename)
resume_offset = 0
resume_upload = False
try:
- print "FIND file %s" % filename
collection_file = self.collection.find(filename)
except IOError:
# Not found
collection_file = None
if collection_file:
- print "File %s already in the collection, checking!" % source
if os.path.getsize(source) == collection_file.size():
- print "WARNING: file %s already uploaded, skipping!" % source
# File already there, skip it.
- self.bytes_written += os.path.getsize(source)
+ self.bytes_skipped += os.path.getsize(source)
return
elif os.path.getsize(source) > collection_file.size():
- print "WARNING: RESUMING file %s" % source
# File partially uploaded, resume!
resume_upload = True
resume_offset = collection_file.size()
- self.bytes_written += resume_offset
+ self.bytes_skipped += resume_offset
else:
# Source file smaller than uploaded file, what happened here?
# TODO: Raise exception of some kind?
with open(source, 'r') as source_fd:
with self.collection as c:
if resume_upload:
- print "Resuming file, source: %s, filename: %s" % (source, filename)
output = c.open(filename, 'a')
source_fd.seek(resume_offset)
first_block = False
else:
- print "Writing file, source: %s, filename: %s" % (source, filename)
output = c.open(filename, 'w')
first_block = True
self._write(source_fd, output, first_block)
output.close()
- self.collection.save() # One last save...
+ if self.should_save:
+ self.collection.save() # One last save...
def write_directory_tree(self, path, stream_name='.'):
- if os.path.isdir(path):
- for item in os.listdir(path):
- print "Checking path: '%s' - stream_name: '%s'" % (path, stream_name)
- if os.path.isdir(os.path.join(path, item)):
- self.write_directory_tree(os.path.join(path, item),
- os.path.join(stream_name, item))
- else:
- self.write_file(os.path.join(path, item),
- os.path.join(stream_name, item))
+ # TODO: Check what happens when multiple directories are passes as arguments
+ # If the below code is uncommented, integration test
+ # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest) fails,
+ # I suppose it is because the manifest_uuid changes because of the dir addition to
+ # stream_name.
+ #
+ # if stream_name == '.':
+ # stream_name = os.path.join('.', os.path.basename(path))
+ for item in os.listdir(path):
+ if os.path.isdir(os.path.join(path, item)):
+ self.write_directory_tree(os.path.join(path, item),
+ os.path.join(stream_name, item))
+ else:
+ self.write_file(os.path.join(path, item),
+ os.path.join(stream_name, item))
- def manifest(self):
- print "BLOCK SIZE: %d" % arvados.config.KEEP_BLOCK_SIZE
- print "MANIFEST Locator:\n%s\nMANIFEST TEXT:\n%s" % (self.manifest_locator(), self.collection.manifest_text())
- return True
-
def report_progress(self):
if self.reporter is not None:
- self.reporter(self.bytes_written, self.bytes_expected)
+ self.reporter(self.bytes_written+self.bytes_skipped, self.bytes_expected)
+
+ def _datablocks_on_item(self, item):
+ """
+ Return a list of datablock locators, recursively navigating
+ through subcollections
+ """
+ if isinstance(item, arvados.arvfile.ArvadosFile):
+ locators = []
+ for segment in item.segments():
+ loc = segment.locator
+ if loc.startswith("bufferblock"):
+ loc = self._bufferblocks[loc].calculate_locator()
+ locators.append(loc)
+ return locators
+ elif isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
+ l = [self._datablocks_on_item(x) for x in item.values()]
+ # Fast list flattener method taken from:
+ # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
+ return [loc for sublist in l for loc in sublist]
+ else:
+ return None
+
+ def data_locators(self):
+ return self._datablocks_on_item(self.collection)
class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
reporter=reporter,
bytes_expected=bytes_expected,
num_retries=args.retries,
- replication=write_copies)
+ write_copies=write_copies,
+ replication=args.replication,
+ should_save=False)
else:
writer = ArvPutCollection(cache=resume_cache,
reporter=reporter,
bytes_expected=bytes_expected,
num_retries=args.retries,
- replication=write_copies,
+ write_copies=write_copies,
+ replication=args.replication,
name=collection_name,
owner_uuid=project_uuid,
ensure_unique_name=True)
orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
for sigcode in CAUGHT_SIGNALS}
- if writer.bytes_written > 0: # We're resuming a previous upload. TODO
+ if resume_cache and resume_cache.bytes_written > 0:
print >>stderr, "\n".join([
"arv-put: Resuming previous upload from last checkpoint.",
" Use the --no-resume option to start over."])
if path == '-':
writer.write_stdin(args.filename)
elif os.path.isdir(path):
- writer.write_directory_tree(path)#, os.path.join('.', os.path.basename(path))) TODO: Check what happens with multiple directories params
+ writer.write_directory_tree(path)
else:
writer.write_file(path, args.filename or os.path.basename(path))
else:
output = writer.manifest_text()
elif args.raw:
- output = ','.join(writer.data_locators()) # TODO
+ output = ','.join(writer.data_locators())
else:
try:
- # manifest_text = writer.manifest_text()
- # if args.normalize:
- # manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
- # replication_attr = 'replication_desired'
- # if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
- # # API called it 'redundancy' before #3410.
- # replication_attr = 'redundancy'
- # # Register the resulting collection in Arvados.
- # collection = api_client.collections().create(
- # body={
- # 'owner_uuid': project_uuid,
- # 'name': collection_name,
- # 'manifest_text': manifest_text,
- # replication_attr: args.replication,
- # },
- # ensure_unique_name=True
- # ).execute(num_retries=args.retries)
- #
- # print >>stderr, "Collection saved as '%s'" % collection['name']
- #
writer.save()
+ print >>stderr, "Collection saved as '%s'" % writer.name()
if args.portable_data_hash:
output = writer.portable_data_hash()
else:
output = writer.manifest_locator()
- with open('/tmp/lucas.txt', 'w') as f:
- f.write(output)
except apiclient_errors.Error as error:
print >>stderr, (
import re
import shutil
import subprocess
-import multiprocessing
import sys
import tempfile
import time
import unittest
import yaml
+import multiprocessing
+import shutil
+import hashlib
+import random
from cStringIO import StringIO
class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
KEEP_SERVER = {}
- import shutil
-
- # def test_write_files(self):
- # c = arv_put.ArvPutCollection()
- # data = 'a' * 1024 * 1024 # 1 MB
- # tmpdir = tempfile.mkdtemp()
- # for size in [1, 10, 64, 128]:
- # with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
- # for _ in range(size):
- # f.write(data)
- # c.write_file(f.name, os.path.basename(f.name))
- # shutil.rmtree(tmpdir)
- # self.assertEqual(True, c.manifest())
- #
- # def test_write_directory(self):
- # data = 'b' * 1024 * 1024
- # tmpdir = tempfile.mkdtemp()
- # for size in [1, 5, 10, 70]:
- # with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
- # for _ in range(size):
- # f.write(data)
- # os.mkdir(os.path.join(tmpdir, 'subdir1'))
- # for size in [2, 4, 6]:
- # with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
- # for _ in range(size):
- # f.write(data)
- # c = arv_put.ArvPutUploader([tmpdir])
- # shutil.rmtree(tmpdir)
- # self.assertEqual(True, c.manifest())
+
+ def setUp(self):
+ self.lock = multiprocessing.Lock()
def fake_reporter(self, written, expected):
- # Use this callback as a intra-block pause to be able to simulate an interruption
- print "Written %d / %d bytes" % (written, expected)
- time.sleep(10)
+ self.lock.release() # Allow caller process to terminate() us...
- def bg_uploader(self, paths):
- return arv_put.ArvPutUploader(paths, reporter=self.fake_reporter)
-
- # def test_resume_large_file_upload(self):
- # import multiprocessing
- # data = 'x' * 1024 * 1024 # 1 MB
- # _, filename = tempfile.mkstemp()
- # fileobj = open(filename, 'w')
- # for _ in range(200):
- # fileobj.write(data)
- # fileobj.close()
- # uploader = multiprocessing.Process(target=self.bg_uploader, args=([filename],))
- # uploader.start()
- # time.sleep(5)
- # uploader.terminate()
- # time.sleep(1)
- # # cache = arv_put.ArvPutCollectionCache([filename])
- # # print "Collection detected: %s" % cache.collection()
- # # c = arv_put.ArvPutCollection(locator=cache.collection(), cache=cache)
- # # print "UPLOADED: %d" % c.collection[os.path.basename(filename)].size()
- # # self.assertLess(c.collection[os.path.basename(filename)].size(), os.path.getsize(filename))
- # os.unlink(filename)
-
- # def test_write_directory_twice(self):
- # data = 'b' * 1024 * 1024
- # tmpdir = tempfile.mkdtemp()
- # for size in [1, 5, 10, 70]:
- # with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
- # for _ in range(size):
- # f.write(data)
- # os.mkdir(os.path.join(tmpdir, 'subdir1'))
- # for size in [2, 4, 6]:
- # with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
- # for _ in range(size):
- # f.write(data)
- # c = arv_put.ArvPutUploader([tmpdir])
- # d = arv_put.ArvPutUploader([tmpdir])
- # print "ESCRIBIERON: c: %d, d: %d" % (c.bytes_written(), d.bytes_written())
- # shutil.rmtree(tmpdir)
- # self.assertEqual(0, d.bytes_written())
+ def bg_uploader(self, filename):
+ cache = arv_put.ArvPutCollectionCache([filename])
+ c = arv_put.ArvPutCollection(reporter=self.fake_reporter, cache=cache)
+ c.collection_flush_time = 0 # flush collection on every block flush, just for this test
+ c.write_file(filename, os.path.basename(filename))
+
+ def test_write_collection_with_name(self):
+ name = 'This is a collection'
+ c = arv_put.ArvPutCollection(name=name)
+ self.assertEqual(name, c.name())
+
+ def test_write_file_on_collection_without_save(self):
+ c = arv_put.ArvPutCollection(should_save=False)
+ with tempfile.NamedTemporaryFile(delete=False) as f:
+ f.write("The quick brown fox jumped over the lazy dog")
+ c.write_file(f.name, os.path.basename(f.name))
+ self.assertEqual(None, c.manifest_locator())
+ os.unlink(f.name)
+
+ def test_write_file_and_check_data_locators(self):
+ c = arv_put.ArvPutCollection(should_save=False)
+ with tempfile.NamedTemporaryFile(delete=False) as f:
+ # Writing ~90 MB, so that it writes 2 data blocks
+ for _ in range(2 * 1024 * 1024):
+ f.write("The quick brown fox jumped over the lazy dog\n")
+ c.write_file(f.name, os.path.basename(f.name))
+ self.assertEqual(2, len(c.data_locators()))
+ os.unlink(f.name)
+
+ def test_write_directory_and_check_data_locators(self):
+ data = 'b' * 1024 * 1024 # 1 MB
+ tmpdir = tempfile.mkdtemp()
+ for size in [1, 5, 10, 70]:
+ with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+ for _ in range(size):
+ f.write(data)
+ os.mkdir(os.path.join(tmpdir, 'subdir1'))
+ for size in [2, 4, 6]:
+ with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
+ for _ in range(size):
+ f.write(data)
+ c = arv_put.ArvPutCollection()
+ c.write_directory_tree(tmpdir)
+ shutil.rmtree(tmpdir)
+ self.assertEqual(8, len(c.data_locators()))
+
+ def test_resume_large_file_upload(self):
+ _, filename = tempfile.mkstemp()
+ md5_original = hashlib.md5()
+ md5_uploaded = hashlib.md5()
+ fileobj = open(filename, 'w')
+ for _ in range(70):
+ data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+ fileobj.write(data)
+ md5_original.update(data)
+ fileobj.close()
+ self.lock.acquire()
+ uploader = multiprocessing.Process(target=self.bg_uploader, args=(filename,))
+ uploader.start()
+ self.lock.acquire() # We can now proceed, because one block and collection flush()ed
+ self.lock.release()
+ uploader.terminate()
+ uploader.join()
+ cache = arv_put.ArvPutCollectionCache([filename])
+ c = arv_put.ArvPutCollection(cache=cache)
+ self.assertLess(c.collection[os.path.basename(filename)].size(), os.path.getsize(filename))
+ c.write_file(filename, os.path.basename(filename))
+ uploaded = c.collection.open(os.path.basename(filename), 'r')
+ while True:
+ data = uploaded.read(1024*1024)
+ if not data:
+ break
+ md5_uploaded.update(data)
+ os.unlink(filename)
+ cache.destroy()
+ self.assertEqual(md5_original.hexdigest(), md5_uploaded.hexdigest())
+
+ def test_write_directory_twice(self):
+ data = 'b' * 1024 * 1024
+ tmpdir = tempfile.mkdtemp()
+ for size in [1, 5, 10, 70]:
+ with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+ for _ in range(size):
+ f.write(data)
+ os.mkdir(os.path.join(tmpdir, 'subdir1'))
+ for size in [2, 4, 6]:
+ with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
+ for _ in range(size):
+ f.write(data)
+ c_cache = arv_put.ArvPutCollectionCache([tmpdir])
+ c = arv_put.ArvPutCollection(cache=c_cache)
+ c.write_directory_tree(tmpdir)
+ c_cache.close()
+ d_cache = arv_put.ArvPutCollectionCache([tmpdir])
+ d = arv_put.ArvPutCollection(cache=d_cache)
+ d.write_directory_tree(tmpdir)
+ d_cache.close()
+ c_cache.destroy()
+ d_cache.destroy()
+ shutil.rmtree(tmpdir)
+ self.assertNotEqual(c.bytes_written, d.bytes_written)
+ # self.assertGreater(c.bytes_written, 0)
+ # self.assertEqual(d.bytes_written, 0)
class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,