9463: Initial coding with tests
author Lucas Di Pentima <lucas@curoverse.com>
Fri, 1 Jul 2016 22:05:18 +0000 (19:05 -0300)
committer Lucas Di Pentima <lucas@curoverse.com>
Fri, 1 Jul 2016 22:05:18 +0000 (19:05 -0300)
sdk/python/arvados/commands/put.py [changed mode: 0644->0755]
sdk/python/tests/test_arv_put.py [changed mode: 0644->0755]

old mode 100644 (file)
new mode 100755 (executable)
index 5cb699f..906a6bf
@@ -14,6 +14,7 @@ import hashlib
 import json
 import os
 import pwd
+import time
 import signal
 import socket
 import sys
@@ -276,6 +277,57 @@ class ResumeCache(object):
         self.__init__(self.filename)
 
 
+class ArvPutCollection(object):
+    """Upload local files and directory trees into a new Arvados collection.
+
+    A fresh collection is created and saved as soon as the object is
+    instantiated.  File data is then streamed into it, and the collection
+    is re-saved periodically while a long upload is in progress.
+    """
+
+    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
+        """Create and save the (initially empty) target collection.
+
+        cache -- stored on the instance; not used by this class yet.
+        reporter -- optional callable invoked by report_progress() as
+            reporter(bytes_written, bytes_expected).
+        bytes_expected -- value handed to the reporter as second argument.
+        kwargs -- accepted and currently ignored.
+        """
+        # Seconds allowed between collection saves while a file is uploading.
+        self.collection_flush_time = 60
+        self.bytes_written = 0
+        self._seen_inputs = []
+        self.cache = cache
+        self.reporter = reporter
+        self.bytes_expected = bytes_expected
+        self.collection = arvados.collection.Collection()
+        self.collection.save_new()
+
+    def write_file(self, source, filename):
+        """Copy the local file `source` into the collection as `filename`.
+
+        Data is read in chunks of arvados.config.KEEP_BLOCK_SIZE bytes and
+        flushed after every chunk.  The collection is saved whenever more
+        than collection_flush_time seconds have passed since the upload
+        started or since the previous intermediate save, and once more
+        when the file is complete.
+        """
+        with self.collection as c:
+            # Binary mode: the uploaded bytes must match the source exactly.
+            with open(source, 'rb') as source_fd:
+                output = c.open(filename, 'w')
+                try:
+                    start_time = time.time()
+                    while True:
+                        data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
+                        if not data:
+                            break
+                        output.write(data)
+                        output.flush()
+                        # Keep the progress counter in sync with what was sent.
+                        self.bytes_written += len(data)
+                        self.report_progress()
+                        # Is it time to update the collection?
+                        if (time.time() - start_time) > self.collection_flush_time:
+                            self.collection.save()
+                            start_time = time.time()
+                finally:
+                    # Close the writer even if reading or writing failed.
+                    output.close()
+                self.collection.save() # TODO: Is this necessary?
+
+    def write_directory_tree(self, path, stream_name='.', max_manifest_depth=-1):
+        """Recursively upload the contents of the directory `path`.
+
+        Every non-directory entry is written with write_file() as
+        stream_name/<entry name>; every subdirectory is uploaded the same
+        way under stream_name/<subdirectory name>.  Nothing happens when
+        `path` is not a directory.  max_manifest_depth is accepted but
+        currently not used.
+        """
+        if not os.path.isdir(path):
+            return
+        for item in os.listdir(path):
+            # os.listdir() returns bare names: join them with `path` before
+            # testing or opening them, otherwise they are resolved against
+            # the current working directory.
+            item_path = os.path.join(path, item)
+            # The stream name mirrors the tree below `path`; the local
+            # path itself must not leak into it.
+            item_stream = os.path.join(stream_name, item)
+            if os.path.isdir(item_path):
+                self.write_directory_tree(item_path, item_stream)
+            else:
+                self.write_file(item_path, item_stream)
+
+    def manifest(self):
+        """Print the block size, manifest locator and manifest text.
+
+        Always returns True.
+        """
+        print "BLOCK SIZE: %d" % arvados.config.KEEP_BLOCK_SIZE
+        print "MANIFEST Locator:\n%s\nMANIFEST TEXT:\n%s" % (self.collection.manifest_locator(), self.collection.manifest_text())
+        return True
+
+    def report_progress(self):
+        """Call the reporter, if any, with the written and expected bytes."""
+        if self.reporter is not None:
+            self.reporter(self.bytes_written, self.bytes_expected)
+
+
 class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
     STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                    ['bytes_written', '_seen_inputs'])
old mode 100644 (file)
new mode 100755 (executable)
index e64d914..54a70bf
@@ -234,6 +234,39 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
                           arv_put.ResumeCache, path)
 
 
+class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers):
+    # Presumably these request an API server and a Keep server from
+    # TestCaseWithServers -- confirm against run_test_server.
+    MAIN_SERVER = {}
+    KEEP_SERVER = {}
+
+    def test_write_files(self):
+        # Upload single files of 1, 10, 64 and 128 MB, one at a time.
+        import shutil
+        c = arv_put.ArvPutCollection()
+        data = 'a' * 1024 * 1024 # 1 MB
+        tmpdir = tempfile.mkdtemp()
+        try:
+            for size in [1, 10, 64, 128]:
+                with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+                    for _ in range(size):
+                        f.write(data)
+                c.write_file(f.name, os.path.basename(f.name))
+                os.unlink(f.name)
+            self.assertEqual(True, c.manifest())
+        finally:
+            # Do not leave the scratch directory behind, pass or fail.
+            shutil.rmtree(tmpdir)
+
+    def test_write_directory(self):
+        # Upload a directory holding four files plus a subdirectory with
+        # three more files.
+        import shutil
+        c = arv_put.ArvPutCollection()
+        data = 'b' * 1024 * 1024
+        tmpdir = tempfile.mkdtemp()
+        try:
+            for size in [1, 5, 10, 70]:
+                with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+                    for _ in range(size):
+                        f.write(data)
+            os.mkdir(os.path.join(tmpdir, 'subdir1'))
+            for size in [2, 4, 6]:
+                with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
+                    for _ in range(size):
+                        f.write(data)
+            c.write_directory_tree(tmpdir, os.path.join('.', os.path.basename(tmpdir)))
+            self.assertEqual(True, c.manifest())
+        finally:
+            # Remove the ~100 MB of generated test data, pass or fail.
+            shutil.rmtree(tmpdir)
+        
+
 class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
                                      ArvadosBaseTestCase):
     def setUp(self):