+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+from __future__ import absolute_import
+from future.utils import viewitems
+from builtins import str
+from builtins import range
import arvados
-import arvados.safeapi
import arvados_fuse as fuse
-import glob
-import json
import llfuse
+import logging
import os
-import shutil
-import subprocess
import sys
-import tempfile
-import threading
-import time
import unittest
-import logging
-import multiprocessing
from .. import run_test_server
from ..mount_test_base import MountTestBase
+from ..slow_test import slow_test
logger = logging.getLogger('arvados.arv-mount')
-from performance_profiler import profiled
+from .performance_profiler import profiled
-@profiled
-def fuseCreateCollectionWithManyFiles(mounttmp, streams=1, files_per_stream=1, blocks_per_file=1, bytes_per_block=1, data='x'):
+def fuse_createCollectionWithMultipleBlocks(mounttmp, streams=1, files_per_stream=1, data='x'):
class Test(unittest.TestCase):
def runTest(self):
- names = 'file0.txt'
- for i in range(1, files_per_stream):
- names += ',file' + str(i) + '.txt'
- file_names = names.split(',')
+ self.createCollectionWithMultipleBlocks()
+ @profiled
+ def createCollectionWithMultipleBlocks(self):
for i in range(0, streams):
- with self.assertRaises(IOError):
- with open(os.path.join(mounttmp, "./stream", "file0.txt"), "w") as f:
+ os.mkdir(os.path.join(mounttmp, "./stream" + str(i)))
+
+ # Create files
+ for j in range(0, files_per_stream):
+ with open(os.path.join(mounttmp, "./stream" + str(i), "file" + str(j) +".txt"), "w") as f:
f.write(data)
- os.mkdir(os.path.join(mounttmp, "./stream" + str(i)))
+ Test().runTest()
+
+def fuse_readContentsFromCollectionWithMultipleBlocks(mounttmp, streams=1, files_per_stream=1, data='x'):
+    """Read back every file under the mount and check its contents.
+
+    Visits mounttmp/stream<i>/file<j>.txt for each i in range(streams) and
+    each j in range(files_per_stream) and asserts the file content equals
+    `data`.  The work lives in a method of a throwaway TestCase so the
+    unittest assertion helpers are available and the method can carry the
+    @profiled decorator.
+    """
+    class Test(unittest.TestCase):
+        def runTest(self):
+            self.readContentsFromCollectionWithMultipleBlocks()
+
+        @profiled
+        def readContentsFromCollectionWithMultipleBlocks(self):
+            for i in range(0, streams):
+                stream_dir = os.path.join(mounttmp, 'stream'+str(i))
+                # The listing is still performed (it is part of the profiled
+                # work) but its result is not inspected.
+                llfuse.listdir(stream_dir)
+                for j in range(0, files_per_stream):
+                    # Index the file by j; indexing by i read the same
+                    # file<i>.txt on every pass of this inner loop.
+                    with open(os.path.join(stream_dir, 'file'+str(j)+'.txt')) as f:
+                        self.assertEqual(data, f.read())
+
+    Test().runTest()
+
+def fuse_moveFileFromCollectionWithMultipleBlocks(mounttmp, stream, filename):
+    """Move `filename` from directory `stream` to the mount root and check listings.
+
+    The file is renamed to 'moved_from_<stream>_<filename>' directly under
+    mounttmp.  Directory listings are checked before and after the rename.
+    """
+    class Test(unittest.TestCase):
+        def runTest(self):
+            self.moveFileFromCollectionWithMultipleBlocks()
+
+        @profiled
+        def moveFileFromCollectionWithMultipleBlocks(self):
+            stream_dir = os.path.join(mounttmp, stream)
+            moved_name = 'moved_from_'+stream+'_'+filename
+
+            # The file has to exist in its stream before it can be moved.
+            self.assertIn(filename, llfuse.listdir(stream_dir))
+
+            os.rename(os.path.join(mounttmp, stream, filename), os.path.join(mounttmp, moved_name))
+
+            # It must now show up in the root listing ...
+            self.assertIn(moved_name, llfuse.listdir(os.path.join(mounttmp)))
+
+            # ... and be gone from the stream it came from.
+            self.assertNotIn(filename, llfuse.listdir(stream_dir))
+
+    Test().runTest()
+
+def fuse_deleteFileFromCollectionWithMultipleBlocks(mounttmp, stream, filename):
+    """Remove mounttmp/<stream>/<filename>; any OSError from os.remove propagates."""
+    class Test(unittest.TestCase):
+        def runTest(self):
+            self.deleteFileFromCollectionWithMultipleBlocks()
+
+        @profiled
+        def deleteFileFromCollectionWithMultipleBlocks(self):
+            target = os.path.join(mounttmp, stream, filename)
+            os.remove(target)
+
+    Test().runTest()
+
+# Create a collection with 2 streams, 3 files_per_stream, 2 blocks_per_file, 2**26 bytes_per_block
+class CreateCollectionWithMultipleBlocksAndMoveAndDeleteFile(MountTestBase):
+    """Write multi-block files through a CollectionDirectory mount, then move and delete some.
+
+    After each phase the collection record is fetched again through the
+    API and its manifest_text is checked, so the assertions cover what was
+    stored and not only what the mount shows.
+    """
+    def setUp(self):
+        super(CreateCollectionWithMultipleBlocksAndMoveAndDeleteFile, self).setUp()
+
+    @slow_test
+    def test_CreateCollectionWithManyBlocksAndMoveAndDeleteFile(self):
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+        self.assertTrue(m.writable())
+
+        streams = 2
+        files_per_stream = 3
+        blocks_per_file = 2
+        bytes_per_block = 2**26
+
+        data = 'x' * blocks_per_file * bytes_per_block
+
+        self.pool.apply(fuse_createCollectionWithMultipleBlocks, (self.mounttmp, streams, files_per_stream, data,))
+
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+
+        for i in range(0, streams):
+            self.assertIn('./stream' + str(i), collection2["manifest_text"])
+
+        for i in range(0, files_per_stream):
+            self.assertIn('file' + str(i) + '.txt', collection2["manifest_text"])
+
+        # Read file contents
+        self.pool.apply(fuse_readContentsFromCollectionWithMultipleBlocks, (self.mounttmp, streams, files_per_stream, data,))
+
+        # Move file0.txt out of the streams into .
+        for i in range(0, streams):
+            self.pool.apply(fuse_moveFileFromCollectionWithMultipleBlocks, (self.mounttmp, 'stream'+str(i), 'file0.txt',))
-                with self.assertRaises(OSError):
-                    os.mkdir(os.path.join(mounttmp, "./stream" + str(i)))
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+
+        # Splitting on '\n' yields the '.' stream, one entry per stream
+        # directory, and a trailing empty string: 4 entries for 2 streams.
+        manifest_streams = collection2['manifest_text'].split('\n')
+        self.assertEqual(4, len(manifest_streams))
+
+        # Each stream's file0.txt must appear in the root stream under the
+        # name the move helper gave it ('moved_from_<stream>_<filename>').
+        for i in range(0, streams):
+            self.assertIn('moved_from_stream'+str(i)+'_file0.txt', manifest_streams[0])
+
+        for i in range(0, streams):
+            self.assertNotIn('file0.txt', manifest_streams[i+1])
+
+        for i in range(0, streams):
+            for j in range(1, files_per_stream):
+                self.assertIn('file' + str(j) + '.txt', manifest_streams[i+1])
+
+        # Delete 'file1.txt' from all the streams
+        for i in range(0, streams):
+            self.pool.apply(fuse_deleteFileFromCollectionWithMultipleBlocks, (self.mounttmp, 'stream'+str(i), 'file1.txt'))
+
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+
+        manifest_streams = collection2['manifest_text'].split('\n')
+        self.assertEqual(4, len(manifest_streams))
+
+        # The moved files must survive the deletions untouched.
+        for i in range(0, streams):
+            self.assertIn('moved_from_stream'+str(i)+'_file0.txt', manifest_streams[0])
+
+        self.assertNotIn('file1.txt', collection2['manifest_text'])
+
+        for i in range(0, streams):
+            for j in range(2, files_per_stream):
+                self.assertIn('file' + str(j) + '.txt', manifest_streams[i+1])
+
+
+def fuse_createCollectionWithManyFiles(mounttmp, streams=1, files_per_stream=1, data='x'):
+ class Test(unittest.TestCase):
+ def runTest(self):
+ self.createCollectionWithManyFiles()
+
+ @profiled
+ def createCollectionWithManyFiles(self):
+ for i in range(0, streams):
+ os.mkdir(os.path.join(mounttmp, "./stream" + str(i)))
# Create files
for j in range(0, files_per_stream):
with open(os.path.join(mounttmp, "./stream" + str(i), "file" + str(j) +".txt"), "w") as f:
f.write(data)
- d1 = llfuse.listdir(os.path.join(mounttmp, "./stream" + str(i)))
- self.assertEqual(sorted(file_names), sorted(d1))
-
Test().runTest()
-@profiled
-def fuseReadContentsFromCollectionWithManyFiles(mounttmp, streams, files_per_stream, content):
+def fuse_readContentsFromCollectionWithManyFiles(mounttmp, streams=1, files_per_stream=1, data='x'):
class Test(unittest.TestCase):
def runTest(self):
+ self.readContentsFromCollectionWithManyFiles()
+
+ @profiled
+ def readContentsFromCollectionWithManyFiles(self):
for i in range(0, streams):
d1 = llfuse.listdir(os.path.join(mounttmp, 'stream'+str(i)))
for j in range(0, files_per_stream):
with open(os.path.join(mounttmp, 'stream'+str(i), 'file'+str(i)+'.txt')) as f:
- self.assertEqual(content, f.read())
+ self.assertEqual(data, f.read())
Test().runTest()
-@profiled
-def fuseMoveFileFromCollectionWithManyFiles(mounttmp, stream, filename):
+def fuse_moveFileFromCollectionWithManyFiles(mounttmp, stream, filename):
class Test(unittest.TestCase):
def runTest(self):
+ self.moveFileFromCollectionWithManyFiles()
+
+ @profiled
+ def moveFileFromCollectionWithManyFiles(self):
d1 = llfuse.listdir(os.path.join(mounttmp, stream))
self.assertIn(filename, d1)
- os.rename(os.path.join(mounttmp, stream, filename), os.path.join(mounttmp, 'moved-from-'+stream+'-'+filename))
+ os.rename(os.path.join(mounttmp, stream, filename), os.path.join(mounttmp, 'moved_from_'+stream+'_'+filename))
d1 = llfuse.listdir(os.path.join(mounttmp))
- self.assertIn('moved-from-'+stream+'-'+filename, d1)
+ self.assertIn('moved_from_'+stream+'_'+filename, d1)
d1 = llfuse.listdir(os.path.join(mounttmp, stream))
self.assertNotIn(filename, d1)
Test().runTest()
-@profiled
-def fuseDeleteFileFromCollectionWithManyFiles(mounttmp, stream, filename):
+def fuse_deleteFileFromCollectionWithManyFiles(mounttmp, stream, filename):
class Test(unittest.TestCase):
def runTest(self):
- d1 = llfuse.listdir(os.path.join(mounttmp, stream))
+ self.deleteFileFromCollectionWithManyFiles()
- # Delete file
+ @profiled
+ def deleteFileFromCollectionWithManyFiles(self):
os.remove(os.path.join(mounttmp, stream, filename))
- # Try to delete it again
- with self.assertRaises(OSError):
- os.remove(os.path.join(mounttmp, "testdir", "file1.txt"))
-
Test().runTest()
# Create a collection with two streams, each with 200 files
-class CreateCollectionWithManyFilesAndRenameMoveAndDeleteFile(MountTestBase):
- def runTest(self):
+class CreateCollectionWithManyFilesAndMoveAndDeleteFile(MountTestBase):
+ def setUp(self):
+ super(CreateCollectionWithManyFilesAndMoveAndDeleteFile, self).setUp()
+
+ @slow_test
+ def test_CreateCollectionWithManyFilesAndMoveAndDeleteFile(self):
collection = arvados.collection.Collection(api_client=self.api)
collection.save_new()
streams = 2
files_per_stream = 200
- blocks_per_file = 1
- bytes_per_block = 1
-
- data = 'x' * blocks_per_file * bytes_per_block
+ data = 'x'
- self.pool.apply(fuseCreateCollectionWithManyFiles, (self.mounttmp, streams, files_per_stream, blocks_per_file, bytes_per_block, data))
+ self.pool.apply(fuse_createCollectionWithManyFiles, (self.mounttmp, streams, files_per_stream, data,))
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
self.assertIn('file' + str(i) + '.txt', collection2["manifest_text"])
# Read file contents
- self.pool.apply(fuseReadContentsFromCollectionWithManyFiles, (self.mounttmp, streams, files_per_stream, data,))
+ self.pool.apply(fuse_readContentsFromCollectionWithManyFiles, (self.mounttmp, streams, files_per_stream, data,))
# Move file0.txt out of the streams into .
for i in range(0, streams):
- self.pool.apply(fuseMoveFileFromCollectionWithManyFiles, (self.mounttmp, 'stream'+str(i), 'file0.txt',))
+ self.pool.apply(fuse_moveFileFromCollectionWithManyFiles, (self.mounttmp, 'stream'+str(i), 'file0.txt',))
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
self.assertEqual(4, len(manifest_streams))
for i in range(0, streams):
- self.assertIn('moved-from-stream'+str(i)+'-file0.txt', manifest_streams[0])
+ self.assertIn('file0.txt', manifest_streams[0])
for i in range(0, streams):
self.assertNotIn('file0.txt', manifest_streams[i+1])
# Delete 'file1.txt' from all the streams
for i in range(0, streams):
- self.pool.apply(fuseDeleteFileFromCollectionWithManyFiles, (self.mounttmp, 'stream'+str(i), 'file1.txt'))
+ self.pool.apply(fuse_deleteFileFromCollectionWithManyFiles, (self.mounttmp, 'stream'+str(i), 'file1.txt'))
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
self.assertEqual(4, len(manifest_streams))
for i in range(0, streams):
- self.assertIn('moved-from-stream'+str(i)+'-file0.txt', manifest_streams[0])
+ self.assertIn('file0.txt', manifest_streams[0])
self.assertNotIn('file1.txt', collection2['manifest_text'])
for i in range(0, streams):
for j in range(2, files_per_stream):
self.assertIn('file' + str(j) + '.txt', manifest_streams[i+1])
+
+
+def magicDirTest_MoveFileFromCollection(mounttmp, collection1, collection2, stream, filename):
+    """Rename `filename` from collection1's directory into collection2's directory.
+
+    Both directories are looked up directly under mounttmp.  The `stream`
+    argument is accepted but not used by the body.
+    """
+    class Test(unittest.TestCase):
+        def runTest(self):
+            self.magicDirTest_moveFileFromCollection()
+
+        @profiled
+        def magicDirTest_moveFileFromCollection(self):
+            source = os.path.join(mounttmp, collection1, filename)
+            destination = os.path.join(mounttmp, collection2, filename)
+            os.rename(source, destination)
+
+    Test().runTest()
+
+def magicDirTest_RemoveFileFromCollection(mounttmp, collection1, stream, filename):
+    """Delete `filename` from collection1's directory under mounttmp.
+
+    The `stream` argument is accepted but not used by the body.
+    """
+    class Test(unittest.TestCase):
+        def runTest(self):
+            self.magicDirTest_removeFileFromCollection()
+
+        @profiled
+        def magicDirTest_removeFileFromCollection(self):
+            victim = os.path.join(mounttmp, collection1, filename)
+            os.remove(victim)
+
+    Test().runTest()
+
+class UsingMagicDir_CreateCollectionWithManyFilesAndMoveAndDeleteFile(MountTestBase):
+    """Create collections via the SDK, then read, move and delete files through a MagicDirectory mount."""
+    def setUp(self):
+        super(UsingMagicDir_CreateCollectionWithManyFilesAndMoveAndDeleteFile, self).setUp()
+
+    @profiled
+    def magicDirTest_createCollectionWithManyFiles(self, streams=0, files_per_stream=0, data='x'):
+        """Save and return a new collection holding file0.txt .. file<files_per_stream-1>.txt.
+
+        Every file receives `data` as its content.  `streams` is accepted
+        but not used: all files are written at the top level.  With the
+        default files_per_stream=0 the saved collection is empty.
+        """
+        # Create collection
+        collection = arvados.collection.Collection(api_client=self.api)
+        for j in range(0, files_per_stream):
+            with collection.open("file"+str(j)+".txt", "w") as f:
+                f.write(data)
+        collection.save_new()
+        return collection
+
+    @profiled
+    def magicDirTest_readCollectionContents(self, collection, streams=1, files_per_stream=1, data='x'):
+        """Assert that each file<j>.txt under mounttmp/<collection> contains `data`.
+
+        `collection` is the directory name under the mount (the test passes
+        the collection's manifest locator).  `streams` is not used.
+        """
+        collection_dir = os.path.join(self.mounttmp, collection)
+        # The listing is still performed, though its result is not inspected.
+        os.listdir(collection_dir)
+
+        files = {}
+        for j in range(0, files_per_stream):
+            files[os.path.join(collection_dir, 'file'+str(j)+'.txt')] = data
+
+        # viewitems is the name imported from future.utils; the keys are
+        # already complete paths, so they are opened as they are.
+        for path, expected in viewitems(files):
+            with open(path) as f:
+                self.assertEqual(expected, f.read())
+
+    @slow_test
+    def test_UsingMagicDirCreateCollectionWithManyFilesAndMoveAndDeleteFile(self):
+        streams = 2
+        files_per_stream = 200
+        data = 'x'
+
+        # Empty collection that receives the moved file
+        collection1 = self.magicDirTest_createCollectionWithManyFiles()
+        # Create collection with multiple files
+        collection2 = self.magicDirTest_createCollectionWithManyFiles(streams, files_per_stream, data)
+
+        # Mount FuseMagicDir
+        self.make_mount(fuse.MagicDirectory)
+
+        self.magicDirTest_readCollectionContents(collection2.manifest_locator(), streams, files_per_stream, data)
+
+        # Move file0.txt out of the collection2 into collection1
+        self.pool.apply(magicDirTest_MoveFileFromCollection, (self.mounttmp, collection2.manifest_locator(),
+              collection1.manifest_locator(), 'stream0', 'file0.txt',))
+        updated_collection = self.api.collections().get(uuid=collection2.manifest_locator()).execute()
+        self.assertNotIn('file0.txt', updated_collection['manifest_text'])
+        self.assertIn('file1.txt', updated_collection['manifest_text'])
+
+        # Delete file1.txt from collection2
+        self.pool.apply(magicDirTest_RemoveFileFromCollection, (self.mounttmp, collection2.manifest_locator(), 'stream0', 'file1.txt',))
+        updated_collection = self.api.collections().get(uuid=collection2.manifest_locator()).execute()
+        self.assertNotIn('file1.txt', updated_collection['manifest_text'])
+        self.assertIn('file2.txt', updated_collection['manifest_text'])
+
+
+def magicDirTest_MoveAllFilesFromCollection(mounttmp, from_collection, to_collection, stream, files_per_stream):
+    """Rename file0.txt .. file<files_per_stream-1>.txt from one collection directory to another.
+
+    All renames happen inside a single @profiled method.  The `stream`
+    argument is accepted but not used by the body.
+    """
+    class Test(unittest.TestCase):
+        def runTest(self):
+            self.magicDirTest_moveAllFilesFromCollection()
+
+        @profiled
+        def magicDirTest_moveAllFilesFromCollection(self):
+            for index in range(0, files_per_stream):
+                name = 'file'+str(index)+'.txt'
+                source = os.path.join(mounttmp, from_collection, name)
+                destination = os.path.join(mounttmp, to_collection, name)
+                os.rename(source, destination)
+
+    Test().runTest()
+
+class UsingMagicDir_CreateCollectionWithManyFilesAndMoveAllFilesIntoAnother(MountTestBase):
+    """Move every file of one collection into another through a MagicDirectory mount, in one pool call."""
+    def setUp(self):
+        super(UsingMagicDir_CreateCollectionWithManyFilesAndMoveAllFilesIntoAnother, self).setUp()
+
+    @profiled
+    def magicDirTestMoveAllFiles_createCollectionWithManyFiles(self, streams=0, files_per_stream=0,
+            blocks_per_file=0, bytes_per_block=0, data='x'):
+        """Save and return a new collection holding file0.txt .. file<files_per_stream-1>.txt.
+
+        Every file receives `data` as its content.  `streams`,
+        `blocks_per_file` and `bytes_per_block` are accepted but not used.
+        """
+        # Create collection
+        collection = arvados.collection.Collection(api_client=self.api)
+        for j in range(0, files_per_stream):
+            with collection.open("file"+str(j)+".txt", "w") as f:
+                f.write(data)
+        collection.save_new()
+        return collection
+
+    @slow_test
+    def test_UsingMagicDirCreateCollectionWithManyFilesAndMoveAllFilesIntoAnother(self):
+        streams = 2
+        files_per_stream = 200
+        data = 'x'
+
+        # Empty collection that receives the moved files
+        collection1 = self.magicDirTestMoveAllFiles_createCollectionWithManyFiles()
+        # Create collection with multiple files.  data goes by keyword: the
+        # third positional parameter of the helper is blocks_per_file.
+        collection2 = self.magicDirTestMoveAllFiles_createCollectionWithManyFiles(
+            streams, files_per_stream, data=data)
+
+        # Mount FuseMagicDir
+        self.make_mount(fuse.MagicDirectory)
+
+        # Move all files from collection2 into collection1
+        self.pool.apply(magicDirTest_MoveAllFilesFromCollection, (self.mounttmp, collection2.manifest_locator(),
+                  collection1.manifest_locator(), 'stream0', files_per_stream,))
+
+        updated_collection = self.api.collections().get(uuid=collection2.manifest_locator()).execute()
+        file_names = ["file%i.txt" % i for i in range(0, files_per_stream)]
+        for name in file_names:
+            self.assertNotIn(name, updated_collection['manifest_text'])
+
+        updated_collection = self.api.collections().get(uuid=collection1.manifest_locator()).execute()
+        for name in file_names:
+            self.assertIn(name, updated_collection['manifest_text'])
+
+
+# Move one file at a time from one collection into another
+class UsingMagicDir_CreateCollectionWithManyFilesAndMoveEachFileIntoAnother(MountTestBase):
+    """Move the files of one collection into another, one pool call per file."""
+    def setUp(self):
+        super(UsingMagicDir_CreateCollectionWithManyFilesAndMoveEachFileIntoAnother, self).setUp()
+
+    @profiled
+    def magicDirTestMoveFiles_createCollectionWithManyFiles(self, streams=0, files_per_stream=0, data='x'):
+        """Save and return a new collection with `files_per_stream` files, each containing `data`.
+
+        `streams` is accepted but not used.
+        """
+        created = arvados.collection.Collection(api_client=self.api)
+        for index in range(0, files_per_stream):
+            with created.open("file"+str(index)+".txt", "w") as handle:
+                handle.write(data)
+        created.save_new()
+        return created
+
+    def magicDirTestMoveFiles_oneEachIntoAnother(self, from_collection, to_collection, files_per_stream):
+        """Issue one pool call per file, moving it from from_collection to to_collection."""
+        for index in range(0, files_per_stream):
+            move_args = (self.mounttmp, from_collection.manifest_locator(),
+                         to_collection.manifest_locator(), 'stream0', 'file'+str(index)+'.txt',)
+            self.pool.apply(magicDirTest_MoveFileFromCollection, move_args)
+
+    @slow_test
+    def test_UsingMagicDirCreateCollectionWithManyFilesAndMoveEachFileIntoAnother(self):
+        streams = 2
+        files_per_stream = 200
+        data = 'x'
+
+        # Destination starts out empty; the source holds all the files.
+        collection1 = self.magicDirTestMoveFiles_createCollectionWithManyFiles()
+        collection2 = self.magicDirTestMoveFiles_createCollectionWithManyFiles(streams, files_per_stream, data)
+
+        # Mount FuseMagicDir
+        self.make_mount(fuse.MagicDirectory)
+
+        # Move all files from collection2 into collection1
+        self.magicDirTestMoveFiles_oneEachIntoAnother(collection2, collection1, files_per_stream)
+
+        # The source manifest must no longer mention any of the files ...
+        source_record = self.api.collections().get(uuid=collection2.manifest_locator()).execute()
+        file_names = ["file%i.txt" % i for i in range(0, files_per_stream)]
+        for name in file_names:
+            self.assertFalse(name in source_record['manifest_text'])
+
+        # ... and the destination manifest must mention all of them.
+        destination_record = self.api.collections().get(uuid=collection1.manifest_locator()).execute()
+        for name in file_names:
+            self.assertTrue(name in destination_record['manifest_text'])
+
+class FuseListLargeProjectContents(MountTestBase):
+    """List a mounted project holding 201 collections, then list each collection inside it."""
+    @profiled
+    def getProjectWithManyCollections(self):
+        """List the mount root and check the entry count and one known name."""
+        entries = llfuse.listdir(self.mounttmp)
+        self.assertEqual(201, len(entries))
+        self.assertIn('Collection_1', entries)
+
+    @profiled
+    def listContentsInProjectWithManyCollections(self):
+        """List the mount root, then every entry in it, expecting 'baz' in each."""
+        entries = llfuse.listdir(self.mounttmp)
+        self.assertEqual(201, len(entries))
+        self.assertIn('Collection_1', entries)
+
+        for entry in entries:
+            entry_dir = os.path.join(self.mounttmp, entry)
+            self.assertIn('baz', llfuse.listdir(entry_dir))
+
+    @slow_test
+    def test_listLargeProjectContents(self):
+        project = run_test_server.fixture('groups')['project_with_201_collections']
+        self.make_mount(fuse.ProjectDirectory, project_object=project)
+        self.getProjectWithManyCollections()
+        self.listContentsInProjectWithManyCollections()