# 10383: Merge branch 'master' into 10383-arv-put-incremental-upload
# [arvados.git] / sdk / python / tests / test_arv_put.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import apiclient
5 import io
6 import mock
7 import os
8 import pwd
9 import re
10 import shutil
11 import subprocess
12 import sys
13 import tempfile
14 import time
15 import unittest
16 import yaml
17 import threading
18 import hashlib
19 import random
20
21 from cStringIO import StringIO
22
23 import arvados
24 import arvados.commands.put as arv_put
25 import arvados_testutil as tutil
26
27 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
28 import run_test_server
29
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Tests for arv_put.ResumeCache.

    Covers cache path generation (stability, uniqueness, argument
    normalization), file locking, persistence across reopens, and
    validation of previously cached upload state.
    """
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            # Not every test creates self.last_cache.
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache path ResumeCache derives from an arv-put arglist."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        # The same argument list must always map to the same cache path.
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        # Different argument lists must map to different cache paths.
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Restore the original config so other tests see real settings.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # Bug fix: this was `self.assertRaises(None, resume_cache.check_cache())`,
        # which invoked check_cache() immediately and passed None as the
        # exception class.  check_cache() is expected to handle HEAD failures
        # internally rather than raise, so just call it directly; the test
        # fails if it propagates the simulated Keep error.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        # The lock must survive past the NamedTemporaryFile context.
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            # After destroy(), the lock must be released and the data gone.
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        # restart() empties the cache but keeps it locked.
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
238
239
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutUploadJob, including resumable uploads.

    setUp builds a small directory tree plus one file slightly larger
    than a Keep block, so an upload can be interrupted mid-file and
    resumed from the cache.
    """

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test.  Close the fd from mkstemp
        # right away (it used to be leaked) and reopen by name.
        fd, self.large_file_name = tempfile.mkstemp()
        os.close(fd)
        with open(self.large_file_name, 'w') as fileobj:
            # Make sure to write just a little more than one block
            # (// keeps this an integer under both Python 2 and 3).
            for _ in range((arvados.config.KEEP_BLOCK_SIZE // (1024*1024)) + 1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
                fileobj.write(data)
        # Keep a reference to the real write method so the simulated-crash
        # wrapper below can delegate to it.
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)

    def _interrupting_write(self, *args, **kwargs):
        """Stand-in for ArvadosFileWriter.write that simulates a crash.

        Delegates to the real write for full-sized blocks and raises
        SystemExit on the final (short) block, leaving a partial upload
        behind.  args[0] is the ArvadosFileWriter instance (autospec),
        args[1] the data being written.
        """
        data = args[1]
        # Exit only on last block
        if len(data) < arvados.config.KEEP_BLOCK_SIZE:
            raise SystemExit("Simulated error")
        return self.arvfile_write(*args, **kwargs)

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            # Everything should have been skipped thanks to the cache.
            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)

    def make_progress_tester(self):
        """Return (record list, reporter callback) for progress tracking."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                    reporter=reporter, bytes_expected=expect_count)
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = self._interrupting_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        # Both attempts together (minus what the cache let us skip)
        # must cover the whole file.
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    def test_no_resume_when_asked(self):
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = self._interrupting_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        # With resume disabled, nothing may be skipped.
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    def test_no_resume_when_no_cache(self):
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = self._interrupting_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        # With the cache disabled, nothing may be skipped.
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
398
399
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Tests for arv_put.expected_bytes_for upload-size estimation."""

    # Size of this very test file, used as a known reference size.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        self.assertEqual(arv_put.expected_bytes_for([__file__]),
                         self.TEST_SIZE)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        for copy_name in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tree, copy_name))
        self.assertEqual(arv_put.expected_bytes_for([tree]),
                         self.TEST_SIZE * 2)
        self.assertEqual(arv_put.expected_bytes_for([tree, __file__]),
                         self.TEST_SIZE * 3)

    def test_expected_bytes_for_device(self):
        # Device files have no meaningful size, so no estimate is expected.
        for arglist in (['/dev/null'], [__file__, '/dev/null']):
            self.assertIsNone(arv_put.expected_bytes_for(arglist))
419
420
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Tests for the progress-report formatting helpers in arv_put."""

    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            # An unknown total is reported as -1.
            reported_total = total if total is not None else -1
            suffix = ": {} written {} total\n".format(count, reported_total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(suffix))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            percentage = '{:.1%}'.format(float(count) / total)
            progress = arv_put.human_progress(count, total)
            # Human-readable output rewrites the line in place.
            self.assertTrue(progress.startswith('\r'))
            self.assertIn(percentage, progress)

    def test_unknown_human_progress(self):
        # Without a total, the raw byte count must still appear.
        for count in [1, 20, 300, 4000, 50000]:
            pattern = r'\b{}\b'.format(count)
            self.assertTrue(
                re.search(pattern, arv_put.human_progress(count, None)))
440
441
class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
    """Tests for the arv-put command line entry point (arv_put.main)."""

    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main(args) capturing stdout/stderr on self."""
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Run main() on a small test file plus *args*; verify it hit Keep."""
        # Avoid a mutable default argument; None means "no extra args".
        if args is None:
            args = []
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None

    def tearDown(self):
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        err = io.BytesIO()
        out = io.BytesIO()
        with tutil.redirected_streams(stdout=out, stderr=err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertEqual(out.getvalue(), '')
        # Raw string: the previous literal relied on '\.' surviving
        # as-is in a non-raw string.
        self.assertRegexpMatches(err.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            # arv-put should still work even if it can't write its cache.
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            # Same as above, but the cache directory itself can't be created.
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            # Each --replication value must be forwarded as 'copies'.
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            # A failed save must produce a nonzero exit and no manifest.
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())
558
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    """End-to-end tests that run arv-put as a subprocess against test servers."""

    def _getKeepServerConfig():
        """Read the Keep blob signing key from the API server's Rails config.

        Returns a dict suitable for TestCaseWithServers.KEEP_SERVER.
        Called at class-definition time (no self), so it is a plain
        function, not a method.
        """
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                # NOTE(review): yaml.load on a trusted, repo-local config
                # file; consider yaml.safe_load if the config stays plain.
                rails_config = yaml.load(f.read())
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        # Bug fix: the fallback dict used the misspelled key
        # 'blog_signing_key', so callers never saw the intended
        # 'blob_signing_key': None entry.
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        """Authorize as the named token and mirror settings into ENVIRON."""
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            # str(error) instead of the deprecated error.message attribute.
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write('stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE, env=self.ENVIRON)
        (arvout, arverr) = p.communicate()
        self.assertEqual(arverr, None)
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegexpMatches(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=None):
        """Pipe *text* through arv-put and return the resulting collection.

        Looks the collection up by UUID, or by portable data hash if
        --portable-data-hash was among extra_args.
        """
        # Avoid a mutable default argument.
        if extra_args is None:
            extra_args = []
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text)
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_later_update(self):
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is being included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegexpMatches(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])
749
# Allow running this test module directly (e.g. `python test_arv_put.py`).
if __name__ == '__main__':
    unittest.main()