# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import range
import apiclient
import hashlib
import json
import mock
import os
import pwd
import random
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import unittest
import uuid
import yaml

import arvados
import arvados.commands.put as arv_put
from . import arvados_testutil as tutil

from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
from . import run_test_server

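# Tests for the arv-put command line tool: resume-cache naming and locking,
# upload/resume behavior of ArvPutUploadJob, progress reporting, and
# integration tests that run arv-put as a subprocess against a test server.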
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]
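    # Each argument set above exercises a distinct cache-key input: no
    # arguments, a single file, a file with an explicit --filename, and a
    # directory.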

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))
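    # make_path() maps a parsed argument list to a per-invocation cache file.
    # Roughly (a sketch; the exact hashing scheme lives in arv_put):
    #   cache_path_from_arglist(['/tmp'])
    #   -> os.path.join(arv_put.ResumeCache.CACHE_DIR,
    #                   '<hash of normalized args and API host>')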

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
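        # check_cache() HEADs the cached locators; since the mocked HEAD
        # raises, the cache is expected to be discarded and restarted rather
        # than propagating the error (a reading of the test's intent, not a
        # documented guarantee).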
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)


class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation
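        # Fixture layout built below: self.tempdir holds files '1'..'4'
        # (1-4 KiB each) plus subdir/otherfile (5 KiB); a large temp file
        # slightly bigger than one Keep block; self.small_files_dir with 69
        # ~1 MiB files; and a temp dir containing symlinks into self.tempdir.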
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KiB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        with open(self.large_file_name, 'w') as fileobj:
            # Make sure to write just a little more than one block
            for _ in range((arvados.config.KEEP_BLOCK_SIZE >> 20) + 1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
                fileobj.write(data)
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MiB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
        # Temp dir to hold a symlink to other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def test_symlinks_are_followed_by_default(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistent_path_raises_exception(self):
        uuid_str = str(uuid.uuid4())
        with self.assertRaises(arv_put.PathDoesNotExistError):
            arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

    def make_progress_tester(self):
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

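    # The reporter passed to ArvPutUploadJob is called as
    # reporter(bytes_written, bytes_expected); record_func above simply
    # collects those pairs so the test can assert on the progression.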
    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                                                  reporter=reporter)
                cwriter.bytes_expected = expect_count
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
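        # setUp wrote files '1'..'4' of 1-4 KiB plus subdir/otherfile of
        # 5 KiB, so 1024*(1+2+3+4+5) = 15360 bytes in total.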
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting. Ensure block commit.
                self.writer._update(final=True)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

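        # The wrapper lets every full block through and aborts on the final
        # (short) block, after forcing a checkpoint, so a partial upload is
        # left in the cache for the resume attempt below.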
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block; wrapped_commit will make the
            # first block commit fail. arv-put should not exit with an
            # exception from trying to commit the collection while it's in an
            # inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_no_resume_when_no_cache(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_dry_run_feature(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            # Retry the upload using dry_run to check if there is a pending upload
            writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            # Confirm there's no pending upload with dry_run=True
            writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del self.writer


class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        writer = arv_put.ArvPutUploadJob([__file__])
        self.assertEqual(self.TEST_SIZE,
                         writer.bytes_expected)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))

        writer = arv_put.ArvPutUploadJob([tree])
        self.assertEqual(self.TEST_SIZE * 2,
                         writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([tree, __file__])
        self.assertEqual(self.TEST_SIZE * 3,
                         writer.bytes_expected)

    def test_expected_bytes_for_device(self):
        writer = arv_put.ArvPutUploadJob(['/dev/null'])
        self.assertIsNone(writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
        self.assertIsNone(writer.bytes_expected)


class ArvadosPutReportTest(ArvadosBaseTestCase):
    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            expect = ": {} written {} total\n".format(
                count, -1 if (total is None) else total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))
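        # For example, machine_progress(1, None) should end with
        # ": 1 written -1 total\n" (a sketch of the format asserted above;
        # the full string may carry a prefix such as a timestamp).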

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(1.0*count/total)
            actual = arv_put.human_progress(count, total)
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))


class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None

    def tearDown(self):
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwritable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_error_when_excluding_absolute_path(self):
        tmpdir = self.make_tmpdir()
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--exclude', '/some/absolute/path/*',
                           tmpdir])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())


class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    def _getKeepServerConfig():
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                rails_config = yaml.safe_load(f)
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        return {'blob_signing_key': None, 'enforce_permissions': False}

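    # _getKeepServerConfig() runs at class-definition time (note: no self),
    # so KEEP_SERVER below is computed once when this module is imported.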
    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write(b'stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read().decode())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
                      pipe.stdout.read().decode())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__,
                              os.path.join(datadir, 'foo')],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegex(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=[]):
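        # Runs arv-put as a subprocess with `text` on stdin, then looks up
        # the resulting collection by the uuid (or portable data hash, when
        # --portable-data-hash is among extra_args) printed on stdout.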
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text.encode())
        self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.decode().strip()]]
        ).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_invalid_token_invalidates_cache(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
            f.write('foo')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an invalid access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        cache['manifest'] = re.sub(r'\+A\S+\@',
                                   '+Athistokendoesnotwork@',
                                   cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect to get an invalid cache message
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'INFO: Cache file (.*) is not valid, starting from scratch')
        self.assertEqual(p.returncode, 0)

    def test_put_collection_with_later_update(self):
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is being included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')

    def test_upload_directory_reference_without_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' was written inside a subcollection while 'bar'
        # was uploaded directly into the root collection.
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')

    def test_upload_directory_reference_with_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory (with trailing slash) and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1 + os.sep,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' and 'bar' were written at the same level
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegex(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection(
            "Test named collection",
            ['--portable-data-hash',
             '--name', link_name,
             '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])

    def test_exclude_filename_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', '*2.txt',
                                                '--exclude', 'file3.*',
                                                tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Neither file2.txt nor file3.txt should have been uploaded
        self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')

    def test_exclude_filepath_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', 'subdir/*2.txt',
                                                '--exclude', './file1.*',
                                                tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Only tmpdir/file1.txt & tmpdir/subdir/file2.txt should have been excluded
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s.*:file1.txt' % os.path.basename(tmpdir))
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'],
                         r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'], r'^.*:file3.txt')

    def test_silent_mode_no_errors(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
            f.write('hello world')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + ['--silent', tmpdir],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate()
        # No console output should occur on normal operations
        self.assertNotRegex(stderr.decode(), r'.+')
        self.assertNotRegex(stdout.decode(), r'.+')

    def test_silent_mode_does_not_suppress_error_messages(self):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + ['--silent',
                                                  '/path/not/existent'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate()
        # Error messages should still be displayed when errors happen
        self.assertRegex(stderr.decode(), r'.*ERROR:.*')
        self.assertNotRegex(stdout.decode(), r'.+')


if __name__ == '__main__':
    unittest.main()