#
# SPDX-License-Identifier: AGPL-3.0
-from __future__ import absolute_import
-from six import assertRegex
import arvados_fuse.command
import json
import multiprocessing
import os
-from . import run_test_server
+import shlex
import tempfile
import unittest
+from . import run_test_server
from .integration_test import workerPool
-try:
- from shlex import quote
-except:
- from pipes import quote
-
def try_exec(mnt, cmd):
try:
+ os.environ['KEEP_LOCAL_STORE'] = tempfile.mkdtemp()
arvados_fuse.command.Mount(
arvados_fuse.command.ArgumentParser().parse_args([
'--read-write',
def test_exec(self):
workerPool().apply(try_exec, (self.mnt, [
- 'sh', '-c',
- 'echo -n foo >{}; cp {} {}'.format(
- quote(os.path.join(self.mnt, 'zzz', 'foo.txt')),
- quote(os.path.join(self.mnt, 'zzz', '.arvados#collection')),
- quote(os.path.join(self.okfile)))]))
+ 'sh', '-c', 'echo -n foo >{}; cp {} {}'.format(
+ shlex.quote(os.path.join(self.mnt, 'zzz', 'foo.txt')),
+ shlex.quote(os.path.join(self.mnt, 'zzz', '.arvados#collection')),
+ shlex.quote(os.path.join(self.okfile)),
+ )]))
with open(self.okfile) as f:
- assertRegex(
- self,
- json.load(f)['manifest_text'],
- r' 0:3:foo.txt\n')
+ self.assertRegex(json.load(f)['manifest_text'], r' 0:3:foo.txt\n')