# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import range
import apiclient
import datetime
import hashlib
import json
import mock
import os
import pwd
import random
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import unittest
import uuid
import yaml

import arvados
import arvados.commands.put as arv_put
from . import arvados_testutil as tutil

from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
from . import run_test_server

class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
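    # Argument lists fed to parse_arguments(); each should map to a stable,
    # distinct cache path (see the test_cache_names_* tests below).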
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

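    # The next three tests mock KeepClient.head(), which the resume cache
    # uses to verify that previously uploaded blocks still exist in Keep.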
    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
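        # check_cache() should notice the failed HEAD and reset the cache
        # rather than raise.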
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)


class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KiB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        fd, self.large_file_name = tempfile.mkstemp()
        os.close(fd)  # mkstemp() returns an open descriptor; close it before reopening
        with open(self.large_file_name, 'w') as fileobj:
            # Make sure to write just a little more than one block
            for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
                fileobj.write(data)
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MiB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
        # Temp dir to hold a symlink to other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def test_symlinks_are_followed_by_default(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistent_path_raises_exception(self):
        uuid_str = str(uuid.uuid4())
        with self.assertRaises(arv_put.PathDoesNotExistError):
            arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

    def make_progress_tester(self):
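        # Return a list that accumulates (bytes_written, bytes_expected)
        # tuples, plus the reporter callback that appends to it.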
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                                                  reporter=reporter)
                cwriter.bytes_expected = expect_count
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting. Ensure block commit.
                self.writer._update(final=True)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

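        # Patch ArvadosFileWriter.write so the upload checkpoints and then
        # aborts partway through, simulating an interrupted arv-put run.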
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block; wrapped_commit makes the
            # first block commit fail. arv-put should not exit with an
            # exception while trying to commit the collection, which is in
            # an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_no_resume_when_no_cache(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_dry_run_feature(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            # Retry the upload using dry_run to check if there is a pending upload
            writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            # Confirm there's no pending upload with dry_run=True
            writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del self.writer


class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        writer = arv_put.ArvPutUploadJob([__file__])
        self.assertEqual(self.TEST_SIZE,
                         writer.bytes_expected)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))

        writer = arv_put.ArvPutUploadJob([tree])
        self.assertEqual(self.TEST_SIZE * 2,
                         writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([tree, __file__])
        self.assertEqual(self.TEST_SIZE * 3,
                         writer.bytes_expected)

    def test_expected_bytes_for_device(self):
        writer = arv_put.ArvPutUploadJob(['/dev/null'])
        self.assertIsNone(writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
        self.assertIsNone(writer.bytes_expected)


class ArvadosPutReportTest(ArvadosBaseTestCase):
    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            expect = ": {} written {} total\n".format(
                count, -1 if (total is None) else total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(1.0*count/total)
            actual = arv_put.human_progress(count, total)
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))


class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None

    def tearDown(self):
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwritable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_error_when_excluding_absolute_path(self):
        tmpdir = self.make_tmpdir()
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--exclude', '/some/absolute/path/*',
                           tmpdir])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())


class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
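    # Runs at class-definition time to compute KEEP_SERVER below, so it is a
    # plain function with no 'self' parameter.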
    def _getKeepServerConfig():
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                rails_config = yaml.safe_load(f)
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

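    # Keep locator signatures end with a hex Unix timestamp ('@...') marking
    # token expiry; this helper formats datetimes that way so the
    # expired-token test below can forge old signatures.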
    def datetime_to_hex(self, dt):
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID, 0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write(b'stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read().decode())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
                      pipe.stdout.read().decode())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__,
                              os.path.join(datadir, 'foo')],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegex(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=[]):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text.encode())
        self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.decode().strip()]]
        ).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_expired_token_invalidates_cache(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
            f.write('foo')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        cache['manifest'] = re.sub(
            r'\@.*? ',
            "@{} ".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect to get an invalid cache message
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'WARNING: Uploaded file .* access token expired, will re-upload it from scratch')
        self.assertEqual(p.returncode, 0)
        # Confirm that the resulting cache is different from the last run.
        with open(cache_filepath, 'r') as c2:
            new_cache = json.load(c2)
        self.assertNotEqual(cache['manifest'], new_cache['manifest'])

    def test_put_collection_with_later_update(self):
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')

    def test_upload_directory_reference_without_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' was written inside a subcollection, while 'bar'
        # was uploaded directly to the collection's root.
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')

    def test_upload_directory_reference_with_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory (with trailing slash) and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1 + os.sep,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' and 'bar' were written at the same level
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegex(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection(
            "Test named collection",
            ['--portable-data-hash',
             '--name', link_name,
             '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])

    def test_exclude_filename_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', '*2.txt',
                                                '--exclude', 'file3.*',
                                                tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Neither file2.txt nor file3.txt should have been uploaded
        self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')

    def test_exclude_filepath_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', 'subdir/*2.txt',
                                                '--exclude', './file1.*',
                                                tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Only tmpdir/file1.txt and tmpdir/subdir/file2.txt should have been excluded
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s.*:file1.txt' % os.path.basename(tmpdir))
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'],
                         r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'], r'^.*:file3.txt')

    def test_silent_mode_no_errors(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
            f.write('hello world')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--silent', tmpdir],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate()
        # No console output should occur on normal operations
        self.assertNotRegex(stderr.decode(), r'.+')
        self.assertNotRegex(stdout.decode(), r'.+')

    def test_silent_mode_does_not_suppress_error_messages(self):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--silent',
             '/path/does/not/exist'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate()
        # Error messages should still be displayed when errors happen
        self.assertRegex(stderr.decode(), r'.*ERROR:.*')
        self.assertNotRegex(stdout.decode(), r'.+')


if __name__ == '__main__':
    unittest.main()