#!/usr/bin/env python
# -*- coding: utf-8 -*-

import apiclient
import io
import mock
import os
import pwd
import re
import shutil
import subprocess
import sys
import tempfile
import time
import unittest
import yaml
import threading
import hashlib
import random
import uuid

from cStringIO import StringIO

import arvados
import arvados.commands.put as arv_put
import arvados_testutil as tutil

from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
import run_test_server


class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
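    """Tests for the naming, locking, and persistence of arv-put's resume cache."""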
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

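    # The next three tests mock KeepClient.head so resume-state checks
    # don't contact a real Keep server.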
    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # check_cache() must not propagate the HEAD error; it should handle
        # it internally (e.g. by restarting the cache).
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)


class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
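    """Tests for ArvPutUploadJob: resume, caching, symlinks, and dry-run behavior."""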

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Create temp files
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for the resume tests
        _, self.large_file_name = tempfile.mkstemp()
        fileobj = open(self.large_file_name, 'w')
        # Make sure to write just a little more than one block
        for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
            fileobj.write(data)
        fileobj.close()
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
        # Temp dir to hold a symlink to the other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def test_symlinks_are_followed_by_default(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistent_path_raises_exception(self):
        uuid_str = str(uuid.uuid4())
        cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
        with self.assertRaises(arv_put.PathDoesNotExistError):
            cwriter.start(save_collection=False)

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)

    def make_progress_tester(self):
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                                                  reporter=reporter,
                                                  bytes_expected=expect_count)
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting. Ensure block commit.
                self.writer._update(final=True)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block; wrapped_commit will make the
            # first block commit fail.
            # arv-put should not exit with an exception while trying to commit
            # the collection, since it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_no_resume_when_no_cache(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_dry_run_feature(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload using dry_run to check if there is a pending upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            writer2.start(save_collection=False)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        # Confirm there's no pending upload with dry_run=True
        writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            writer4.start(save_collection=False)
        writer4.destroy_cache()
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del self.writer


class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
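    """Tests for arv_put.expected_bytes_for()."""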
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        self.assertEqual(self.TEST_SIZE,
                         arv_put.expected_bytes_for([__file__]))

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))
        self.assertEqual(self.TEST_SIZE * 2,
                         arv_put.expected_bytes_for([tree]))
        self.assertEqual(self.TEST_SIZE * 3,
                         arv_put.expected_bytes_for([tree, __file__]))

    def test_expected_bytes_for_device(self):
        self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
        self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))


class ArvadosPutReportTest(ArvadosBaseTestCase):
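    """Tests for arv-put's machine- and human-readable progress reports."""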
    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            expect = ": {} written {} total\n".format(
                count, -1 if (total is None) else total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(float(count) / total)
            actual = arv_put.human_progress(count, total)
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))


class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
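    """Command-line level tests of arv-put, run against a test API server."""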
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None

    def tearDown(self):
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        err = io.BytesIO()
        out = io.BytesIO()
        with tutil.redirected_streams(stdout=out, stderr=err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertEqual(out.getvalue(), '')
        self.assertRegexpMatches(err.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwritable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())


class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
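    """Integration tests that run arv-put as a subprocess against the test servers."""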
    def _getKeepServerConfig():
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                rails_config = yaml.load(f.read())
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, error.message)
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                         0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write('stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE, env=self.ENVIRON)
        (arvout, arverr) = p.communicate()
        self.assertEqual(arverr, None)
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegexpMatches(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=[]):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text)
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_later_update(self):
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is being included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegexpMatches(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection(
            "Test named collection",
            ['--portable-data-hash',
             '--name', link_name,
             '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])


if __name__ == '__main__':
    unittest.main()