import arvados import arvados_fuse as fuse import llfuse import logging import os import sys import unittest from .. import run_test_server from ..mount_test_base import MountTestBase logger = logging.getLogger('arvados.arv-mount') from performance_profiler import profiled def fuse_CreateCollection(mounttmp, streams=1, files_per_stream=1, data='x'): class Test(unittest.TestCase): def runTest(self): for i in range(0, streams): os.mkdir(os.path.join(mounttmp, "./stream" + str(i))) # Create files for j in range(0, files_per_stream): with open(os.path.join(mounttmp, "./stream" + str(i), "file" + str(j) +".txt"), "w") as f: f.write(data) Test().runTest() def fuse_ReadContentsFromCollectionWithManyFiles(mounttmp, streams=1, files_per_stream=1, data='x'): class Test(unittest.TestCase): def runTest(self): for i in range(0, streams): d1 = llfuse.listdir(os.path.join(mounttmp, 'stream'+str(i))) for j in range(0, files_per_stream): with open(os.path.join(mounttmp, 'stream'+str(i), 'file'+str(i)+'.txt')) as f: self.assertEqual(data, f.read()) Test().runTest() def fuse_MoveFileFromCollectionWithManyFiles(mounttmp, stream, filename): class Test(unittest.TestCase): def runTest(self): d1 = llfuse.listdir(os.path.join(mounttmp, stream)) self.assertIn(filename, d1) os.rename(os.path.join(mounttmp, stream, filename), os.path.join(mounttmp, 'moved_from_'+stream+'_'+filename)) d1 = llfuse.listdir(os.path.join(mounttmp)) self.assertIn('moved_from_'+stream+'_'+filename, d1) d1 = llfuse.listdir(os.path.join(mounttmp, stream)) self.assertNotIn(filename, d1) Test().runTest() def fuse_DeleteFileFromCollectionWithManyFiles(mounttmp, stream, filename): class Test(unittest.TestCase): def runTest(self): os.remove(os.path.join(mounttmp, stream, filename)) Test().runTest() # Create a collection with 2 streams, 3 files_per_stream, 2 blocks_per_file, 2**26 bytes_per_block class CreateCollectionWithMultipleBlocksAndMoveAndDeleteFile(MountTestBase): def setUp(self): super(CreateCollectionWithMultipleBlocksAndMoveAndDeleteFile, self).setUp() @profiled def createCollectionWithMultipleBlocks(self, streams, files_per_stream, data): self.pool.apply(fuse_CreateCollection, (self.mounttmp, streams, files_per_stream, data,)) @profiled def readContentsOfCollectionWithMultipleBlocks(self, streams, files_per_stream, data): self.pool.apply(fuse_ReadContentsFromCollectionWithManyFiles, (self.mounttmp, streams, files_per_stream, data,)) @profiled def moveFileFromCollectionWithMultipleBlocks(self, streams): for i in range(0, streams): self.pool.apply(fuse_MoveFileFromCollectionWithManyFiles, (self.mounttmp, 'stream'+str(i), 'file0.txt',)) @profiled def removeFileFromCollectionWithMultipleBlocks(self, streams): for i in range(0, streams): self.pool.apply(fuse_DeleteFileFromCollectionWithManyFiles, (self.mounttmp, 'stream'+str(i), 'file1.txt')) def test_CreateCollectionWithManyBlocksAndMoveAndDeleteFile(self): collection = arvados.collection.Collection(api_client=self.api) collection.save_new() m = self.make_mount(fuse.CollectionDirectory) with llfuse.lock: m.new_collection(collection.api_response(), collection) self.assertTrue(m.writable()) streams = 2 files_per_stream = 3 blocks_per_file = 2 bytes_per_block = 2**26 data = 'x' * blocks_per_file * bytes_per_block self.createCollectionWithMultipleBlocks(streams, files_per_stream, data) collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute() for i in range(0, streams): self.assertIn('./stream' + str(i), collection2["manifest_text"]) for i in range(0, files_per_stream): self.assertIn('file' + str(i) + '.txt', collection2["manifest_text"]) # Read file contents self.readContentsOfCollectionWithMultipleBlocks(streams, files_per_stream, data) # Move file0.txt out of the streams into . self.moveFileFromCollectionWithMultipleBlocks(streams) collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute() manifest_streams = collection2['manifest_text'].split('\n') self.assertEqual(4, len(manifest_streams)) for i in range(0, streams): self.assertIn('file0.txt', manifest_streams[0]) for i in range(0, streams): self.assertNotIn('file0.txt', manifest_streams[i+1]) for i in range(0, streams): for j in range(1, files_per_stream): self.assertIn('file' + str(j) + '.txt', manifest_streams[i+1]) # Delete 'file1.txt' from all the streams self.removeFileFromCollectionWithMultipleBlocks(streams) collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute() manifest_streams = collection2['manifest_text'].split('\n') self.assertEqual(4, len(manifest_streams)) for i in range(0, streams): self.assertIn('file0.txt', manifest_streams[0]) self.assertNotIn('file1.txt', collection2['manifest_text']) for i in range(0, streams): for j in range(2, files_per_stream): self.assertIn('file' + str(j) + '.txt', manifest_streams[i+1]) # Create a collection with two streams, each with 200 files class CreateCollectionWithManyFilesAndMoveAndDeleteFile(MountTestBase): def setUp(self): super(CreateCollectionWithManyFilesAndMoveAndDeleteFile, self).setUp() @profiled def createCollectionWithManyFiles(self, streams, files_per_stream, data): self.pool.apply(fuse_CreateCollection, (self.mounttmp, streams, files_per_stream, data,)) @profiled def readContentsOfCollectionWithManyFiles(self, streams, files_per_stream, data): self.pool.apply(fuse_ReadContentsFromCollectionWithManyFiles, (self.mounttmp, streams, files_per_stream, data,)) @profiled def moveFileFromCollectionWithManyFiles(self, streams): for i in range(0, streams): self.pool.apply(fuse_MoveFileFromCollectionWithManyFiles, (self.mounttmp, 'stream'+str(i), 'file0.txt',)) @profiled def removeFileFromCollectionWithManyFiles(self, streams): for i in range(0, streams): self.pool.apply(fuse_DeleteFileFromCollectionWithManyFiles, (self.mounttmp, 'stream'+str(i), 'file1.txt')) def test_CreateCollectionWithManyFilesAndMoveAndDeleteFile(self): collection = arvados.collection.Collection(api_client=self.api) collection.save_new() m = self.make_mount(fuse.CollectionDirectory) with llfuse.lock: m.new_collection(collection.api_response(), collection) self.assertTrue(m.writable()) streams = 2 files_per_stream = 200 data = 'x' self.createCollectionWithManyFiles(streams, files_per_stream, data) collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute() for i in range(0, streams): self.assertIn('./stream' + str(i), collection2["manifest_text"]) for i in range(0, files_per_stream): self.assertIn('file' + str(i) + '.txt', collection2["manifest_text"]) # Read file contents self.readContentsOfCollectionWithManyFiles(streams, files_per_stream, data) # Move file0.txt out of the streams into . self.moveFileFromCollectionWithManyFiles(streams) collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute() manifest_streams = collection2['manifest_text'].split('\n') self.assertEqual(4, len(manifest_streams)) for i in range(0, streams): self.assertIn('file0.txt', manifest_streams[0]) for i in range(0, streams): self.assertNotIn('file0.txt', manifest_streams[i+1]) for i in range(0, streams): for j in range(1, files_per_stream): self.assertIn('file' + str(j) + '.txt', manifest_streams[i+1]) # Delete 'file1.txt' from all the streams self.removeFileFromCollectionWithManyFiles(streams) collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute() manifest_streams = collection2['manifest_text'].split('\n') self.assertEqual(4, len(manifest_streams)) for i in range(0, streams): self.assertIn('file0.txt', manifest_streams[0]) self.assertNotIn('file1.txt', collection2['manifest_text']) for i in range(0, streams): for j in range(2, files_per_stream): self.assertIn('file' + str(j) + '.txt', manifest_streams[i+1]) def magicDirTest_MoveFileFromCollection(mounttmp, collection1, collection2, stream, filename): class Test(unittest.TestCase): def runTest(self): #os.rename(os.path.join(mounttmp, collection1, stream, filename), os.path.join(mounttmp, collection2, stream, filename)) os.rename(os.path.join(mounttmp, collection1, filename), os.path.join(mounttmp, collection2, filename)) Test().runTest() def magicDirTest_RemoveFileFromCollection(mounttmp, collection1, stream, filename): class Test(unittest.TestCase): def runTest(self): os.remove(os.path.join(mounttmp, collection1, filename)) Test().runTest() class UsingMagicDir_CreateCollectionWithManyFilesAndMoveAndDeleteFile(MountTestBase): def setUp(self): super(UsingMagicDir_CreateCollectionWithManyFilesAndMoveAndDeleteFile, self).setUp() @profiled def magicDirTest_createCollectionWithManyFiles(self, streams=0, files_per_stream=0, data='x'): # Create collection collection = arvados.collection.Collection(api_client=self.api) for j in range(0, files_per_stream): with collection.open("file"+str(j)+".txt", "w") as f: f.write(data) collection.save_new() return collection @profiled def magicDirTest_readCollectionContents(self, collection, streams=1, files_per_stream=1, data='x'): mount_ls = os.listdir(os.path.join(self.mounttmp, collection)) files = {} for j in range(0, files_per_stream): files[os.path.join(self.mounttmp, collection, 'file'+str(j)+'.txt')] = data #files[os.path.join(self.mounttmp, collection, 'stream'+str(i)+'/file'+str(j)+'.txt')] = data for k, v in files.items(): with open(os.path.join(self.mounttmp, collection, k)) as f: self.assertEqual(v, f.read()) @profiled def magicDirTest_moveFileFromCollection(self, from_collection, to_collection): self.pool.apply(magicDirTest_MoveFileFromCollection, (self.mounttmp, from_collection.manifest_locator(), to_collection.manifest_locator(), 'stream0', 'file0.txt',)) from_collection.update() to_collection.update() @profiled def magicDirTest_removeFileFromCollection(self, collection): self.pool.apply(magicDirTest_RemoveFileFromCollection, (self.mounttmp, collection.manifest_locator(), 'stream0', 'file1.txt',)) collection.update() def test_UsingMagicDirCreateCollectionWithManyFilesAndMoveAndDeleteFile(self): streams = 2 files_per_stream = 200 data = 'x' collection1 = self.magicDirTest_createCollectionWithManyFiles() # Create collection with multiple files collection2 = self.magicDirTest_createCollectionWithManyFiles(streams, files_per_stream, data) # Mount FuseMagicDir self.make_mount(fuse.MagicDirectory) self.magicDirTest_readCollectionContents(collection2.manifest_locator(), streams, files_per_stream, data) # Move file0.txt out of the collection2 into collection1 self.magicDirTest_moveFileFromCollection(collection2, collection1) updated_collection = self.api.collections().get(uuid=collection2.manifest_locator()).execute() self.assertFalse('file0.txt' in updated_collection['manifest_text']) self.assertTrue('file1.txt' in updated_collection['manifest_text']) # Delete file1.txt from collection2 self.magicDirTest_removeFileFromCollection(collection2) updated_collection = self.api.collections().get(uuid=collection2.manifest_locator()).execute() self.assertFalse('file1.txt' in updated_collection['manifest_text']) self.assertTrue('file2.txt' in updated_collection['manifest_text']) class UsingMagicDir_CreateCollectionWithManyFilesAndMoveAllFilesIntoAnother(MountTestBase): def setUp(self): super(UsingMagicDir_CreateCollectionWithManyFilesAndMoveAllFilesIntoAnother, self).setUp() @profiled def magicDirTestMoveAllFiles_createCollectionWithManyFiles(self, streams=0, files_per_stream=0, blocks_per_file=0, bytes_per_block=0, data='x'): # Create collection collection = arvados.collection.Collection(api_client=self.api) for j in range(0, files_per_stream): with collection.open("file"+str(j)+".txt", "w") as f: f.write(data) collection.save_new() return collection @profiled def magicDirTestMoveAllFiles_moveFilesFromCollection(self, from_collection, to_collection, files_per_stream): for j in range(0, files_per_stream): self.pool.apply(magicDirTest_MoveFileFromCollection, (self.mounttmp, from_collection.manifest_locator(), to_collection.manifest_locator(), 'stream0', 'file'+str(j)+'.txt',)) from_collection.update() to_collection.update() def test_UsingMagicDirCreateCollectionWithManyFilesAndMoveAllFilesIntoAnother(self): streams = 2 files_per_stream = 200 data = 'x' collection1 = self.magicDirTestMoveAllFiles_createCollectionWithManyFiles() # Create collection with multiple files collection2 = self.magicDirTestMoveAllFiles_createCollectionWithManyFiles(streams, files_per_stream, data) # Mount FuseMagicDir self.make_mount(fuse.MagicDirectory) # Move all files from collection2 into collection1 self.magicDirTestMoveAllFiles_moveFilesFromCollection(collection2, collection1, files_per_stream) updated_collection = self.api.collections().get(uuid=collection2.manifest_locator()).execute() file_names = ["file%i.txt" % i for i in range(0, files_per_stream)] for name in file_names: self.assertFalse(name in updated_collection['manifest_text']) updated_collection = self.api.collections().get(uuid=collection1.manifest_locator()).execute() for name in file_names: self.assertTrue(name in updated_collection['manifest_text']) # Move one file at a time from one collection into another class UsingMagicDir_CreateCollectionWithManyFilesAndMoveEachFileIntoAnother(MountTestBase): def setUp(self): super(UsingMagicDir_CreateCollectionWithManyFilesAndMoveEachFileIntoAnother, self).setUp() @profiled def magicDirTestMoveFiles_createCollectionWithManyFiles(self, streams=0, files_per_stream=0, data='x'): # Create collection collection = arvados.collection.Collection(api_client=self.api) for j in range(0, files_per_stream): with collection.open("file"+str(j)+".txt", "w") as f: f.write(data) collection.save_new() return collection @profiled def magicDirTestMoveFiles_oneEachIntoAnother(self, from_collection, to_collection, files_per_stream): for j in range(0, files_per_stream): self.pool.apply(magicDirTest_MoveFileFromCollection, (self.mounttmp, from_collection.manifest_locator(), to_collection.manifest_locator(), 'stream0', 'file'+str(j)+'.txt',)) from_collection.update() to_collection.update() def test_UsingMagicDirCreateCollectionWithManyFilesAndMoveEachFileIntoAnother(self): streams = 2 files_per_stream = 200 data = 'x' collection1 = self.magicDirTestMoveFiles_createCollectionWithManyFiles() # Create collection with multiple files collection2 = self.magicDirTestMoveFiles_createCollectionWithManyFiles(streams, files_per_stream, data) # Mount FuseMagicDir self.make_mount(fuse.MagicDirectory) # Move all files from collection2 into collection1 self.magicDirTestMoveFiles_oneEachIntoAnother(collection2, collection1, files_per_stream) updated_collection = self.api.collections().get(uuid=collection2.manifest_locator()).execute() file_names = ["file%i.txt" % i for i in range(0, files_per_stream)] for name in file_names: self.assertFalse(name in updated_collection['manifest_text']) updated_collection = self.api.collections().get(uuid=collection1.manifest_locator()).execute() for name in file_names: self.assertTrue(name in updated_collection['manifest_text'])