Add more checks in test cases.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>
return rid
def get_config_once(svc):
    """Return the exported cluster config for *svc*, fetching it at most once.

    The result of ``svc.configs().get().execute()`` is stored on the client
    object as ``svc._cached_config`` and reused by later calls.

    Returns an empty dict when the client's discovery document
    (``svc._rootDesc``) does not list a ``configs`` resource, i.e. the API
    server is too old to have a config export endpoint.
    """
    # Chain safe lookups: an old server's discovery document has no
    # 'configs' entry (and might lack 'resources' altogether), and that
    # must lead to the fallback below rather than a KeyError/TypeError.
    resources = svc._rootDesc.get('resources') or {}
    if not resources.get('configs'):
        # Old API server version, no config export endpoint
        return {}
    if not hasattr(svc, '_cached_config'):
        svc._cached_config = svc.configs().get().execute()
    return svc._cached_config
self._entries = {}
self._mtime = time.time()
def forward_slash_subst(self):
    """Return the configured replacement for '/' in names, or None.

    Reads ``Collections.ForwardSlashNameSubstitution`` from
    ``self.apiconfig()`` on first use and caches the outcome in
    ``self._fsns``:

    * the configured value, when present and neither '' nor '/';
    * None, when the configured value is '' or '/';
    * '_', when the config has no such key (old API server).

    Any exception raised by ``self.apiconfig()`` propagates, and nothing is
    cached in that case, so the next call tries again.
    """
    if not hasattr(self, '_fsns'):
        # Work on a local and publish it only once the lookup succeeded;
        # setting the attribute first would turn a failed config fetch
        # into a permanently cached "no substitution".
        config = self.apiconfig()
        try:
            fsns = config["Collections"]["ForwardSlashNameSubstitution"]
        except KeyError:
            # old API server with no FSNS config
            fsns = '_'
        else:
            if fsns == '' or fsns == '/':
                fsns = None
        self._fsns = fsns
    return self._fsns
+
def unsanitize_filename(self, incoming):
    """Replace ForwardSlashNameSubstitution value with /

    The substitution string comes from ``self.forward_slash_subst()``.
    When that is not a ``str`` (for example None), *incoming* is returned
    unchanged.
    """
    fsns = self.forward_slash_subst()
    if isinstance(fsns, str):
        return incoming.replace(fsns, '/')
    return incoming
elif dirty == '..':
return '__'
else:
- fsns = self.apiconfig()["Collections"]["ForwardSlashNameSubstitution"]
- if isinstance(fsns, str) and fsns != '':
+ fsns = self.forward_slash_subst()
+ if isinstance(fsns, str):
dirty = dirty.replace('/', fsns)
return _disallowed_filename_characters.sub('_', dirty)
def test_slash_substitution_before_listing(self, get_config_once):
    """Write through substituted names without listing first, then verify.

    NOTE(review): ``get_config_once`` is presumably a mock injected by a
    patch decorator that is not visible here -- confirm.
    """
    # Make the config report "[SLASH]" as the '/' substitution.
    get_config_once.return_value = {"Collections": {"ForwardSlashNameSubstitution": "[SLASH]"}}
    self.pool_test(os.path.join(self.mnt, 'zzz'), self.fusename)
    # Confirm the writes reached the expected collections.
    self.checkContents()
@staticmethod
def _test_slash_substitution_before_listing(self, tmpdir, fusename):
    """Write 'xxx' to foo-bar-baz/waz and 'foo' to <fusename>/waz in tmpdir.

    The two payloads differ so that the content check can tell which
    directory each write ended up in.
    NOTE(review): presumably invoked by ``pool_test`` in a separate
    process, which would explain the explicit ``self`` parameter on a
    static method -- confirm against the test harness.
    """
    with open(os.path.join(tmpdir, 'foo-bar-baz', 'waz'), 'w') as f:
        f.write('xxx')
    with open(os.path.join(tmpdir, fusename, 'waz'), 'w') as f:
        f.write('foo')
def test_slash_substitution_after_listing(self, get_config_once):
    """Write through substituted names after a directory listing, then verify.

    NOTE(review): ``get_config_once`` is presumably a mock injected by a
    patch decorator that is not visible here -- confirm.
    """
    # Make the config report "[SLASH]" as the '/' substitution.
    get_config_once.return_value = {"Collections": {"ForwardSlashNameSubstitution": "[SLASH]"}}
    self.pool_test(os.path.join(self.mnt, 'zzz'), self.fusename)
    # Confirm the writes reached the expected collections.
    self.checkContents()
@staticmethod
def _test_slash_substitution_after_listing(self, tmpdir, fusename):
    """Write 'xxx' to foo-bar-baz/waz, list tmpdir, then write 'foo' to <fusename>/waz.

    The two payloads differ so that the content check can tell which
    directory each write ended up in.
    NOTE(review): presumably invoked by ``pool_test`` in a separate
    process, which would explain the explicit ``self`` parameter on a
    static method -- confirm against the test harness.
    """
    with open(os.path.join(tmpdir, 'foo-bar-baz', 'waz'), 'w') as f:
        f.write('xxx')
    # List the parent directory before touching the substituted name; the
    # listing's return value is deliberately unused.
    os.listdir(tmpdir)
    with open(os.path.join(tmpdir, fusename, 'waz'), 'w') as f:
        f.write('foo')
+
def checkContents(self):
    """Assert that each test collection's manifest has the expected content hash.

    ``self.testcoll`` must contain a block whose hash starts with md5('foo')
    and ``self.testcolleasy`` one starting with md5('xxx').
    """
    # assertRegexpMatches is a deprecated alias that newer Python versions
    # no longer provide; prefer assertRegex and fall back to the old name
    # where assertRegex does not exist.
    assert_regex = getattr(self, 'assertRegex', None) or self.assertRegexpMatches
    expectations = (
        (self.testcoll, ' acbd18db'),      # md5(foo)
        (self.testcolleasy, ' f561aaf6'),  # md5(xxx)
    )
    for coll, md5_prefix in expectations:
        manifest = self.api.collections().get(uuid=coll['uuid']).execute()['manifest_text']
        assert_regex(manifest, md5_prefix)