#
# SPDX-License-Identifier: AGPL-3.0
+from __future__ import absolute_import
+from future.utils import viewitems
+from builtins import str
+from builtins import object
import json
import llfuse
import logging
import arvados
import arvados_fuse as fuse
-import run_test_server
+from . import run_test_server
-from mount_test_base import MountTestBase
+from .mount_test_base import MountTestBase
logger = logging.getLogger('arvados.arv-mount')
self.done = False
return self
- def next(self):
+ def __next__(self):
if self.done:
raise StopIteration
return self.attempt
cw.write("data 8")
cw.start_new_stream('edgecases')
- for f in ":/.../-/*/\x01\\/ ".split("/"):
+ for f in ":/.../-/*/ ".split("/"):
cw.start_new_file(f)
cw.write('x')
- for f in ":/.../-/*/\x01\\/ ".split("/"):
+ for f in ":/.../-/*/ ".split("/"):
cw.start_new_stream('edgecases/dirs/' + f)
cw.start_new_file('x/x')
cw.write('x')
self.assertDirContents('dir2', ['thing5.txt', 'thing6.txt', 'dir3'])
self.assertDirContents('dir2/dir3', ['thing7.txt', 'thing8.txt'])
self.assertDirContents('edgecases',
- "dirs/:/.../-/*/\x01\\/ ".split("/"))
+ "dirs/:/.../-/*/ ".split("/"))
self.assertDirContents('edgecases/dirs',
- ":/.../-/*/\x01\\/ ".split("/"))
+ ":/.../-/*/ ".split("/"))
files = {'thing1.txt': 'data 1',
'thing2.txt': 'data 2',
'dir2/dir3/thing7.txt': 'data 7',
'dir2/dir3/thing8.txt': 'data 8'}
- for k, v in files.items():
+ for k, v in viewitems(files):
with open(os.path.join(self.mounttmp, k)) as f:
self.assertEqual(v, f.read())
self.assertIn(self.testcollection,
llfuse.listdir(os.path.join(self.mounttmp, 'by_id')))
self.assertIn(self.test_project, mount_ls)
- self.assertIn(self.test_project,
+ self.assertIn(self.test_project,
llfuse.listdir(os.path.join(self.mounttmp, 'by_id')))
with self.assertRaises(OSError):
llfuse.listdir(os.path.join(self.mounttmp, 'by_id', self.non_project_group))
files = {}
- files[os.path.join(self.mounttmp, self.testcollection, 'thing1.txt')] = 'data 1'
+ files[os.path.join(self.mounttmp, self.testcollection, 'thing1.txt')] = b'data 1'
- for k, v in files.items():
- with open(os.path.join(self.mounttmp, k)) as f:
+ for k, v in viewitems(files):
+ with open(os.path.join(self.mounttmp, k), 'rb') as f:
self.assertEqual(v, f.read())
# check mtime on collection
st = os.stat(baz_path)
try:
- mtime = st.st_mtime_ns / 1000000000
+ mtime = st.st_mtime_ns // 1000000000
except AttributeError:
mtime = st.st_mtime
self.assertEqual(mtime, 1391448174)
'anonymously_accessible_project']
found_in = 0
found_not_in = 0
- for name, item in run_test_server.fixture('collections').iteritems():
+ for name, item in viewitems(run_test_server.fixture('collections')):
if 'name' not in item:
pass
elif item['owner_uuid'] == public_project['uuid']:
r'\./testdir 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file1\.txt$')
self.pool.apply(fuseRmTestHelperDeleteFile, (self.mounttmp,))
- # Can't have empty directories :-( so manifest will be empty.
+ # Empty directories are represented by an empty file named "."
+ # (it appears in the manifest text as the escape \056, which the
+ # pattern below matches literally).
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
- self.assertEqual(collection2["manifest_text"], "")
+ self.assertRegexpMatches(collection2["manifest_text"],
+ r'\./testdir d41d8cd98f00b204e9800998ecf8427e\+0\+A\S+ 0:0:\\056\n')
self.pool.apply(fuseRmTestHelperRmdir, (self.mounttmp,))
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
self.assertRegexpMatches(collection2["manifest_text"],
- r'\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file1\.txt$')
+ r'\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A\S+ 0:12:file1\.txt\n\./testdir d41d8cd98f00b204e9800998ecf8427e\+0\+A\S+ 0:0:\\056\n')
def fuseRenameTestHelper(mounttmp):
llfuse.listdir(os.path.join(self.mounttmp, 'by_id')))
files = {}
- files[os.path.join(self.mounttmp, self.testcollection, 'thing1.txt')] = 'data 1'
+ files[os.path.join(self.mounttmp, self.testcollection, 'thing1.txt')] = b'data 1'
- for k, v in files.items():
- with open(os.path.join(self.mounttmp, k)) as f:
+ for k, v in viewitems(files):
+ with open(os.path.join(self.mounttmp, k), 'rb') as f:
self.assertEqual(v, f.read())
# look up using uuid should fail when pdh_only is set