class TaskOutputDirTest(unittest.TestCase):
    """Exercise arvados.crunch.TaskOutputDir against a scratch directory.

    Each test runs with the TASK_KEEPMOUNT_TMP environment variable
    pointing at a freshly created temporary directory.
    """

    def setUp(self):
        """Create a scratch directory and point TASK_KEEPMOUNT_TMP at it."""
        # Remember any value that was set before the test so tearDown can
        # put it back instead of silently discarding it.
        self._saved_keepmount_tmp = os.environ.get('TASK_KEEPMOUNT_TMP')
        self.tmp = tempfile.mkdtemp()
        os.environ['TASK_KEEPMOUNT_TMP'] = self.tmp

    def tearDown(self):
        """Restore the environment and delete the scratch directory."""
        if self._saved_keepmount_tmp is None:
            # Pop with a default: a missing variable must not raise and
            # thereby skip removal of the temporary directory below.
            os.environ.pop('TASK_KEEPMOUNT_TMP', None)
        else:
            os.environ['TASK_KEEPMOUNT_TMP'] = self._saved_keepmount_tmp
        shutil.rmtree(self.tmp)

    def test_env_var(self):
        # The output dir is expected to take its path from
        # TASK_KEEPMOUNT_TMP, which setUp pointed at self.tmp.
        out = arvados.crunch.TaskOutputDir()
        self.assertEqual(out.path, self.tmp)

        special_file = os.path.join(self.tmp, '.arvados#collection')

        with open(special_file, 'w') as f:
            f.write('{\n "manifest_text":"",\n "uuid":null\n}\n')
        self.assertEqual(out.manifest_text(), '')

        # Special file must be re-read on each call to manifest_text().
        with open(special_file, 'w') as f:
            f.write(r'{"manifest_text":". unparsed 0:3:foo\n","uuid":null}')
        self.assertEqual(out.manifest_text(), ". unparsed 0:3:foo\n")