From dcb4db28681b6949a56a1de579891cb375c423fe Mon Sep 17 00:00:00 2001
From: Lucas Di Pentima
Date: Thu, 7 Jul 2016 00:20:58 -0300
Subject: [PATCH] 9463: Polish the last details to make the integration tests
 pass

---
 sdk/python/arvados/commands/put.py | 329 ++++++++++++++++++++++++-----
 sdk/python/tests/test_arv_put.py   |  63 ++++--
 2 files changed, 318 insertions(+), 74 deletions(-)

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 6e216ee374..6bb1a0b217 100755
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -310,11 +310,10 @@ class ArvPutCollectionCache(object):
 
     def _save(self):
         """
-        Atomically save (create temp file & rename() it)
+        Atomically save
         """
         # TODO: It would be a good idea to avoid _save() spamming when writing
         # lots of small files.
-        print "SAVE START"
         try:
             new_cache_fd, new_cache_name = tempfile.mkstemp(
                 dir=os.path.dirname(self.filename))
@@ -323,23 +322,24 @@ class ArvPutCollectionCache(object):
             json.dump(self.data, new_cache)
             os.rename(new_cache_name, self.filename)
         except (IOError, OSError, ResumeCacheConflict) as error:
-            print "SAVE ERROR: %s" % error
             try:
                 os.unlink(new_cache_name)
             except NameError:  # mkstemp failed.
                 pass
         else:
-            print "SAVE DONE!! %s" % self.filename
             self.cache_file.close()
             self.cache_file = new_cache
 
     def file_uploaded(self, path):
+        print "About to register an uploaded file: %s" % path
         if path in self.files.keys():
             self.data['uploaded'][path] = self.files[path]
             self._save()
+            print "Uploaded file registered!"
 
-    def set_collection(self, uuid):
-        self.data['col_locator'] = uuid
+    def set_collection(self, loc):
+        self.data['col_locator'] = loc
+        self._save()
 
     def collection(self):
         return self.data['col_locator']
@@ -406,27 +406,41 @@ class ArvPutCollectionCache(object):
         self.cache_file.close()
 
     def destroy(self):
-        # try:
-        #     os.unlink(self.filename)
-        # except OSError as error:
-        #     if error.errno != errno.ENOENT:  # That's what we wanted anyway.
-        #         raise
+        try:
+            os.unlink(self.filename)
+        except OSError as error:
+            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
+                raise
         self.close()
 
 
 class ArvPutUploader(object):
-    def __init__(self, paths):
+    def __init__(self, paths, reporter=None):
+        expected_bytes = expected_bytes_for(paths)
         self.cache = ArvPutCollectionCache(paths)
-        if self.cache.collection() is not None:
-            self.collection = ArvPutCollection(locator=self.cache.collection(), cache=self.cache)
-        else:
-            self.collection = ArvPutCollection(cache=self.cache)
-            self.cache.set_collection(self.collection.manifest_locator())
-        for p in paths:
-            if os.path.isdir(p):
-                self.collection.write_directory_tree(p)
-            elif os.path.isfile(p):
-                self.collection.write_file(p)
-        self.cache.destroy()
+        self.paths = paths
+        self.already_uploaded = False
+        # if self.cache.collection() is not None:
+        #     self.collection = ArvPutCollection(
+        #         locator=self.cache.collection(),
+        #         cache=self.cache,
+        #         reporter=reporter,
+        #         bytes_expected=expected_bytes)
+        # else:
+        self.collection = ArvPutCollection(
+            cache=self.cache,
+            reporter=reporter,
+            bytes_expected=expected_bytes)
+        #     self.cache.set_collection(self.collection.manifest_locator())
+
+    def do_upload(self):
+        if not self.already_uploaded:
+            for p in self.paths:
+                if os.path.isdir(p):
+                    self.collection.write_directory_tree(p)
+                elif os.path.isfile(p):
+                    self.collection.write_file(p, os.path.basename(p))
+            self.cache.destroy()
+            self.already_uploaded = True
 
     def manifest(self):
         return self.collection.manifest()
@@ -436,24 +450,69 @@ class ArvPutCollection(object):
 
 
 class ArvPutCollection(object):
-    def __init__(self, locator=None, cache=None, reporter=None,
-                 bytes_expected=None, **kwargs):
-        self.collection_flush_time = 60
+    def __init__(self, cache=None, reporter=None, bytes_expected=None,
+                 name=None, owner_uuid=None, ensure_unique_name=False,
+                 num_retries=None, replication=None):
+        self.collection_flush_time = 60  # Seconds
         self.bytes_written = 0
-        self._seen_inputs = []
         self.cache = cache
         self.reporter = reporter
+        self.num_retries = num_retries
         self.bytes_expected = bytes_expected
+        locator = self.cache.collection() if self.cache else None
+
         if locator is None:
             self.collection = arvados.collection.Collection()
-            self.collection.save_new()
+            self.collection.save_new(name=name, owner_uuid=owner_uuid,
+                                     ensure_unique_name=ensure_unique_name,
+                                     num_retries=num_retries)
+            if self.cache:
+                self.cache.set_collection(self.collection.manifest_locator())
         else:
             self.collection = arvados.collection.Collection(locator)
 
+    def save(self):
+        self.collection.save(num_retries=self.num_retries)
+
     def manifest_locator(self):
         return self.collection.manifest_locator()
 
+    def portable_data_hash(self):
+        return self.collection.portable_data_hash()
+
+    def manifest_text(self, stream_name=".", strip=False, normalize=False):
+        return self.collection.manifest_text(stream_name, strip, normalize)
+
+    def _write(self, source_fd, output, first_block=True):
+        start_time = time.time()
+        while True:
+            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
+            if not data:
+                break
+            output.write(data)
+            output.flush()  # Commit block to Keep
+            self.bytes_written += len(data)
+            # Is it time to update the collection?
+            if (time.time() - start_time) > self.collection_flush_time:
+                self.collection.save(num_retries=self.num_retries)
+                start_time = time.time()
+            # Once the first block of each file is written, mark the file as
+            # uploaded on the cache
+            if first_block:
+                if self.cache:
+                    self.cache.file_uploaded(source_fd.name)
+                    self.collection.save(num_retries=self.num_retries)
+                    print "FLUSHED COLLECTION!!!"
+                first_block = False
+            self.report_progress()
+
+    def write_stdin(self, filename):
+        with self.collection as c:
+            output = c.open(filename, 'w')
+            self._write(sys.stdin, output)
+            output.close()
+        self.collection.save()
+
     def write_file(self, source, filename):
         if self.cache and source in self.cache.dirty_files():
             print "DIRTY: Removing file %s from collection to be uploaded again" % source
 
         resume_offset = 0
         resume_upload = False
-
-        print "FIND file %s" % filename
-        if self.collection.find(filename):
+        try:
+            print "FIND file %s" % filename
+            collection_file = self.collection.find(filename)
+        except IOError:
+            # Not found
+            collection_file = None
+
+        if collection_file:
             print "File %s already in the collection, checking!" % source
-            if os.path.getsize(source) == self.collection.find(filename).size():
+            if os.path.getsize(source) == collection_file.size():
                 print "WARNING: file %s already uploaded, skipping!" % source
                 # File already there, skip it.
+                self.bytes_written += os.path.getsize(source)
                 return
-            elif os.path.getsize(source) > self.collection.find(filename).size():
+            elif os.path.getsize(source) > collection_file.size():
                 print "WARNING: RESUMING file %s" % source
                 # File partially uploaded, resume!
                 resume_upload = True
-                resume_offset = self.collection.find(filename).size()
+                resume_offset = collection_file.size()
+                self.bytes_written += resume_offset
             else:
                 # Source file smaller than uploaded file, what happened here?
                 # TODO: Raise exception of some kind?
-                pass
+                return
 
         with open(source, 'r') as source_fd:
             with self.collection as c:
@@ -490,29 +556,12 @@ class ArvPutCollection(object):
                 print "Writing file, source: %s, filename: %s" % (source, filename)
                 output = c.open(filename, 'w')
                 first_block = True
-
-                start_time = time.time()
-                while True:
-                    data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
-                    if not data:
-                        break
-                    output.write(data)
-                    output.flush()  # Commit block to Keep
-                    self.bytes_written += len(data)
-                    # Is it time to update the collection?
-                    if (time.time() - start_time) > self.collection_flush_time:
-                        self.collection.save()
-                        start_time = time.time()
-                    # Once a block is written on each file, mark it as uploaded on the cache
-                    if first_block:
-                        if self.cache:
-                            self.cache.file_uploaded(source)
-                        first_block = False
-                # File write finished
+
+                self._write(source_fd, output, first_block)
                 output.close()
         self.collection.save()  # One last save...
 
-    def write_directory_tree(self, path, stream_name='.', max_manifest_depth=-1):
+    def write_directory_tree(self, path, stream_name='.'):
         if os.path.isdir(path):
             for item in os.listdir(path):
                 print "Checking path: '%s' - stream_name: '%s'" % (path, stream_name)
@@ -655,6 +704,172 @@ def desired_project_uuid(api_client, project_uuid, num_retries):
         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
     return query.execute(num_retries=num_retries)['uuid']
 
+def main_new(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
+    global api_client
+
+    args = parse_arguments(arguments)
+    status = 0
+    if api_client is None:
+        api_client = arvados.api('v1')
+
+    # Determine the name to use
+    if args.name:
+        if args.stream or args.raw:
+            print >>stderr, "Cannot use --name with --stream or --raw"
+            sys.exit(1)
+        collection_name = args.name
+    else:
+        collection_name = "Saved at {} by {}@{}".format(
+            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
+            pwd.getpwuid(os.getuid()).pw_name,
+            socket.gethostname())
+
+    if args.project_uuid and (args.stream or args.raw):
+        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
+        sys.exit(1)
+
+    # Determine the parent project
+    try:
+        project_uuid = desired_project_uuid(api_client, args.project_uuid,
+                                            args.retries)
+    except (apiclient_errors.Error, ValueError) as error:
+        print >>stderr, error
+        sys.exit(1)
+
+    # write_copies diverges from args.replication here.
+    # args.replication is how many copies we will instruct Arvados to
+    # maintain (by passing it in collections().create()) after all
+    # data is written -- and if None was given, we'll use None there.
+    # Meanwhile, write_copies is how many copies of each data block we
+    # write to Keep, which has to be a number.
+    #
+    # If we simply changed args.replication from None to a default
+    # here, we'd end up erroneously passing the default replication
+    # level (instead of None) to collections().create().
+    write_copies = (args.replication or
+                    api_client._rootDesc.get('defaultCollectionReplication', 2))
+
+    if args.progress:
+        reporter = progress_writer(human_progress)
+    elif args.batch_progress:
+        reporter = progress_writer(machine_progress)
+    else:
+        reporter = None
+    bytes_expected = expected_bytes_for(args.paths)
+
+    resume_cache = None
+    if args.resume:
+        try:
+            resume_cache = ArvPutCollectionCache(args.paths)
+        except (IOError, OSError, ValueError):
+            pass  # Couldn't open cache directory/file. Continue without it.
+        except ResumeCacheConflict:
+            print >>stderr, "\n".join([
+                "arv-put: Another process is already uploading this data.",
+                "         Use --no-resume if this is really what you want."])
+            sys.exit(1)
+
+    if args.stream or args.raw:
+        writer = ArvPutCollection(cache=resume_cache,
+                                  reporter=reporter,
+                                  bytes_expected=bytes_expected,
+                                  num_retries=args.retries,
+                                  replication=write_copies)
+    else:
+        writer = ArvPutCollection(cache=resume_cache,
+                                  reporter=reporter,
+                                  bytes_expected=bytes_expected,
+                                  num_retries=args.retries,
+                                  replication=write_copies,
+                                  name=collection_name,
+                                  owner_uuid=project_uuid,
+                                  ensure_unique_name=True)
+
+    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
+    # the originals.
+    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
+                            for sigcode in CAUGHT_SIGNALS}
+
+    if writer.bytes_written > 0:  # We're resuming a previous upload. TODO
+        print >>stderr, "\n".join([
+            "arv-put: Resuming previous upload from last checkpoint.",
+            "         Use the --no-resume option to start over."])
+
+    writer.report_progress()
+    for path in args.paths:  # Copy file data to Keep.
+        if path == '-':
+            writer.write_stdin(args.filename)
+        elif os.path.isdir(path):
+            # TODO: Check what happens with multiple directory arguments;
+            # maybe pass os.path.join('.', os.path.basename(path)) as the
+            # stream name.
+            writer.write_directory_tree(path)
+        else:
+            writer.write_file(path, args.filename or os.path.basename(path))
+
+    if args.progress:  # Print newline to split stderr from stdout for humans.
+        print >>stderr
+
+    output = None
+    if args.stream:
+        if args.normalize:
+            output = writer.manifest_text(normalize=True)
+        else:
+            output = writer.manifest_text()
+    elif args.raw:
+        output = ','.join(writer.data_locators())  # TODO
+    else:
+        try:
+            # manifest_text = writer.manifest_text()
+            # if args.normalize:
+            #     manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
+            # replication_attr = 'replication_desired'
+            # if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
+            #     # API called it 'redundancy' before #3410.
+            #     replication_attr = 'redundancy'
+            # # Register the resulting collection in Arvados.
+            # collection = api_client.collections().create(
+            #     body={
+            #         'owner_uuid': project_uuid,
+            #         'name': collection_name,
+            #         'manifest_text': manifest_text,
+            #         replication_attr: args.replication,
+            #     },
+            #     ensure_unique_name=True
+            # ).execute(num_retries=args.retries)
+            #
+            # print >>stderr, "Collection saved as '%s'" % collection['name']
+            #
+            writer.save()
+            if args.portable_data_hash:
+                output = writer.portable_data_hash()
+            else:
+                output = writer.manifest_locator()
+            # TODO: Debugging leftover -- remove before merging.
+            with open('/tmp/lucas.txt', 'w') as f:
+                f.write(output)
+
+        except apiclient_errors.Error as error:
+            print >>stderr, (
+                "arv-put: Error creating Collection on project: {}.".format(
+                    error))
+            status = 1
+
+    # Print the locator (uuid) of the new collection.
+    if output is None:
+        status = status or 1
+    else:
+        stdout.write(output)
+        if not output.endswith('\n'):
+            stdout.write('\n')
+
+    for sigcode, orig_handler in orig_signal_handlers.items():
+        signal.signal(sigcode, orig_handler)
+
+    if status != 0:
+        sys.exit(status)
+
+    if resume_cache is not None:
+        resume_cache.destroy()
+
+    return output
+
 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
 
@@ -825,4 +1040,4 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     return output
 
 if __name__ == '__main__':
-    main()
+    main_new()
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index c4ce823caa..09900750a1 100755
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -8,6 +8,7 @@ import pwd
 import re
 import shutil
 import subprocess
+import multiprocessing
 import sys
 import tempfile
 import time
@@ -267,23 +268,51 @@ class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers):
     #     shutil.rmtree(tmpdir)
     #     self.assertEqual(True, c.manifest())
 
-    def test_write_directory_twice(self):
-        data = 'b' * 1024 * 1024
-        tmpdir = tempfile.mkdtemp()
-        for size in [1, 5, 10, 70]:
-            with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
-                for _ in range(size):
-                    f.write(data)
-        os.mkdir(os.path.join(tmpdir, 'subdir1'))
-        for size in [2, 4, 6]:
-            with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
-                for _ in range(size):
-                    f.write(data)
-        c = arv_put.ArvPutUploader([tmpdir])
-        d = arv_put.ArvPutUploader([tmpdir])
-        print "ESCRIDIERON: c: %d, d: %d" % (c.bytes_written(), d.bytes_written())
-        shutil.rmtree(tmpdir)
-        self.assertEqual(0, d.bytes_written())
+    def fake_reporter(self, written, expected):
+        # Use this callback as an intra-block pause to simulate an interruption
+        print "Written %d / %d bytes" % (written, expected)
+        time.sleep(10)
+
+    def bg_uploader(self, paths):
+        return arv_put.ArvPutUploader(paths, reporter=self.fake_reporter)
+
+    # def test_resume_large_file_upload(self):
+    #     import multiprocessing
+    #     data = 'x' * 1024 * 1024  # 1 MB
+    #     _, filename = tempfile.mkstemp()
+    #     fileobj = open(filename, 'w')
+    #     for _ in range(200):
+    #         fileobj.write(data)
+    #     fileobj.close()
+    #     uploader = multiprocessing.Process(target=self.bg_uploader, args=([filename],))
+    #     uploader.start()
+    #     time.sleep(5)
+    #     uploader.terminate()
+    #     time.sleep(1)
+    #     # cache = arv_put.ArvPutCollectionCache([filename])
+    #     # print "Collection detected: %s" % cache.collection()
+    #     # c = arv_put.ArvPutCollection(locator=cache.collection(), cache=cache)
+    #     # print "UPLOADED: %d" % c.collection[os.path.basename(filename)].size()
+    #     # self.assertLess(c.collection[os.path.basename(filename)].size(), os.path.getsize(filename))
+    #     os.unlink(filename)
+
+    # def test_write_directory_twice(self):
+    #     data = 'b' * 1024 * 1024
+    #     tmpdir = tempfile.mkdtemp()
+    #     for size in [1, 5, 10, 70]:
+    #         with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+    #             for _ in range(size):
+    #                 f.write(data)
+    #     os.mkdir(os.path.join(tmpdir, 'subdir1'))
+    #     for size in [2, 4, 6]:
+    #         with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
+    #             for _ in range(size):
+    #                 f.write(data)
+    #     c = arv_put.ArvPutUploader([tmpdir])
+    #     d = arv_put.ArvPutUploader([tmpdir])
+    #     print "WROTE: c: %d, d: %d" % (c.bytes_written(), d.bytes_written())
+    #     shutil.rmtree(tmpdir)
+    #     self.assertEqual(0, d.bytes_written())
 
 class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
-- 
2.30.2
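Note: the resume machinery in this patch combines two idioms that are easy to get subtly wrong: persisting upload state atomically (tempfile.mkstemp() plus os.rename(), as in ArvPutCollectionCache._save()) and restarting a partially written file from a saved byte offset (as in ArvPutCollection.write_file()). The sketch below is a minimal, self-contained illustration of both, assuming POSIX rename semantics. UploadState and resumable_copy are hypothetical names for this example only, not part of the arv-put API, and it checkpoints after every block instead of on a timer to keep the example short.

    import json
    import os
    import tempfile


    class UploadState(object):
        """Track how many bytes of each source file have been committed."""

        def __init__(self, path):
            self.path = path
            try:
                with open(path) as f:
                    self.data = json.load(f)
            except (IOError, OSError, ValueError):
                # No cache yet, or it is unreadable/corrupt: start fresh.
                self.data = {'uploaded': {}}

        def save(self):
            # Write to a temp file in the same directory, then rename() it
            # over the old state file. On POSIX the rename is atomic, so a
            # crash can never leave a half-written cache behind.
            fd, tmp_name = tempfile.mkstemp(dir=os.path.dirname(self.path) or '.')
            try:
                with os.fdopen(fd, 'w') as tmp:
                    json.dump(self.data, tmp)
                os.rename(tmp_name, self.path)
            except (IOError, OSError):
                os.unlink(tmp_name)
                raise


    def resumable_copy(source, dest, state, block_size=64 * 1024):
        """Copy source to dest, checkpointing progress after every block."""
        offset = state.data['uploaded'].get(source, 0)
        with open(source, 'rb') as src, open(dest, 'ab') as out:
            src.seek(offset)      # Skip bytes an earlier run already wrote.
            out.truncate(offset)  # Drop any trailing partial block in dest.
            while True:
                block = src.read(block_size)
                if not block:
                    break
                out.write(block)
                out.flush()
                offset += len(block)
                state.data['uploaded'][source] = offset
                state.save()      # Checkpoint: safe to interrupt after this.

Killing the process mid-transfer and calling resumable_copy() again picks up from the last saved offset instead of rewriting the whole file, which is the behavior the resumed-upload integration tests on this branch exercise.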