import json
import os
import pwd
+import time
import signal
import socket
import sys
self.__init__(self.filename)
class ArvPutCollection(object):
    """Upload local files and directory trees into a new Arvados collection.

    A new collection is created and saved when the object is constructed.
    While a file is being written the collection is saved again whenever
    ``collection_flush_time`` seconds have elapsed, and once more when the
    file is complete.
    """

    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
        """Create and save a new, empty collection.

        :param cache: stored on the instance; not used by this class yet.
        :param reporter: optional callable, invoked as
            ``reporter(bytes_written, bytes_expected)`` after every chunk that
            ``write_file`` copies.
        :param bytes_expected: total byte count handed to *reporter*; may be None.
        :param kwargs: accepted for interface compatibility and ignored.
        """
        # Seconds between intermediate collection saves during a long write.
        self.collection_flush_time = 60
        # Running total of bytes copied by write_file, reported via reporter.
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.collection = arvados.collection.Collection()
        self.collection.save_new()

    def write_file(self, source, filename):
        """Copy the local file *source* into the collection as *filename*.

        Data is copied in ``arvados.config.KEEP_BLOCK_SIZE`` chunks.  Each
        chunk updates ``bytes_written`` and calls the progress reporter.

        :param source: path of the local file to read.
        :param filename: destination path inside the collection.
        """
        with self.collection as c:
            # Binary mode: arbitrary file contents must be copied byte-for-byte
            # (text mode may translate line endings on some platforms).
            with open(source, 'rb') as source_fd:
                output = c.open(filename, 'w')
                try:
                    last_save = time.time()
                    while True:
                        data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
                        if not data:
                            break
                        output.write(data)
                        output.flush()
                        self.bytes_written += len(data)
                        self.report_progress()
                        # Periodically persist progress on long uploads.
                        if (time.time() - last_save) > self.collection_flush_time:
                            self.collection.save()
                            last_save = time.time()
                finally:
                    # Always close the writer, even if a read/write raised.
                    output.close()
                self.collection.save()  # TODO: Is this necessary?

    def write_directory_tree(self, path, stream_name='.', max_manifest_depth=-1):
        """Recursively upload every regular entry below the directory *path*.

        Entries are stored under *stream_name*, mirroring their layout
        relative to *path*, and are visited in sorted order so the upload
        order is deterministic.  If *path* is not a directory, nothing is
        written.

        :param path: local directory to upload.
        :param stream_name: collection path prefix for the uploaded entries.
        :param max_manifest_depth: forwarded to recursive calls; not otherwise
            used yet (kept for interface compatibility).
        """
        if not os.path.isdir(path):
            return
        for item in sorted(os.listdir(path)):
            item_path = os.path.join(path, item)
            # Stream names are relative to stream_name; never embed the
            # local (possibly absolute) path in the collection.
            item_stream = os.path.join(stream_name, item)
            # Test the full path: a bare ``item`` would be resolved against
            # the process's current working directory instead of ``path``.
            if os.path.isdir(item_path):
                self.write_directory_tree(item_path, item_stream,
                                          max_manifest_depth)
            else:
                self.write_file(item_path, item_stream)

    def manifest(self):
        """Print the Keep block size and the collection's manifest; return True.

        Debugging aid: all output goes to stdout.
        """
        print("BLOCK SIZE: %d" % arvados.config.KEEP_BLOCK_SIZE)
        print("MANIFEST Locator:\n%s\nMANIFEST TEXT:\n%s" % (
            self.collection.manifest_locator(),
            self.collection.manifest_text()))
        return True

    def report_progress(self):
        """Call the reporter, if one was given, with the current byte counts."""
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)
+
+
class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
['bytes_written', '_seen_inputs'])
arv_put.ResumeCache, path)
class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers):
    """Tests for arv_put.ArvPutCollection, run against test API and Keep servers."""
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    def _new_tmpdir(self):
        """Return a new scratch directory that is removed when the test ends."""
        import shutil
        tmpdir = tempfile.mkdtemp()
        # ignore_errors=True so cleanup never masks the test's own outcome.
        self.addCleanup(shutil.rmtree, tmpdir, True)
        return tmpdir

    @staticmethod
    def _write_repeated(path, data, count):
        """Write *data* to *path* *count* times; the file is closed on return."""
        with open(path, 'w') as f:
            for _ in range(count):
                f.write(data)

    def test_write_files(self):
        c = arv_put.ArvPutCollection()
        data = 'a' * 1024 * 1024  # 1 MB
        tmpdir = self._new_tmpdir()
        sizes = [1, 10, 64, 128]
        for size in sizes:
            path = os.path.join(tmpdir, 'file_%d' % size)
            # Upload only after the file is closed, so every buffered byte
            # has reached disk before write_file reads it.
            self._write_repeated(path, data, size)
            c.write_file(path, os.path.basename(path))
            os.unlink(path)
        self.assertTrue(c.manifest())
        manifest_text = c.collection.manifest_text()
        for size in sizes:
            self.assertIn('file_%d' % size, manifest_text)

    def test_write_directory(self):
        c = arv_put.ArvPutCollection()
        data = 'b' * 1024 * 1024
        tmpdir = self._new_tmpdir()
        for size in [1, 5, 10, 70]:
            self._write_repeated(os.path.join(tmpdir, 'file_%d' % size), data, size)
        os.mkdir(os.path.join(tmpdir, 'subdir1'))
        for size in [2, 4, 6]:
            self._write_repeated(
                os.path.join(tmpdir, 'subdir1', 'file_%d' % size), data, size)
        c.write_directory_tree(tmpdir, os.path.join('.', os.path.basename(tmpdir)))
        self.assertTrue(c.manifest())
        manifest_text = c.collection.manifest_text()
        for size in [1, 5, 10, 70, 2, 4, 6]:
            self.assertIn('file_%d' % size, manifest_text)
+
+
class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
ArvadosBaseTestCase):
def setUp(self):