11308: Merge branch 'master' into 11308-python3
[arvados.git] / sdk / python / tests / test_arv_put.py
1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 standard_library.install_aliases()
5 from builtins import str
6 from builtins import range
7 import apiclient
8 import mock
9 import os
10 import pwd
11 import re
12 import shutil
13 import subprocess
14 import sys
15 import tempfile
16 import time
17 import unittest
18 import yaml
19 import threading
20 import hashlib
21 import random
22
23 import arvados
24 import arvados.commands.put as arv_put
25 from . import arvados_testutil as tutil
26
27 from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
28 from . import run_test_server
29
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Tests for arv_put.ResumeCache: cache path naming, locking, and
    persistence of upload state across runs."""

    # Representative arv-put argument lists used by the cache-naming tests.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            # Not every test creates self.last_cache.
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache file path arv-put would use for this arglist."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        # The same argument list must always map to the same cache path.
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        # Different argument lists must map to different cache paths.
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        # '-' and /dev/stdin are synonyms and should share a cache.
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        # Temporarily change the configured API host; restore it in finally
        # so other tests aren't affected.
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        # NamedTemporaryFile is only used to get a unique name; the cache
        # keeps its own open handle after the file is unlinked (POSIX).
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # check_cache() should tolerate the failing HEAD request without
        # propagating the exception.  (This previously read
        # `self.assertRaises(None, resume_cache.check_cache())`, which is not
        # a valid assertRaises call: it passes None as the exception class
        # and the call's *result* instead of a callable.)
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        # The lock must persist even after the original file is unlinked.
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        # restart() empties the cache but keeps it locked.
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
238
239
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutUploadJob: uploads, progress reporting,
    caching/resume behavior, and dry-run detection."""

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        with open(self.large_file_name, 'w') as fileobj:
            # Make sure to write just a little more than one block
            for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
                fileobj.write(data)
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        # Keep a reference to the real write method so mocked versions can
        # delegate to it.
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)

    def _make_interrupting_write(self, final=False):
        """Return a replacement for ArvadosFileWriter.write that simulates a
        fatal error (SystemExit) while writing the last, short block of
        self.large_file_name, checkpointing state via self.writer._update()
        first.  With final=True the checkpoint also commits pending blocks.
        Tests must set self.writer before the mocked write is invoked.
        """
        def wrapped_write(*args, **kwargs):
            # With autospec=True, args[0] is the ArvadosFileWriter instance
            # and args[1] is the data being written.
            data = args[1]
            # Exit only on the last block, which is shorter than a full one.
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                if final:
                    # Ensure block commit.
                    self.writer._update(final=True)
                else:
                    self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)
        return wrapped_write

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            # Second run should skip everything already uploaded.
            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)

    def make_progress_tester(self):
        """Return (progression, record_func): record_func appends each
        (written, expected) report to progression."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                    reporter=reporter, bytes_expected=expect_count)
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        # Files 1..4 (1-4 KB) plus subdir/otherfile (5 KB) = 15 KB total.
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            # final=True: commit blocks before the simulated crash so the
            # resumed run can skip them.
            mocked_write.side_effect = self._make_interrupting_write(final=True)
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block, wrapped_commit will make the first block
            # commit to fail.
            # arv-put should not exit with an exception by trying to commit the collection
            # as it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = self._make_interrupting_write()
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_no_resume_when_no_cache(self):
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = self._make_interrupting_write()
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_dry_run_feature(self):
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = self._make_interrupting_write()
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload using dry_run to check if there is a pending upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            writer2.start(save_collection=False)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        # Confirm there's no pending upload with dry_run=True
        writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            writer4.start(save_collection=False)
        writer4.destroy_cache()
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del self.writer
495
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Tests for arv_put.expected_bytes_for()."""

    # Use this test file's own size as a known reference value.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        self.assertEqual(arv_put.expected_bytes_for([__file__]),
                         self.TEST_SIZE)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        for name in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tree, name))
        # A directory counts the total of its contents...
        self.assertEqual(arv_put.expected_bytes_for([tree]),
                         self.TEST_SIZE * 2)
        # ...and mixing a directory with a plain file adds up.
        self.assertEqual(arv_put.expected_bytes_for([tree, __file__]),
                         self.TEST_SIZE * 3)

    def test_expected_bytes_for_device(self):
        # Device files have no meaningful size, so the result is None.
        for paths in (['/dev/null'], [__file__, '/dev/null']):
            self.assertIsNone(arv_put.expected_bytes_for(paths))
515
516
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Tests for arv-put's progress-report formatting helpers."""

    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            reported = arv_put.machine_progress(count, total)
            # Unknown totals are reported as -1.
            expected_tail = ": {} written {} total\n".format(
                count, total if total is not None else -1)
            self.assertTrue(reported.endswith(expected_tail))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            actual = arv_put.human_progress(count, total)
            # Human-readable output rewrites the line in place...
            self.assertTrue(actual.startswith('\r'))
            # ...and includes the completion percentage.
            self.assertIn('{:.1%}'.format(1.0*count/total), actual)

    def test_unknown_human_progress(self):
        # Without a total, the raw byte count must still appear.
        for count in [1, 20, 300, 4000, 50000]:
            pattern = r'\b{}\b'.format(count)
            self.assertTrue(
                re.search(pattern, arv_put.human_progress(count, None)))
536
537
class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    """End-to-end tests of arv_put.main(): argument handling, output, and
    error reporting."""

    MAIN_SERVER = {}
    # A syntactically valid UUID that matches no real object.
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main(args), capturing stdout/stderr on self."""
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Upload a fresh test file with the given extra args and assert its
        data landed in the local Keep store.

        Fixed: previously used a mutable default argument (args=[]); harmless
        here because it was never mutated, but a classic Python pitfall.
        """
        if args is None:
            args = []
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        # Force arv-put to build a fresh API client per test.
        arv_put.api_client = None

    def tearDown(self):
        # Close any capture buffers created by call_main_with_args.
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            # --version exits after printing.
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        # arv-put should fall back gracefully when the cache dir can't be
        # written; restore permissions so tmpdir cleanup works.
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        # Same as above, but the cache dir itself can't even be created.
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            # Each --replication value should appear as the 'copies' kwarg.
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        # Simulate a 403 from the API server on collection save; arv-put
        # should exit nonzero without emitting a manifest on stdout.
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())
653
654
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    """Integration tests that run arv-put against live test servers."""

    def _getKeepServerConfig():
        """Derive Keep server settings from the API server's Rails config.

        This is a plain function, not a method -- it is called once below
        while the class body is still executing, so it takes no self/cls.
        Searches application.yml (optional) then application.default.yml
        (mandatory) for a blob signing key; if one is found, permission
        enforcement is enabled for the test Keep server.
        """
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                # safe_load: the config is plain data; yaml.load without an
                # explicit Loader is deprecated and can construct arbitrary
                # Python objects.
                rails_config = yaml.safe_load(f.read())
            for config_section in ['test', 'common']:
                try:
                    key = rails_config[config_section]["blob_signing_key"]
                except (KeyError, TypeError):
                    pass
                else:
                    return {'blob_signing_key': key,
                            'enforce_permissions': True}
        # Bug fix: this fallback key was previously misspelled
        # 'blog_signing_key'.
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        # Subprocess invocations of arv-put must see the same module search
        # path as the test runner itself.
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        # Force arv-put to build a fresh API client for each test.
        arv_put.api_client = None

    def authorize_with(self, token_name):
        """Authorize this process and future subprocesses as token_name."""
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        """Return the API server's record for the authorized user."""
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            # The error message should identify the offending UUID.
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID, 0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write(b'stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            # Bug fix: pipe.stdout yields bytes; writing them to the
            # text-mode sys.stdout raises TypeError on Python 3, which
            # would mask the real failure.
            sys.stdout.write(pipe.stdout.read().decode())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
                      pipe.stdout.read().decode())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegex(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=None):
        """Run arv-put with text on stdin, then fetch and return the
        collection record it reported on stdout.

        extra_args is a list of additional command-line arguments.  (Bug
        fix: the default was a shared mutable list.)
        """
        if extra_args is None:
            extra_args = []
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text.encode())
        self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
        # arv-put prints the PDH with --portable-data-hash, otherwise a UUID.
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.decode().strip()]]
        ).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_later_update(self):
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is being included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegex(c['manifest_text'], r'^\. .*:44:file2\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegex(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])
850
# Allow running this test module directly (outside the project's test runner).
if __name__ == '__main__':
    unittest.main()