#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import absolute_import
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import range
import apiclient
import hashlib
import io
import mock
import os
import pwd
import random
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import unittest
import yaml

if sys.version_info >= (3, 0):
    from io import StringIO
else:
    from cStringIO import StringIO

import arvados
import arvados.commands.put as arv_put
from . import arvados_testutil as tutil

from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
from . import run_test_server

class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
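    # Argument lists that should each map to a distinct, stable resume-cache
    # path; exercised by the cache-naming tests below.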
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
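        # NamedTemporaryFile is used only to pick a unique path: the temp
        # file itself is deleted when the with block exits, and ResumeCache
        # manages its own file at that path.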
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # check_cache() should handle the failed HEAD request gracefully
        # (resetting the cache) rather than raising.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)


class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KiB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        fileobj = open(self.large_file_name, 'w')
        # Make sure to write just a little more than one block
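        # (KEEP_BLOCK_SIZE >> 20 is the block size expressed in MiB, so this
        # writes one 1 MiB chunk more than a full block.)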
        for _ in range((arvados.config.KEEP_BLOCK_SIZE >> 20) + 1):
            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
            fileobj.write(data)
        fileobj.close()
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MiB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            # NamedTemporaryFile opens in binary mode, so write bytes.
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)

    def make_progress_tester(self):
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                    reporter=reporter, bytes_expected=expect_count)
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
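            # (arv-put feeds the writer KEEP_BLOCK_SIZE-sized chunks, so only
            # the file's final, partial chunk is smaller than a full block.)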
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting. Ensure block commit.
                self.writer._update(final=True)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block; wrapped_commit makes the
            # first block commit fail.
            # arv-put should exit gracefully rather than raise an exception
            # while trying to commit the collection, which is in an
            # inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_no_resume_when_no_cache(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_dry_run_feature(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload using dry_run to check if there is a pending upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            writer2.start(save_collection=False)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        # Confirm there's no pending upload with dry_run=True
        writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            writer4.start(save_collection=False)
        writer4.destroy_cache()
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del self.writer

class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
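    # Use this test file itself as an input whose size is known at runtime.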
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        self.assertEqual(self.TEST_SIZE,
                         arv_put.expected_bytes_for([__file__]))

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))
        self.assertEqual(self.TEST_SIZE * 2,
                         arv_put.expected_bytes_for([tree]))
        self.assertEqual(self.TEST_SIZE * 3,
                         arv_put.expected_bytes_for([tree, __file__]))

    def test_expected_bytes_for_device(self):
        self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
        self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))


class ArvadosPutReportTest(ArvadosBaseTestCase):
    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            expect = ": {} written {} total\n".format(
                count, -1 if (total is None) else total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(1.0*count/total)
            actual = arv_put.human_progress(count, total)
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))


class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
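        # 098f6bcd4621d373cade4e832627b4f6 is the md5 of b'test', the
        # default contents written by make_test_file(), i.e. the data block
        # we expect arv-put to have stored in the local Keep store.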
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None

    def tearDown(self):
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        # Use the version-portable StringIO so the str output written by
        # argparse can be captured on both Python 2 and 3.
        err = StringIO()
        out = StringIO()
        with tutil.redirected_streams(stdout=out, stderr=err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertEqual(out.getvalue(), '')
        self.assertRegexpMatches(err.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())


class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
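    # Read the API server's Rails config to find the blob signing key and
    # decide whether to enforce permissions. This runs at class definition
    # time (see KEEP_SERVER below), which is why it takes no self/cls.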
    def _getKeepServerConfig():
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                rails_config = yaml.safe_load(f.read())
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write(b'stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read().decode())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
                      pipe.stdout.read().decode())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE, env=self.ENVIRON)
        (arvout, arverr) = p.communicate()
        self.assertEqual(arverr, None)
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegexpMatches(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=[]):
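        # Run arv-put in a subprocess, feeding it the given stdin text, then
        # look up and return the resulting collection record (by portable
        # data hash when requested, otherwise by UUID).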
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text.encode())
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.decode().strip()]]).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_later_update(self):
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is being included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
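        # file2's content is 44 bytes, so its manifest entry should end
        # with ":44:file2".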
        self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegexpMatches(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])


if __name__ == '__main__':
    unittest.main()