9463: Finished first draft of the arv-put command's use of the Collection class. Also, partial...
author Lucas Di Pentima <lucas@curoverse.com>
Fri, 8 Jul 2016 22:23:54 +0000 (19:23 -0300)
committer Lucas Di Pentima <lucas@curoverse.com>
Fri, 8 Jul 2016 22:23:54 +0000 (19:23 -0300)
sdk/python/arvados/commands/put.py
sdk/python/tests/test_arv_put.py

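For orientation, here is a minimal, illustrative sketch of how the classes touched below are meant to be driven, following main_new() in this diff. It is not part of the commit; the example path and the reporter callback are hypothetical.

    import os
    import arvados.commands.put as arv_put

    paths = ['/tmp/example-dir']      # hypothetical input path
    def reporter(written, expected):  # progress callback, same signature used by report_progress()
        print "%d / %d bytes" % (written, expected)

    bytes_expected = arv_put.expected_bytes_for(paths)
    cache = arv_put.ArvPutCollectionCache(paths)   # on-disk, locked resume state
    writer = arv_put.ArvPutCollection(cache=cache,
                                      reporter=reporter,
                                      bytes_expected=bytes_expected)
    if cache.bytes_written > 0:
        print "Resuming previous upload from last checkpoint."
    for path in paths:
        if os.path.isdir(path):
            writer.write_directory_tree(path)
        else:
            writer.write_file(path, os.path.basename(path))
    writer.save()
    print "Saved collection %s" % writer.manifest_locator()
    cache.destroy()                   # finished: drop the resume cache

If the process is interrupted instead, the cache and the partially saved collection let a later write_file() call skip or resume already uploaded data, which is what the changes below implement.
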
index 6bb1a0b217776b8b63b5d4792b7c8d279046d24d..b3e1c5f181b58f5923839ccc0ff72d442a724ad4 100755 (executable)
@@ -283,6 +283,7 @@ class ArvPutCollectionCache(object):
         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
         realpaths = sorted(os.path.realpath(path) for path in paths)
         self.files = {}
+        self.bytes_written = 0 # Approximate number of bytes already uploaded (partially uploaded files are counted in full)
         for path in realpaths:
             self._get_file_data(path)
         # Only hash args paths
@@ -295,6 +296,8 @@ class ArvPutCollectionCache(object):
         self._lock_file(self.cache_file)
         self.filename = self.cache_file.name
         self.data = self._load()
+        for f in self.data['uploaded'].values():
+            self.bytes_written += f['size']
     
     def _load(self):
         try:
@@ -312,14 +315,14 @@ class ArvPutCollectionCache(object):
         """
         Atomically save
         """
-        # TODO: Should be a good idea to avoid _save() spamming? when writing 
-        # lots of small files.
         try:
             new_cache_fd, new_cache_name = tempfile.mkstemp(
                 dir=os.path.dirname(self.filename))
             self._lock_file(new_cache_fd)
             new_cache = os.fdopen(new_cache_fd, 'r+')
             json.dump(self.data, new_cache)
+            new_cache.flush()
+            os.fsync(new_cache)
             os.rename(new_cache_name, self.filename)
         except (IOError, OSError, ResumeCacheConflict) as error:
             try:
@@ -331,11 +334,9 @@ class ArvPutCollectionCache(object):
             self.cache_file = new_cache
     
     def file_uploaded(self, path):
-        print "About to register an uploaded file: %s" % path
         if path in self.files.keys():
             self.data['uploaded'][path] = self.files[path]
             self._save()
-            print "Already registered the uploaded file!"
     
     def set_collection(self, loc):
         self.data['col_locator'] = loc
@@ -413,73 +414,51 @@ class ArvPutCollectionCache(object):
                 raise
         self.close()
 
-class ArvPutUploader(object):
-    def __init__(self, paths, reporter=None):
-        expected_bytes = expected_bytes_for(paths)
-        self.cache = ArvPutCollectionCache(paths)
-        self.paths = paths
-        self.already_uploaded = False
-        # if self.cache.collection() is not None:
-        #     self.collection = ArvPutCollection(
-        #                         locator=self.cache.collection(),
-        #                         cache=self.cache,
-        #                         reporter=reporter,
-        #                         bytes_expected=expected_bytes)
-        # else:
-        self.collection = ArvPutCollection(
-                            cache=self.cache, 
-                            reporter=reporter,
-                            bytes_expected=expected_bytes)
-            # self.cache.set_collection(self.collection.manifest_locator())
-    
-    def do_upload(self):
-        if not self.already_uploaded:
-            for p in paths:
-                if os.path.isdir(p):
-                    self.collection.write_directory_tree(p)
-                elif os.path.isfile(p):
-                    self.collection.write_file(p, os.path.basename(p))
-            self.cache.destroy()
-            self.already_uploaded = True
-    
-    def manifest(self):
-        return self.collection.manifest()
-    
-    def bytes_written(self):
-        return self.collection.bytes_written
-
 
 class ArvPutCollection(object):
     def __init__(self, cache=None, reporter=None, bytes_expected=None, 
                 name=None, owner_uuid=None, ensure_unique_name=False, 
-                num_retries=None, replication=None):
+                num_retries=None, write_copies=None, replication=None,
+                should_save=True):
         self.collection_flush_time = 60 # Secs
         self.bytes_written = 0
+        self.bytes_skipped = 0
         self.cache = cache
         self.reporter = reporter
-        self.num_retries=num_retries
+        self.num_retries = num_retries
+        self.write_copies = write_copies
+        self.replication = replication
         self.bytes_expected = bytes_expected
+        self.should_save = should_save
         
         locator = self.cache.collection() if self.cache else None
         
         if locator is None:
-            self.collection = arvados.collection.Collection()
-            self.collection.save_new(name=name, owner_uuid=owner_uuid, 
-                                        ensure_unique_name=ensure_unique_name,
-                                        num_retries=num_retries)
-            if self.cache:
-                self.cache.set_collection(self.collection.manifest_locator())
+            self.collection = arvados.collection.Collection(
+                                        num_write_copies=self.write_copies)
+            if self.should_save:
+                self.collection.save_new(name=name, owner_uuid=owner_uuid, 
+                                            ensure_unique_name=ensure_unique_name,
+                                            num_retries=num_retries,
+                                            replication_desired=self.replication)
+                if self.cache:
+                    self.cache.set_collection(self.collection.manifest_locator())
         else:
-            self.collection = arvados.collection.Collection(locator)
+            self.collection = arvados.collection.Collection(locator,
+                                        num_write_copies=self.write_copies)
+    
+    def name(self):
+        return self.collection.api_response()['name'] if self.collection.api_response() else None
     
     def save(self):
-        self.collection.save(num_retries=self.num_retries)
+        if self.should_save:
+            self.collection.save(num_retries=self.num_retries)
     
     def manifest_locator(self):
         return self.collection.manifest_locator()
     
     def portable_data_hash(self):
-        return self.collectin.portable_data_hash()
+        return self.collection.portable_data_hash()
     
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
         return self.collection.manifest_text(stream_name, strip, normalize)
@@ -494,15 +473,14 @@ class ArvPutCollection(object):
             output.flush() # Commit block to Keep
             self.bytes_written += len(data)
             # Is it time to update the collection?
-            if (time.time() - start_time) > self.collection_flush_time:
+            if self.should_save and ((time.time() - start_time) > self.collection_flush_time):
                 self.collection.save(num_retries=self.num_retries)
                 start_time = time.time()
             # Once a block is written on each file, mark it as uploaded on the cache
-            if first_block:
+            if self.should_save and first_block:
                 if self.cache:
                     self.cache.file_uploaded(source_fd.name)
                     self.collection.save(num_retries=self.num_retries)
-                    print "FLUSHED COLLECTION!!!"
                 first_block = False
             self.report_progress()
     
@@ -511,35 +489,31 @@ class ArvPutCollection(object):
             output = c.open(filename, 'w')
             self._write(sys.stdin, output)
             output.close()
-            self.collection.save()
+            if self.should_save:
+                self.collection.save()
     
     def write_file(self, source, filename):
         if self.cache and source in self.cache.dirty_files():
-            print "DIRTY: Removing file %s from collection to be uploaded again" % source
             self.collection.remove(filename)
         
         resume_offset = 0
         resume_upload = False
         try:
-            print "FIND file %s" % filename
             collection_file = self.collection.find(filename)
         except IOError:
             # Not found
             collection_file = None
         
         if collection_file:
-            print "File %s already in the collection, checking!" % source
             if os.path.getsize(source) == collection_file.size():
-                print "WARNING: file %s already uploaded, skipping!" % source
                 # File already there, skip it.
-                self.bytes_written += os.path.getsize(source)
+                self.bytes_skipped += os.path.getsize(source)
                 return
             elif os.path.getsize(source) > collection_file.size():
-                print "WARNING: RESUMING file %s" % source
                 # File partially uploaded, resume!
                 resume_upload = True
                 resume_offset = collection_file.size()
-                self.bytes_written += resume_offset
+                self.bytes_skipped += resume_offset
             else:
                 # Source file smaller than uploaded file, what happened here?
                 # TODO: Raise exception of some kind?
@@ -548,38 +522,62 @@ class ArvPutCollection(object):
         with open(source, 'r') as source_fd:
             with self.collection as c:
                 if resume_upload:
-                    print "Resuming file, source: %s, filename: %s" % (source, filename)
                     output = c.open(filename, 'a')
                     source_fd.seek(resume_offset)
                     first_block = False
                 else:
-                    print "Writing file, source: %s, filename: %s" % (source, filename)
                     output = c.open(filename, 'w')
                     first_block = True
                 
                 self._write(source_fd, output, first_block)
                 output.close()
-                self.collection.save() # One last save...
+                if self.should_save:
+                    self.collection.save() # One last save...
 
     def write_directory_tree(self, path, stream_name='.'):
-        if os.path.isdir(path):
-            for item in os.listdir(path):
-                print "Checking path: '%s' - stream_name: '%s'" % (path, stream_name)
-                if os.path.isdir(os.path.join(path, item)):
-                    self.write_directory_tree(os.path.join(path, item), 
-                                    os.path.join(stream_name, item))
-                else:
-                    self.write_file(os.path.join(path, item), 
-                                    os.path.join(stream_name, item))
+        # TODO: Check what happens when multiple directories are passed as arguments.
+        # If the code below is uncommented, the integration test
+        # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest) fails,
+        # presumably because the manifest_uuid changes when the directory name is
+        # added to stream_name.
+        #
+        # if stream_name == '.':
+        #     stream_name = os.path.join('.', os.path.basename(path))
+        for item in os.listdir(path):
+            if os.path.isdir(os.path.join(path, item)):
+                self.write_directory_tree(os.path.join(path, item), 
+                                os.path.join(stream_name, item))
+            else:
+                self.write_file(os.path.join(path, item), 
+                                os.path.join(stream_name, item))
 
-    def manifest(self):
-        print "BLOCK SIZE: %d" % arvados.config.KEEP_BLOCK_SIZE
-        print "MANIFEST Locator:\n%s\nMANIFEST TEXT:\n%s" % (self.manifest_locator(), self.collection.manifest_text())
-        return True
-    
     def report_progress(self):
         if self.reporter is not None:
-            self.reporter(self.bytes_written, self.bytes_expected)
+            self.reporter(self.bytes_written + self.bytes_skipped, self.bytes_expected)
+    
+    def _datablocks_on_item(self, item):
+        """
+        Return a list of datablock locators, recursively navigating
+        through subcollections
+        """
+        if isinstance(item, arvados.arvfile.ArvadosFile):
+            locators = []
+            for segment in item.segments():
+                loc = segment.locator
+                if loc.startswith("bufferblock"):
+                    loc = self._bufferblocks[loc].calculate_locator()
+                locators.append(loc)
+            return locators
+        elif isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
+            l = [self._datablocks_on_item(x) for x in item.values()]
+            # Fast list flattener method taken from: 
+            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
+            return [loc for sublist in l for loc in sublist]
+        else:
+            return None
+    
+    def data_locators(self):
+        return self._datablocks_on_item(self.collection)
 
 
 class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
@@ -774,13 +772,16 @@ def main_new(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                         reporter=reporter,
                         bytes_expected=bytes_expected,
                         num_retries=args.retries,
-                        replication=write_copies)
+                        write_copies=write_copies,
+                        replication=args.replication,
+                        should_save=False)
     else:
         writer = ArvPutCollection(cache=resume_cache, 
                         reporter=reporter,
                         bytes_expected=bytes_expected,
                         num_retries=args.retries,
-                        replication=write_copies,
+                        write_copies=write_copies,
+                        replication=args.replication,
                         name=collection_name,
                         owner_uuid=project_uuid,
                         ensure_unique_name=True)
@@ -790,7 +791,7 @@ def main_new(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                             for sigcode in CAUGHT_SIGNALS}
 
-    if writer.bytes_written > 0:  # We're resuming a previous upload. TODO
+    if resume_cache and resume_cache.bytes_written > 0:
         print >>stderr, "\n".join([
                 "arv-put: Resuming previous upload from last checkpoint.",
                 "         Use the --no-resume option to start over."])
@@ -800,7 +801,7 @@ def main_new(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         if path == '-':
             writer.write_stdin(args.filename)
         elif os.path.isdir(path):
-            writer.write_directory_tree(path)#, os.path.join('.', os.path.basename(path))) TODO: Check what happens with multiple directories params
+            writer.write_directory_tree(path)
         else:
             writer.write_file(path, args.filename or os.path.basename(path))
 
@@ -814,36 +815,15 @@ def main_new(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         else:
             output = writer.manifest_text()
     elif args.raw:
-        output = ','.join(writer.data_locators()) # TODO
+        output = ','.join(writer.data_locators())
     else:
         try:
-    #         manifest_text = writer.manifest_text()
-    #         if args.normalize:
-    #             manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
-    #         replication_attr = 'replication_desired'
-    #         if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
-    #             # API called it 'redundancy' before #3410.
-    #             replication_attr = 'redundancy'
-    #         # Register the resulting collection in Arvados.
-    #         collection = api_client.collections().create(
-    #             body={
-    #                 'owner_uuid': project_uuid,
-    #                 'name': collection_name,
-    #                 'manifest_text': manifest_text,
-    #                 replication_attr: args.replication,
-    #                 },
-    #             ensure_unique_name=True
-    #             ).execute(num_retries=args.retries)
-    #
-    #         print >>stderr, "Collection saved as '%s'" % collection['name']
-    #
             writer.save()
+            print >>stderr, "Collection saved as '%s'" % writer.name()
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
                 output = writer.manifest_locator()
-            with open('/tmp/lucas.txt', 'w') as f:
-                f.write(output)
 
         except apiclient_errors.Error as error:
             print >>stderr, (
index 09900750a155337d9e751a5295f81c89d8aa752c..a373de9df20f0bb900eb7bb6d614dfc8b7c58eaf 100755 (executable)
@@ -8,12 +8,15 @@ import pwd
 import re
 import shutil
 import subprocess
-import multiprocessing
 import sys
 import tempfile
 import time
 import unittest
 import yaml
+import multiprocessing
+import shutil
+import hashlib
+import random
 
 from cStringIO import StringIO
 
@@ -238,81 +241,116 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
 class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
     KEEP_SERVER = {}
-    import shutil
-        
-    # def test_write_files(self):
-    #     c = arv_put.ArvPutCollection()
-    #     data = 'a' * 1024 * 1024 # 1 MB
-    #     tmpdir = tempfile.mkdtemp()
-    #     for size in [1, 10, 64, 128]:
-    #         with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
-    #             for _ in range(size):
-    #                 f.write(data)
-    #         c.write_file(f.name, os.path.basename(f.name))
-    #     shutil.rmtree(tmpdir)
-    #     self.assertEqual(True, c.manifest())
-    #
-    # def test_write_directory(self):
-    #     data = 'b' * 1024 * 1024
-    #     tmpdir = tempfile.mkdtemp()
-    #     for size in [1, 5, 10, 70]:
-    #         with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
-    #             for _ in range(size):
-    #                 f.write(data)
-    #     os.mkdir(os.path.join(tmpdir, 'subdir1'))
-    #     for size in [2, 4, 6]:
-    #         with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
-    #             for _ in range(size):
-    #                 f.write(data)
-    #     c = arv_put.ArvPutUploader([tmpdir])
-    #     shutil.rmtree(tmpdir)
-    #     self.assertEqual(True, c.manifest())
+    
+    def setUp(self):
+        self.lock = multiprocessing.Lock()
 
     def fake_reporter(self, written, expected):
-        # Use this callback as a intra-block pause to be able to simulate an interruption
-        print "Written %d / %d bytes" % (written, expected)
-        time.sleep(10)
+        self.lock.release() # Allow the parent process to terminate() us...
     
-    def bg_uploader(self, paths):
-        return arv_put.ArvPutUploader(paths, reporter=self.fake_reporter)
-
-    # def test_resume_large_file_upload(self):
-    #     import multiprocessing
-    #     data = 'x' * 1024 * 1024 # 1 MB
-    #     _, filename = tempfile.mkstemp()
-    #     fileobj = open(filename, 'w')
-    #     for _ in range(200):
-    #         fileobj.write(data)
-    #     fileobj.close()
-    #     uploader = multiprocessing.Process(target=self.bg_uploader, args=([filename],))
-    #     uploader.start()
-    #     time.sleep(5)
-    #     uploader.terminate()
-    #     time.sleep(1)
-    #     # cache = arv_put.ArvPutCollectionCache([filename])
-    #     # print "Collection detected: %s" % cache.collection()
-    #     # c = arv_put.ArvPutCollection(locator=cache.collection(), cache=cache)
-    #     # print "UPLOADED: %d" % c.collection[os.path.basename(filename)].size()
-    #     # self.assertLess(c.collection[os.path.basename(filename)].size(), os.path.getsize(filename))
-    #     os.unlink(filename)
-
-    # def test_write_directory_twice(self):
-    #     data = 'b' * 1024 * 1024
-    #     tmpdir = tempfile.mkdtemp()
-    #     for size in [1, 5, 10, 70]:
-    #         with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
-    #             for _ in range(size):
-    #                 f.write(data)
-    #     os.mkdir(os.path.join(tmpdir, 'subdir1'))
-    #     for size in [2, 4, 6]:
-    #         with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
-    #             for _ in range(size):
-    #                 f.write(data)
-    #     c = arv_put.ArvPutUploader([tmpdir])
-    #     d = arv_put.ArvPutUploader([tmpdir])
-    #     print "ESCRIBIERON: c: %d, d: %d" % (c.bytes_written(), d.bytes_written())
-    #     shutil.rmtree(tmpdir)
-    #     self.assertEqual(0, d.bytes_written())
+    def bg_uploader(self, filename):
+        cache = arv_put.ArvPutCollectionCache([filename])
+        c = arv_put.ArvPutCollection(reporter=self.fake_reporter, cache=cache)
+        c.collection_flush_time = 0 # flush collection on every block flush, just for this test
+        c.write_file(filename, os.path.basename(filename))
+
+    def test_write_collection_with_name(self):
+        name = 'This is a collection'
+        c = arv_put.ArvPutCollection(name=name)
+        self.assertEqual(name, c.name())
+
+    def test_write_file_on_collection_without_save(self):
+        c = arv_put.ArvPutCollection(should_save=False)
+        with tempfile.NamedTemporaryFile(delete=False) as f:
+            f.write("The quick brown fox jumped over the lazy dog")
+        c.write_file(f.name, os.path.basename(f.name))
+        self.assertEqual(None, c.manifest_locator())
+        os.unlink(f.name)
+
+    def test_write_file_and_check_data_locators(self):
+        c = arv_put.ArvPutCollection(should_save=False)
+        with tempfile.NamedTemporaryFile(delete=False) as f:
+            # Writing ~90 MB, so that it writes 2 data blocks
+            for _ in range(2 * 1024 * 1024):
+                f.write("The quick brown fox jumped over the lazy dog\n")
+        c.write_file(f.name, os.path.basename(f.name))
+        self.assertEqual(2, len(c.data_locators()))
+        os.unlink(f.name)
+        
+    def test_write_directory_and_check_data_locators(self):
+        data = 'b' * 1024 * 1024 # 1 MB
+        tmpdir = tempfile.mkdtemp()
+        for size in [1, 5, 10, 70]:
+            with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+                for _ in range(size):
+                    f.write(data)
+        os.mkdir(os.path.join(tmpdir, 'subdir1'))
+        for size in [2, 4, 6]:
+            with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
+                for _ in range(size):
+                    f.write(data)
+        c = arv_put.ArvPutCollection()
+        c.write_directory_tree(tmpdir)
+        shutil.rmtree(tmpdir)
+        self.assertEqual(8, len(c.data_locators()))
+
+    def test_resume_large_file_upload(self):
+        _, filename = tempfile.mkstemp()
+        md5_original = hashlib.md5()
+        md5_uploaded = hashlib.md5()
+        fileobj = open(filename, 'w')
+        for _ in range(70):
+            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+            fileobj.write(data)
+            md5_original.update(data)
+        fileobj.close()
+        self.lock.acquire()
+        uploader = multiprocessing.Process(target=self.bg_uploader, args=(filename,))
+        uploader.start()
+        self.lock.acquire() # Wait until the child has flush()ed at least one block and the collection
+        self.lock.release()
+        uploader.terminate()
+        uploader.join()
+        cache = arv_put.ArvPutCollectionCache([filename])
+        c = arv_put.ArvPutCollection(cache=cache)
+        self.assertLess(c.collection[os.path.basename(filename)].size(), os.path.getsize(filename))
+        c.write_file(filename, os.path.basename(filename))
+        uploaded = c.collection.open(os.path.basename(filename), 'r')
+        while True:
+            data = uploaded.read(1024*1024)
+            if not data:
+                break
+            md5_uploaded.update(data)
+        os.unlink(filename)
+        cache.destroy()
+        self.assertEqual(md5_original.hexdigest(), md5_uploaded.hexdigest())
+
+    def test_write_directory_twice(self):
+        data = 'b' * 1024 * 1024
+        tmpdir = tempfile.mkdtemp()
+        for size in [1, 5, 10, 70]:
+            with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+                for _ in range(size):
+                    f.write(data)
+        os.mkdir(os.path.join(tmpdir, 'subdir1'))
+        for size in [2, 4, 6]:
+            with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
+                for _ in range(size):
+                    f.write(data)
+        c_cache = arv_put.ArvPutCollectionCache([tmpdir])
+        c = arv_put.ArvPutCollection(cache=c_cache)
+        c.write_directory_tree(tmpdir)
+        c_cache.close()
+        d_cache = arv_put.ArvPutCollectionCache([tmpdir])
+        d = arv_put.ArvPutCollection(cache=d_cache)
+        d.write_directory_tree(tmpdir)
+        d_cache.close()
+        c_cache.destroy()
+        d_cache.destroy()
+        shutil.rmtree(tmpdir)
+        self.assertNotEqual(c.bytes_written, d.bytes_written)
+        # self.assertGreater(c.bytes_written, 0)
+        # self.assertEqual(d.bytes_written, 0)
         
 
 class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,