import arvados import arvados.safeapi import arvados_fuse as fuse import glob import json import llfuse import os import shutil import subprocess import sys import tempfile import threading import time import unittest import logging import multiprocessing from .. import run_test_server from ..mount_test_base import MountTestBase logger = logging.getLogger('arvados.arv-mount') from performance_profiler import profiled def fuse_CreateCollection(mounttmp, streams=1, files_per_stream=1, data='x'): class Test(unittest.TestCase): def runTest(self): for i in range(0, streams): os.mkdir(os.path.join(mounttmp, "./stream" + str(i))) # Create files for j in range(0, files_per_stream): with open(os.path.join(mounttmp, "./stream" + str(i), "file" + str(j) +".txt"), "w") as f: f.write(data) Test().runTest() def fuse_ReadContentsFromCollectionWithManyFiles(mounttmp, streams=1, files_per_stream=1, data='x'): class Test(unittest.TestCase): def runTest(self): for i in range(0, streams): d1 = llfuse.listdir(os.path.join(mounttmp, 'stream'+str(i))) for j in range(0, files_per_stream): with open(os.path.join(mounttmp, 'stream'+str(i), 'file'+str(i)+'.txt')) as f: self.assertEqual(data, f.read()) Test().runTest() def fuse_MoveFileFromCollectionWithManyFiles(mounttmp, stream, filename): class Test(unittest.TestCase): def runTest(self): d1 = llfuse.listdir(os.path.join(mounttmp, stream)) self.assertIn(filename, d1) os.rename(os.path.join(mounttmp, stream, filename), os.path.join(mounttmp, 'moved_from_'+stream+'_'+filename)) d1 = llfuse.listdir(os.path.join(mounttmp)) self.assertIn('moved_from_'+stream+'_'+filename, d1) d1 = llfuse.listdir(os.path.join(mounttmp, stream)) self.assertNotIn(filename, d1) Test().runTest() def fuse_DeleteFileFromCollectionWithManyFiles(mounttmp, stream, filename): class Test(unittest.TestCase): def runTest(self): os.remove(os.path.join(mounttmp, stream, filename)) Test().runTest() # Create a collection with 2 streams, 3 files_per_stream, 2 blocks_per_file, 2**26 bytes_per_block class CreateCollectionWithMultipleBlocksAndMoveAndDeleteFile(MountTestBase): def setUp(self): super(CreateCollectionWithMultipleBlocksAndMoveAndDeleteFile, self).setUp() @profiled def createCollectionWithMultipleBlocks(self, streams, files_per_stream, data): self.pool.apply(fuse_CreateCollection, (self.mounttmp, streams, files_per_stream, data,)) @profiled def readContentsOfCollectionWithMultipleBlocks(self, streams, files_per_stream, data): self.pool.apply(fuse_ReadContentsFromCollectionWithManyFiles, (self.mounttmp, streams, files_per_stream, data,)) @profiled def moveFileFromCollectionWithMultipleBlocks(self, streams): for i in range(0, streams): self.pool.apply(fuse_MoveFileFromCollectionWithManyFiles, (self.mounttmp, 'stream'+str(i), 'file0.txt',)) @profiled def removeFileFromCollectionWithMultipleBlocks(self, streams): for i in range(0, streams): self.pool.apply(fuse_DeleteFileFromCollectionWithManyFiles, (self.mounttmp, 'stream'+str(i), 'file1.txt')) def test_CreateCollectionWithManyBlocksAndMoveAndDeleteFile(self): collection = arvados.collection.Collection(api_client=self.api) collection.save_new() m = self.make_mount(fuse.CollectionDirectory) with llfuse.lock: m.new_collection(collection.api_response(), collection) self.assertTrue(m.writable()) streams = 2 files_per_stream = 3 blocks_per_file = 2 bytes_per_block = 2**26 data = 'x' * blocks_per_file * bytes_per_block self.createCollectionWithMultipleBlocks(streams, files_per_stream, data) collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute() for i in range(0, streams): self.assertIn('./stream' + str(i), collection2["manifest_text"]) for i in range(0, files_per_stream): self.assertIn('file' + str(i) + '.txt', collection2["manifest_text"]) # Read file contents self.readContentsOfCollectionWithMultipleBlocks(streams, files_per_stream, data) # Move file0.txt out of the streams into . self.moveFileFromCollectionWithMultipleBlocks(streams) collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute() manifest_streams = collection2['manifest_text'].split('\n') self.assertEqual(4, len(manifest_streams)) for i in range(0, streams): self.assertIn('file0.txt', manifest_streams[0]) for i in range(0, streams): self.assertNotIn('file0.txt', manifest_streams[i+1]) for i in range(0, streams): for j in range(1, files_per_stream): self.assertIn('file' + str(j) + '.txt', manifest_streams[i+1]) # Delete 'file1.txt' from all the streams self.removeFileFromCollectionWithMultipleBlocks(streams) collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute() manifest_streams = collection2['manifest_text'].split('\n') self.assertEqual(4, len(manifest_streams)) for i in range(0, streams): self.assertIn('file0.txt', manifest_streams[0]) self.assertNotIn('file1.txt', collection2['manifest_text']) for i in range(0, streams): for j in range(2, files_per_stream): self.assertIn('file' + str(j) + '.txt', manifest_streams[i+1]) # Create a collection with two streams, each with 200 files class CreateCollectionWithManyFilesAndMoveAndDeleteFile(MountTestBase): def setUp(self): super(CreateCollectionWithManyFilesAndMoveAndDeleteFile, self).setUp() @profiled def createCollectionWithManyFiles(self, streams, files_per_stream, data): self.pool.apply(fuse_CreateCollection, (self.mounttmp, streams, files_per_stream, data,)) @profiled def readContentsOfCollectionWithManyFiles(self, streams, files_per_stream, data): self.pool.apply(fuse_ReadContentsFromCollectionWithManyFiles, (self.mounttmp, streams, files_per_stream, data,)) @profiled def moveFileFromCollectionWithManyFiles(self, streams): for i in range(0, streams): self.pool.apply(fuse_MoveFileFromCollectionWithManyFiles, (self.mounttmp, 'stream'+str(i), 'file0.txt',)) @profiled def removeFileFromCollectionWithManyFiles(self, streams): for i in range(0, streams): self.pool.apply(fuse_DeleteFileFromCollectionWithManyFiles, (self.mounttmp, 'stream'+str(i), 'file1.txt')) def test_CreateCollectionWithManyFilesAndMoveAndDeleteFile(self): collection = arvados.collection.Collection(api_client=self.api) collection.save_new() m = self.make_mount(fuse.CollectionDirectory) with llfuse.lock: m.new_collection(collection.api_response(), collection) self.assertTrue(m.writable()) streams = 2 files_per_stream = 200 data = 'x' self.createCollectionWithManyFiles(streams, files_per_stream, data) collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute() for i in range(0, streams): self.assertIn('./stream' + str(i), collection2["manifest_text"]) for i in range(0, files_per_stream): self.assertIn('file' + str(i) + '.txt', collection2["manifest_text"]) # Read file contents self.readContentsOfCollectionWithManyFiles(streams, files_per_stream, data) # Move file0.txt out of the streams into . self.moveFileFromCollectionWithManyFiles(streams) collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute() manifest_streams = collection2['manifest_text'].split('\n') self.assertEqual(4, len(manifest_streams)) for i in range(0, streams): self.assertIn('file0.txt', manifest_streams[0]) for i in range(0, streams): self.assertNotIn('file0.txt', manifest_streams[i+1]) for i in range(0, streams): for j in range(1, files_per_stream): self.assertIn('file' + str(j) + '.txt', manifest_streams[i+1]) # Delete 'file1.txt' from all the streams self.removeFileFromCollectionWithManyFiles(streams) collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute() manifest_streams = collection2['manifest_text'].split('\n') self.assertEqual(4, len(manifest_streams)) for i in range(0, streams): self.assertIn('file0.txt', manifest_streams[0]) self.assertNotIn('file1.txt', collection2['manifest_text']) for i in range(0, streams): for j in range(2, files_per_stream): self.assertIn('file' + str(j) + '.txt', manifest_streams[i+1]) def magicDirTest_MoveFileFromCollection(mounttmp, collection1, collection2, stream, filename): class Test(unittest.TestCase): def runTest(self): #os.rename(os.path.join(mounttmp, collection1, stream, filename), os.path.join(mounttmp, collection2, stream, filename)) os.rename(os.path.join(mounttmp, collection1, filename), os.path.join(mounttmp, collection2, filename)) Test().runTest() def magicDirTest_RemoveFileFromCollection(mounttmp, collection1, stream, filename): class Test(unittest.TestCase): def runTest(self): os.remove(os.path.join(mounttmp, collection1, filename)) Test().runTest() class UsingMagicDir_CreateCollectionWithManyFilesAndMoveAndDeleteFile(MountTestBase): def setUp(self): super(UsingMagicDir_CreateCollectionWithManyFilesAndMoveAndDeleteFile, self).setUp() @profiled def magicDirTest_createCollectionWithManyFiles(self, streams=0, files_per_stream=0, data='x'): # Create collection collection = arvados.collection.Collection(api_client=self.api) for j in range(0, files_per_stream): with collection.open("file"+str(j)+".txt", "w") as f: f.write(data) collection.save_new() return collection @profiled def magicDirTest_readCollectionContents(self, collection, streams=1, files_per_stream=1, data='x'): mount_ls = os.listdir(os.path.join(self.mounttmp, collection)) files = {} for j in range(0, files_per_stream): files[os.path.join(self.mounttmp, collection, 'file'+str(j)+'.txt')] = data #files[os.path.join(self.mounttmp, collection, 'stream'+str(i)+'/file'+str(j)+'.txt')] = data for k, v in files.items(): with open(os.path.join(self.mounttmp, collection, k)) as f: self.assertEqual(v, f.read()) @profiled def magicDirTest_moveFileFromCollection(self, from_collection, to_collection): self.pool.apply(magicDirTest_MoveFileFromCollection, (self.mounttmp, from_collection.manifest_locator(), to_collection.manifest_locator(), 'stream0', 'file0.txt',)) from_collection.update() to_collection.update() @profiled def magicDirTest_removeFileFromCollection(self, collection): self.pool.apply(magicDirTest_RemoveFileFromCollection, (self.mounttmp, collection.manifest_locator(), 'stream0', 'file1.txt',)) collection.update() def test_UsingMagicDirCreateCollectionWithManyFilesAndMoveAndDeleteFile(self): streams = 2 files_per_stream = 200 data = 'x' collection1 = self.magicDirTest_createCollectionWithManyFiles() # Create collection with multiple files collection2 = self.magicDirTest_createCollectionWithManyFiles(streams, files_per_stream, data) # Mount FuseMagicDir self.make_mount(fuse.MagicDirectory) self.magicDirTest_readCollectionContents(collection2.manifest_locator(), streams, files_per_stream, data) # Move file0.txt out of the collection2 into collection1 self.magicDirTest_moveFileFromCollection(collection2, collection1) updated_collection = self.api.collections().get(uuid=collection2.manifest_locator()).execute() self.assertFalse('file0.txt' in updated_collection['manifest_text']) self.assertTrue('file1.txt' in updated_collection['manifest_text']) # Delete file1.txt from collection2 self.magicDirTest_removeFileFromCollection(collection2) updated_collection = self.api.collections().get(uuid=collection2.manifest_locator()).execute() self.assertFalse('file1.txt' in updated_collection['manifest_text']) self.assertTrue('file2.txt' in updated_collection['manifest_text']) class UsingMagicDir_CreateCollectionWithManyFilesAndMoveAllFilesIntoAnother(MountTestBase): def setUp(self): super(UsingMagicDir_CreateCollectionWithManyFilesAndMoveAllFilesIntoAnother, self).setUp() @profiled def magicDirTestMoveAllFiles_createCollectionWithManyFiles(self, streams=0, files_per_stream=0, blocks_per_file=0, bytes_per_block=0, data='x'): # Create collection collection = arvados.collection.Collection(api_client=self.api) for j in range(0, files_per_stream): with collection.open("file"+str(j)+".txt", "w") as f: f.write(data) collection.save_new() return collection @profiled def magicDirTestMoveAllFiles_moveFilesFromCollection(self, from_collection, to_collection, files_per_stream): for j in range(0, files_per_stream): self.pool.apply(magicDirTest_MoveFileFromCollection, (self.mounttmp, from_collection.manifest_locator(), to_collection.manifest_locator(), 'stream0', 'file'+str(j)+'.txt',)) from_collection.update() to_collection.update() def test_UsingMagicDirCreateCollectionWithManyFilesAndMoveAllFilesIntoAnother(self): streams = 2 files_per_stream = 200 data = 'x' collection1 = self.magicDirTestMoveAllFiles_createCollectionWithManyFiles() # Create collection with multiple files collection2 = self.magicDirTestMoveAllFiles_createCollectionWithManyFiles(streams, files_per_stream, data) # Mount FuseMagicDir self.make_mount(fuse.MagicDirectory) # Move all files from collection2 into collection1 self.magicDirTestMoveAllFiles_moveFilesFromCollection(collection2, collection1, files_per_stream) updated_collection = self.api.collections().get(uuid=collection2.manifest_locator()).execute() file_names = ["file%i.txt" % i for i in range(0, files_per_stream)] for name in file_names: self.assertFalse(name in updated_collection['manifest_text']) updated_collection = self.api.collections().get(uuid=collection1.manifest_locator()).execute() for name in file_names: self.assertTrue(name in updated_collection['manifest_text']) # Move one file at a time from one collection into another class UsingMagicDir_CreateCollectionWithManyFilesAndMoveEachFileIntoAnother(MountTestBase): def setUp(self): super(UsingMagicDir_CreateCollectionWithManyFilesAndMoveEachFileIntoAnother, self).setUp() @profiled def magicDirTestMoveFiles_createCollectionWithManyFiles(self, streams=0, files_per_stream=0, data='x'): # Create collection collection = arvados.collection.Collection(api_client=self.api) for j in range(0, files_per_stream): with collection.open("file"+str(j)+".txt", "w") as f: f.write(data) collection.save_new() return collection @profiled def magicDirTestMoveFiles_oneEachIntoAnother(self, from_collection, to_collection, files_per_stream): for j in range(0, files_per_stream): self.pool.apply(magicDirTest_MoveFileFromCollection, (self.mounttmp, from_collection.manifest_locator(), to_collection.manifest_locator(), 'stream0', 'file'+str(j)+'.txt',)) from_collection.update() to_collection.update() def test_UsingMagicDirCreateCollectionWithManyFilesAndMoveEachFileIntoAnother(self): streams = 2 files_per_stream = 200 data = 'x' collection1 = self.magicDirTestMoveFiles_createCollectionWithManyFiles() # Create collection with multiple files collection2 = self.magicDirTestMoveFiles_createCollectionWithManyFiles(streams, files_per_stream, data) # Mount FuseMagicDir self.make_mount(fuse.MagicDirectory) # Move all files from collection2 into collection1 self.magicDirTestMoveFiles_oneEachIntoAnother(collection2, collection1, files_per_stream) updated_collection = self.api.collections().get(uuid=collection2.manifest_locator()).execute() file_names = ["file%i.txt" % i for i in range(0, files_per_stream)] for name in file_names: self.assertFalse(name in updated_collection['manifest_text']) updated_collection = self.api.collections().get(uuid=collection1.manifest_locator()).execute() for name in file_names: self.assertTrue(name in updated_collection['manifest_text'])