11308: Fix arvfile append mode: write() changes the file pointer.
[arvados.git] / sdk / python / tests / test_arv_put.py
1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 standard_library.install_aliases()
5 from builtins import str
6 from builtins import range
7 import apiclient
8 import mock
9 import os
10 import pwd
11 import re
12 import shutil
13 import subprocess
14 import sys
15 import tempfile
16 import time
17 import unittest
18 import yaml
19 import threading
20 import hashlib
21 import random
22
23 import arvados
24 import arvados.commands.put as arv_put
25 from . import arvados_testutil as tutil
26
27 from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
28 from . import run_test_server
29
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Tests for arv_put.ResumeCache: path naming, locking and persistence."""

    # Argument lists whose derived cache paths must be both stable
    # (same args -> same path) and mutually unique.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            # Not every test creates self.last_cache.
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache file path arv-put derives from this argument list."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Restore the original setting so later tests aren't affected.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # Bug fix: this used to read
        #   self.assertRaises(None, resume_cache.check_cache())
        # which called check_cache() immediately and handed its return value
        # to assertRaises with an invalid exception class (None).  The intent
        # is that check_cache() copes with the head() failure without
        # raising, so just call it directly.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
238
239
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutUploadJob: caching, resume, and dry-run."""

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test.  Use a context manager so the
        # file is closed even if a write fails (the previous code leaked
        # the file object on error).
        _, self.large_file_name = tempfile.mkstemp()
        with open(self.large_file_name, 'w') as fileobj:
            # Make sure to write just a little more than one block
            for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
                fileobj.write(data)
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        # Unbound original write method, called by the interrupting wrappers.
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)

    def _make_interrupting_write(self, final=False):
        """Return a side_effect for ArvadosFileWriter.write that fakes a crash.

        Full-size blocks pass through to the real write(); the first short
        block (i.e. the file's last block) triggers a checkpoint via
        self.writer._update() and raises SystemExit, simulating an upload
        interrupted near the end.  When final is True the checkpoint is
        taken with _update(final=True) to ensure pending blocks are
        committed, so a later run has something to resume from.
        """
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                if final:
                    self.writer._update(final=True)
                else:
                    self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)
        return wrapped_write

    def _start_interrupted_upload(self, final=False):
        """Upload the large file with a simulated crash on its last block.

        Leaves the interrupted job in self.writer, asserts that some but
        not all bytes were written, and returns the job.
        """
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = self._make_interrupting_write(final=final)
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        return writer

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            # The second run should skip the already-uploaded bytes.
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

    def make_progress_tester(self):
        """Return (progression, reporter); reporter appends (written, expected) tuples."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                    reporter=reporter, bytes_expected=expect_count)
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        # setUp wrote files of 1..4 KB plus a 5 KB file in subdir/.
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        # Interrupt an upload on its last block; final=True ensures the
        # pending blocks are committed so there is something to resume.
        writer = self._start_interrupted_upload(final=True)
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        # Everything missed the first time is written the second time.
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block, wrapped_commit will make the first block
            # commit to fail.
            # arv-put should not exit with an exception by trying to commit the collection
            # as it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        self._start_interrupted_upload()
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_no_resume_when_no_cache(self):
        self._start_interrupted_upload()
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_dry_run_feature(self):
        self._start_interrupted_upload()
        # Retry the upload using dry_run to check if there is a pending upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            writer2.start(save_collection=False)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        # Confirm there's no pending upload with dry_run=True
        writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            writer4.start(save_collection=False)
        writer4.destroy_cache()
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del self.writer
497
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Checks for arv_put.expected_bytes_for()'s upload size estimates."""

    # Use this test file itself as a fixture of known size.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        """A single regular file reports its exact on-disk size."""
        self.assertEqual(arv_put.expected_bytes_for([__file__]),
                         self.TEST_SIZE)

    def test_expected_bytes_for_tree(self):
        """Directory arguments contribute the sum of their files' sizes."""
        tmpdir = self.make_tmpdir()
        for name in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tmpdir, name))
        self.assertEqual(arv_put.expected_bytes_for([tmpdir]),
                         self.TEST_SIZE * 2)
        self.assertEqual(arv_put.expected_bytes_for([tmpdir, __file__]),
                         self.TEST_SIZE * 3)

    def test_expected_bytes_for_device(self):
        """Device files have no meaningful size, so the estimate is None."""
        for arglist in (['/dev/null'], [__file__, '/dev/null']):
            self.assertIsNone(arv_put.expected_bytes_for(arglist))
517
518
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Formatting checks for arv-put's progress report helpers."""

    def test_machine_progress(self):
        """machine_progress ends with 'N written M total' (-1 for unknown total)."""
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            total_field = -1 if (total is None) else total
            expect = ": {} written {} total\n".format(count, total_field)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        """With a known total, the report carriage-returns and shows a percentage."""
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            percent = '{:.1%}'.format(1.0 * count / total)
            report = arv_put.human_progress(count, total)
            self.assertTrue(report.startswith('\r'))
            self.assertIn(percent, report)

    def test_unknown_human_progress(self):
        """Without a total, the report still mentions the byte count."""
        for count in [1, 20, 300, 4000, 50000]:
            pattern = r'\b{}\b'.format(count)
            self.assertTrue(re.search(pattern,
                                      arv_put.human_progress(count, None)))
538
539
class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    """End-to-end tests of arv_put.main(): argument handling and output."""

    MAIN_SERVER = {}
    # A syntactically valid UUID that matches no real object.
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main(args), capturing stdout/stderr on self."""
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Upload a small test file with --stream plus optional extra args.

        Bug fix: the old signature used a mutable default (`args=[]`), a
        Python anti-pattern; None is used as the sentinel instead.
        """
        if args is None:
            args = []
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        # Force arv-put to build a fresh API client for each test.
        arv_put.api_client = None

    def tearDown(self):
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            # Restore permissions so the temp dir can be cleaned up.
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            # Restore permissions so the temp dir can be cleaned up.
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            # Exit status must be nonzero, save_new must have been tried,
            # and no manifest should have been printed.
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())
655
656
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    """End-to-end tests that run arv-put subprocesses against live test servers."""

    def _getKeepServerConfig():
        """Return Keep server settings derived from the API server's Rails config.

        Looks for blob_signing_key in application.yml, then
        application.default.yml; permission enforcement is enabled only
        when a key is found.  Intentionally a plain function (no
        staticmethod, no self): it is called below, during class-body
        evaluation, to compute KEEP_SERVER.
        """
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                # safe_load is sufficient for this plain-data config file and
                # avoids yaml.load's arbitrary object construction (and its
                # deprecation warning in PyYAML >= 5.1).
                rails_config = yaml.safe_load(f.read())
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        # Fix: this fallback previously returned the misspelled key
        # 'blog_signing_key', so consumers never saw the (None) value.
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        # arv-put subprocesses must see the same module search path as the
        # test process.
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        # Each test must authorize explicitly; see authorize_with().
        arv_put.api_client = None

    def authorize_with(self, token_name):
        """Authorize this process, and future subprocesses, as token_name."""
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        """Return the API record for the currently authorized user."""
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            # Expected path: the error message must identify the bad UUID.
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID, 0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write(b'stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            # Fix: pipe.stdout yields bytes; decode before writing to the
            # text-mode sys.stdout (raw bytes raise TypeError on Python 3).
            sys.stdout.write(pipe.stdout.read().decode())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
                      pipe.stdout.read().decode())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegex(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=None):
        """Run arv-put with extra_args, feeding it text on stdin.

        Asserts arv-put reported saving/updating a collection, then looks the
        collection up by the UUID (or portable data hash, when
        --portable-data-hash was passed) that arv-put printed, and returns
        its API record.
        """
        # Fix: avoid the shared mutable default argument ([]).
        extra_args = extra_args or []
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text.encode())
        self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.decode().strip()]]
        ).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_later_update(self):
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is being included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegex(c['manifest_text'], r'^\. .*:44:file2\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegex(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])
851
852
# Allow running this test module directly, outside of a test runner.
if __name__ == '__main__':
    unittest.main()