From 3847357436978a97470e0769f3200d354a2bd08e Mon Sep 17 00:00:00 2001 From: Lucas Di Pentima Date: Tue, 5 Jul 2016 18:53:37 -0300 Subject: [PATCH] 9463: Lots of progress today, resume upload code written, many tests to do! --- sdk/python/arvados/commands/put.py | 151 +++++++++++++++++++++-------- sdk/python/tests/test_arv_put.py | 51 +++++++--- 2 files changed, 145 insertions(+), 57 deletions(-) diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py index dffa576957..6e216ee374 100755 --- a/sdk/python/arvados/commands/put.py +++ b/sdk/python/arvados/commands/put.py @@ -278,8 +278,6 @@ class ResumeCache(object): class ArvPutCollectionCache(object): - CACHE_DIR = '.cache/arvados/arv-put' - def __init__(self, paths): md5 = hashlib.md5() md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost')) @@ -288,8 +286,12 @@ class ArvPutCollectionCache(object): for path in realpaths: self._get_file_data(path) # Only hash args paths - self.cache_hash = md5.update('\0'.join(realpaths)) - self.cache_file = open(os.path.join(self.CACHE_DIR, self.cache_hash), 'a+') + md5.update('\0'.join(realpaths)) + self.cache_hash = md5.hexdigest() + + self.cache_file = open(os.path.join( + arv_cmd.make_home_conf_dir('.cache/arvados/arv-put', 0o700, 'raise'), + self.cache_hash), 'a+') self._lock_file(self.cache_file) self.filename = self.cache_file.name self.data = self._load() @@ -301,25 +303,56 @@ class ArvPutCollectionCache(object): except ValueError: # File empty, set up new cache ret = { - 'col_uuid' : None, # Collection UUID - 'uploaded' : [], # Uploaded file - 'files': {}, # Complete cached file (path, mtime, size) list + 'col_locator' : None, # Collection + 'uploaded' : {}, # Uploaded file list: {path : {size, mtime}} } return ret + def _save(self): + """ + Atomically save (create temp file & rename() it) + """ + # TODO: Should be a good idea to avoid _save() spamming? when writing + # lots of small files. + print "SAVE START" + try: + new_cache_fd, new_cache_name = tempfile.mkstemp( + dir=os.path.dirname(self.filename)) + self._lock_file(new_cache_fd) + new_cache = os.fdopen(new_cache_fd, 'r+') + json.dump(self.data, new_cache) + os.rename(new_cache_name, self.filename) + except (IOError, OSError, ResumeCacheConflict) as error: + print "SAVE ERROR: %s" % error + try: + os.unlink(new_cache_name) + except NameError: # mkstemp failed. + pass + else: + print "SAVE DONE!! %s" % self.filename + self.cache_file.close() + self.cache_file = new_cache + + def file_uploaded(self, path): + if path in self.files.keys(): + self.data['uploaded'][path] = self.files[path] + self._save() + def set_collection(self, uuid): - self.data['col_uuid'] = uuid + self.data['col_locator'] = uuid def collection(self): - return self.data['col_uuid'] + return self.data['col_locator'] def is_dirty(self, path): - if (self.files[path]['mtime'] != self.data['files'][path]['mtime']) or - (self.files[path]['size'] != self.data['files'][path]['size']): + if not path in self.data['uploaded'].keys(): + # Cannot be dirty is it wasn't even uploaded + return False + + if (self.files[path]['mtime'] != self.data['uploaded'][path]['mtime']) or (self.files[path]['size'] != self.data['uploaded'][path]['size']): return True else: return False - def dirty_files(self): """ @@ -327,7 +360,7 @@ class ArvPutCollectionCache(object): upload runs. These files should be re-uploaded. """ dirty = [] - for f in self.data['uploaded']: + for f in self.data['uploaded'].keys(): if self.is_dirty(f): dirty.append(f) return dirty @@ -338,14 +371,14 @@ class ArvPutCollectionCache(object): upload runs. These files should be checked for partial uploads """ uploaded = [] - for f in self.data['uploaded']: + for f in self.data['uploaded'].keys(): if not self.is_dirty(f): uploaded.append(f) return uploaded def pending_files(self): """ - Files that should be uploaded, because they're dirty or thet + Files that should be uploaded, because of being dirty or that never had the chance to be uploaded yet. """ pending = [] @@ -369,6 +402,16 @@ class ArvPutCollectionCache(object): except IOError: raise ResumeCacheConflict("{} locked".format(fileobj.name)) + def close(self): + self.cache_file.close() + + def destroy(self): + # try: + # os.unlink(self.filename) + # except OSError as error: + # if error.errno != errno.ENOENT: # That's what we wanted anyway. + # raise + self.close() class ArvPutUploader(object): def __init__(self, paths): @@ -377,26 +420,23 @@ class ArvPutUploader(object): self.collection = ArvPutCollection(locator=self.cache.collection(), cache=self.cache) else: self.collection = ArvPutCollection(cache=self.cache) - self.cache.set_collection(self.collection.locator()) - - # # Remove uploaded 'dirty' files from collection - # for f in self.cache.dirty_files(): - # self.collection.remove_file(f) - # - # # Upload pending files - # for f in self.cache.pending_files(): - # self.collection.write_file(f, os.path.join('.', f)) - # self.cache.file_uploaded(f) #TODO - + self.cache.set_collection(self.collection.manifest_locator()) for p in paths: if os.path.isdir(p): self.collection.write_directory_tree(p) elif os.path.isfile(p): self.collection.write_file(p) + self.cache.destroy() + + def manifest(self): + return self.collection.manifest() + + def bytes_written(self): + return self.collection.bytes_written class ArvPutCollection(object): - def __init__(self, locator = None, cache=None, reporter=None, + def __init__(self, locator=None, cache=None, reporter=None, bytes_expected=None, **kwargs): self.collection_flush_time = 60 self.bytes_written = 0 @@ -410,23 +450,47 @@ class ArvPutCollection(object): self.collection.save_new() else: self.collection = arvados.collection.Collection(locator) + + def manifest_locator(self): + return self.collection.manifest_locator() def write_file(self, source, filename): - if source in self.cache.dirty_files(): - print "DIRTY: Removing file %s to be uploaded again" % source + if self.cache and source in self.cache.dirty_files(): + print "DIRTY: Removing file %s from collection to be uploaded again" % source self.collection.remove(filename) - elif source in self.cache.uploaded_files(): - # TODO: Check for partial uploads - pass - # if not source in self.cache.pending_files(): - # print "WARNING: file %s already uploaded, skipping!" % source - # return - - print "Writing file, source: %s, filename: %s" % (source, filename) - with self.collection as c: - with open(source, 'r') as source_fd: - output = c.open(filename, 'w') + resume_offset = 0 + resume_upload = False + + print "FIND file %s" % filename + if self.collection.find(filename): + print "File %s already in the collection, checking!" % source + if os.path.getsize(source) == self.collection.find(filename).size(): + print "WARNING: file %s already uploaded, skipping!" % source + # File already there, skip it. + return + elif os.path.getsize(source) > self.collection.find(filename).size(): + print "WARNING: RESUMING file %s" % source + # File partially uploaded, resume! + resume_upload = True + resume_offset = self.collection.find(filename).size() + else: + # Source file smaller than uploaded file, what happened here? + # TODO: Raise exception of some kind? + pass + + with open(source, 'r') as source_fd: + with self.collection as c: + if resume_upload: + print "Resuming file, source: %s, filename: %s" % (source, filename) + output = c.open(filename, 'a') + source_fd.seek(resume_offset) + first_block = False + else: + print "Writing file, source: %s, filename: %s" % (source, filename) + output = c.open(filename, 'w') + first_block = True + start_time = time.time() while True: data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE) @@ -439,6 +503,11 @@ class ArvPutCollection(object): if (time.time() - start_time) > self.collection_flush_time: self.collection.save() start_time = time.time() + # Once a block is written on each file, mark it as uploaded on the cache + if first_block: + if self.cache: + self.cache.file_uploaded(source) + first_block = False # File write finished output.close() self.collection.save() # One last save... @@ -456,7 +525,7 @@ class ArvPutCollection(object): def manifest(self): print "BLOCK SIZE: %d" % arvados.config.KEEP_BLOCK_SIZE - print "MANIFEST Locator:\n%s\nMANIFEST TEXT:\n%s" % (self.collection.manifest_locator(), self.collection.manifest_text()) + print "MANIFEST Locator:\n%s\nMANIFEST TEXT:\n%s" % (self.manifest_locator(), self.collection.manifest_text()) return True def report_progress(self): diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py index 54a70bfa39..c4ce823caa 100755 --- a/sdk/python/tests/test_arv_put.py +++ b/sdk/python/tests/test_arv_put.py @@ -237,21 +237,37 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase): class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers): MAIN_SERVER = {} KEEP_SERVER = {} + import shutil - def test_write_files(self): - c = arv_put.ArvPutCollection() - data = 'a' * 1024 * 1024 # 1 MB - tmpdir = tempfile.mkdtemp() - for size in [1, 10, 64, 128]: - with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f: - for _ in range(size): - f.write(data) - c.write_file(f.name, os.path.basename(f.name)) - os.unlink(f.name) - self.assertEqual(True, c.manifest()) - - def test_write_directory(self): - c = arv_put.ArvPutCollection() + # def test_write_files(self): + # c = arv_put.ArvPutCollection() + # data = 'a' * 1024 * 1024 # 1 MB + # tmpdir = tempfile.mkdtemp() + # for size in [1, 10, 64, 128]: + # with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f: + # for _ in range(size): + # f.write(data) + # c.write_file(f.name, os.path.basename(f.name)) + # shutil.rmtree(tmpdir) + # self.assertEqual(True, c.manifest()) + # + # def test_write_directory(self): + # data = 'b' * 1024 * 1024 + # tmpdir = tempfile.mkdtemp() + # for size in [1, 5, 10, 70]: + # with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f: + # for _ in range(size): + # f.write(data) + # os.mkdir(os.path.join(tmpdir, 'subdir1')) + # for size in [2, 4, 6]: + # with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f: + # for _ in range(size): + # f.write(data) + # c = arv_put.ArvPutUploader([tmpdir]) + # shutil.rmtree(tmpdir) + # self.assertEqual(True, c.manifest()) + + def test_write_directory_twice(self): data = 'b' * 1024 * 1024 tmpdir = tempfile.mkdtemp() for size in [1, 5, 10, 70]: @@ -263,8 +279,11 @@ class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers): with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f: for _ in range(size): f.write(data) - c.write_directory_tree(tmpdir, os.path.join('.', os.path.basename(tmpdir))) - self.assertEqual(True, c.manifest()) + c = arv_put.ArvPutUploader([tmpdir]) + d = arv_put.ArvPutUploader([tmpdir]) + print "ESCRIDIERON: c: %d, d: %d" % (c.bytes_written(), d.bytes_written()) + shutil.rmtree(tmpdir) + self.assertEqual(0, d.bytes_written()) class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers, -- 2.30.2