# 11308: Fix useless assertion.
# [arvados.git] / sdk / python / tests / test_arv_put.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from __future__ import absolute_import
5 from __future__ import division
6 from future import standard_library
7 standard_library.install_aliases()
8 from builtins import str
9 from builtins import range
10 import apiclient
11 import mock
12 import os
13 import pwd
14 import re
15 import shutil
16 import subprocess
17 import sys
18 import tempfile
19 import time
20 import unittest
21 import yaml
22 import threading
23 import hashlib
24 import random
25
26 import arvados
27 import arvados.commands.put as arv_put
28 from . import arvados_testutil as tutil
29
30 from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
31 from . import run_test_server
32
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Tests for arv_put.ResumeCache: cache path derivation, persistence,
    locking, and validation of previously saved upload state."""

    # Argument lists that should each produce a distinct, stable cache path.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            # The test did not create a cache; nothing to clean up.
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache file path arv-put would use for this command line."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        # The same argument list must always map to the same cache path.
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        # Different argument lists must map to different cache paths.
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Restore the original setting so other tests aren't affected.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # Even though HEAD requests fail, check_cache() should handle it
        # gracefully and not raise.  (The previous
        # `self.assertRaises(None, resume_cache.check_cache())` was a no-op:
        # it invoked check_cache() before asserting anything, and None is
        # not a valid exception class.)
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
241
242
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutUploadJob: progress reporting, resume after
    a simulated crash, cache bypass, and the dry-run feature."""

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Small directory tree: files '1'..'4' of 1-4 KB, plus a
        # subdirectory holding one 5 KB file.
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        with open(self.large_file_name, 'w') as fileobj:
            # Make sure to write just a little more than one block
            for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
                fileobj.write(data)
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        # Keep a reference to the unpatched write() so the crash-simulating
        # wrappers can delegate to the real implementation.
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)

    def _interrupting_write(self, commit_final=False):
        """Return an ArvadosFileWriter.write replacement that simulates a
        crash while writing the last block of self.writer's upload.

        The wrapper checkpoints the upload state first (committing pending
        blocks when commit_final is True) so resume tests have saved state
        to pick up, then raises SystemExit.
        """
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                if commit_final:
                    self.writer._update(final=True)  # ensure block commit
                else:
                    self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)
        return wrapped_write

    def _start_interrupted_upload(self, commit_final=False):
        """Start uploading the large test file and abort before finishing.

        Asserts the upload was genuinely partial, and returns the writer
        (also stored in self.writer for the wrapper's use).
        """
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = self._interrupting_write(commit_final)
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        return writer

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)

    def make_progress_tester(self):
        """Return (progression_list, reporter) where reporter records every
        (written, expected) callback into progression_list."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                    reporter=reporter, bytes_expected=expect_count)
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        writer = self._start_interrupted_upload(commit_final=True)
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block, wrapped_commit will make the first block
            # commit to fail.
            # arv-put should not exit with an exception by trying to commit the collection
            # as it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        self._start_interrupted_upload()
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_no_resume_when_no_cache(self):
        self._start_interrupted_upload()
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_dry_run_feature(self):
        self._start_interrupted_upload()
        # Retry the upload using dry_run to check if there is a pending upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            writer2.start(save_collection=False)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        # Confirm there's no pending upload with dry_run=True
        writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            writer4.start(save_collection=False)
        writer4.destroy_cache()
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del self.writer
498
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Checks for arv_put.expected_bytes_for() size estimates."""

    # Use this test module's own size as a known reference quantity.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        self.assertEqual(self.TEST_SIZE,
                         arv_put.expected_bytes_for([__file__]))

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        for copy_name in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tree, copy_name))
        # A directory counts the sum of its files; mixing in an extra file
        # adds its size on top.
        self.assertEqual(self.TEST_SIZE * 2,
                         arv_put.expected_bytes_for([tree]))
        self.assertEqual(self.TEST_SIZE * 3,
                         arv_put.expected_bytes_for([tree, __file__]))

    def test_expected_bytes_for_device(self):
        # Device files have no meaningful size, so the estimate is None.
        self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
        self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))
518
519
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Checks for the progress-report formatting helpers in arv_put."""

    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            # An unknown total is reported as -1 in machine-readable output.
            suffix = ": {} written {} total\n".format(
                count, total if total is not None else -1)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(suffix))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            percent_text = '{:.1%}'.format(1.0*count/total)
            report = arv_put.human_progress(count, total)
            # Human output rewrites the current line and shows a percentage.
            self.assertTrue(report.startswith('\r'))
            self.assertIn(percent_text, report)

    def test_unknown_human_progress(self):
        # Without a total, the raw byte count must still appear as a word.
        for count in [1, 20, 300, 4000, 50000]:
            pattern = r'\b{}\b'.format(count)
            self.assertTrue(re.search(pattern,
                                      arv_put.human_progress(count, None)))
539
540
class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    """Command-line level tests that drive arv_put.main() directly."""

    MAIN_SERVER = {}
    # A syntactically valid UUID that matches no real object.
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main() with captured stdout/stderr buffers."""
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Upload a small test file via main() and verify its block landed
        in the local Keep store.

        args: optional extra command-line arguments.  (Default is None
        rather than [] to avoid the shared mutable-default pitfall.)
        """
        if args is None:
            args = []
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        # Force main() to build a fresh API client per test.
        arv_put.api_client = None

    def tearDown(self):
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        # arv-put should fall back gracefully when the cache dir exists
        # but is not writable.
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        # Same as above, but the cache dir can't even be created.
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            # Each --replication value should be forwarded as 'copies'.
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        # A server-side error should exit nonzero without emitting a manifest.
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())
656
657
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    """End-to-end tests that run arv-put (often as a subprocess) against
    the live test API and Keep servers."""

    def _getKeepServerConfig():
        # NOTE: deliberately has no self/cls parameter -- it is invoked
        # once, below, while the class body is still being evaluated.
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                # safe_load suffices for plain config data and avoids
                # yaml.load's arbitrary-object construction.
                rails_config = yaml.safe_load(f.read())
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        # Fixed typo: this fallback previously returned the misspelled
        # key 'blog_signing_key', so a caller reading 'blob_signing_key'
        # would get a KeyError instead of the intended None.
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        # arv-put subprocesses must import the same arvados modules we do.
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        # Force each test to authorize (and build a client) explicitly.
        arv_put.api_client = None

    def authorize_with(self, token_name):
        """Authorize this process and arv-put subprocesses as token_name."""
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        # The old try/except/else form ended in a useless assertFalse on a
        # value that could never be falsy; assert the expected ValueError
        # directly instead.
        with self.assertRaises(ValueError) as error:
            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID, 0)
        self.assertIn(BAD_UUID, str(error.exception))

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID, 0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write(b'stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            # pipe.stdout yields bytes; decode before writing to the
            # text-mode sys.stdout (writing bytes fails on Python 3).
            sys.stdout.write(pipe.stdout.read().decode())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
                      pipe.stdout.read().decode())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegex(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=None):
        """Run arv-put with extra_args, feed it text on stdin, and return
        the collection record it reports creating or updating."""
        # None sentinel avoids the shared-mutable-default-argument pitfall.
        extra_args = extra_args or []
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text.encode())
        self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
        # arv-put prints a portable data hash with --portable-data-hash,
        # otherwise a collection UUID.
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.decode().strip()]]
        ).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_later_update(self):
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is being included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegex(c['manifest_text'], r'^\. .*:44:file2\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegex(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])
852
853
# Allow running this test module directly (e.g. `python test_arv_put.py`).
if __name__ == '__main__':
    unittest.main()