11789: Merge branch 'master' into 11789-arvput-exclude-flag
[arvados.git] / sdk / python / tests / test_arv_put.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 import apiclient
12 import mock
13 import os
14 import pwd
15 import re
16 import shutil
17 import subprocess
18 import sys
19 import tempfile
20 import time
21 import unittest
22 import yaml
23 import threading
24 import hashlib
25 import random
26 import uuid
27
28 import arvados
29 import arvados.commands.put as arv_put
30 from . import arvados_testutil as tutil
31
32 from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
33 from . import run_test_server
34
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Tests for arv_put.ResumeCache: path naming, persistence, and locking."""

    # Argument lists that should each map to a distinct, stable cache path.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            # The test did not create self.last_cache; nothing to clean up.
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache file path arv-put would use for this command line."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        """The same argument list always yields the same cache path."""
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        """Different argument lists yield different cache paths."""
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        """Reordering the command-line arguments must not change the cache path."""
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        """'-' and '/dev/stdin' are synonyms and share one cache path."""
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        """Paths that resolve to the same location share one cache path."""
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        """Cache paths incorporate the API host, so clusters don't collide."""
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Restore global config exactly as we found it.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        """check_cache() must survive KeepClient.head() failures.

        Bug fix: the original assertion was
        ``self.assertRaises(None, resume_cache.check_cache())`` which invoked
        check_cache() immediately and handed its return value -- plus ``None``
        as the "expected exception" -- to assertRaises, an assertion that can
        never test anything (and raises TypeError on modern unittest).
        """
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # check_cache() is expected to absorb the simulated HEAD failure
        # internally instead of propagating it to the caller.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        """A saved cache survives close/reopen on the same path."""
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        """Opening a cache file twice concurrently raises ResumeCacheConflict."""
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        """The lock persists even after the original temp file is unlinked."""
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        """destroy() releases the lock and removes the cached data."""
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        """restart() empties the cache but keeps the lock held."""
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
243
244
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutUploadJob: symlinks, caching, resume, dry-run.

    Each test builds its own temp fixtures in setUp(); several tests simulate
    an interrupted upload by patching ArvadosFileWriter.write to raise
    SystemExit partway through, then retry with a second job instance.
    """

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        # Files named '1'..'4', sized 1 KB to 4 KB respectively.
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        fileobj = open(self.large_file_name, 'w')
        # Make sure to write just a little more than one block
        for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
            fileobj.write(data)
        fileobj.close()
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        # Keep a reference to the real write method so the mock wrappers
        # below can delegate to it.
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
        # Temp dir to hold a symlink to other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        # Remove every fixture created in setUp().
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def test_symlinks_are_followed_by_default(self):
        """Without options, both dir and file symlinks appear in the manifest."""
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        """With follow_links=False, symlinked entries are excluded."""
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistant_path_raise_exception(self):
        """A missing input path raises PathDoesNotExistError at construction."""
        uuid_str = str(uuid.uuid4())
        with self.assertRaises(arv_put.PathDoesNotExistError):
            cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        """A second upload of the same file skips the already-uploaded bytes."""
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

    def make_progress_tester(self):
        """Return (progression, record_func) for capturing progress callbacks."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        """The reporter callback receives (bytes_written, bytes_expected)."""
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                                                  reporter=reporter)
                cwriter.bytes_expected = expect_count
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        # setUp() wrote files of 1..4 KB plus a 5 KB file in subdir/.
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        """An interrupted upload resumes from the cached checkpoint."""
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting. Ensure block commit.
                self.writer._update(final=True)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        # Total uploaded bytes across both attempts (minus skipped) must
        # equal the file size, i.e. nothing was re-uploaded or lost.
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del(self.writer)

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        """A failed block commit during repacking must not corrupt state."""
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block, wrapped_commit will make the first block
            # commit to fail.
            # arv-put should not exit with an exception by trying to commit the collection
            # as it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        """resume=False re-uploads everything, skipping nothing."""
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del(self.writer)

    def test_no_resume_when_no_cache(self):
        """use_cache=False also prevents resuming from a previous attempt."""
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del(self.writer)

    def test_dry_run_feature(self):
        """dry_run=True raises IsPending/NotPending without uploading."""
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            # Retry the upload using dry_run to check if there is a pending upload
            writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            # Confirm there's no pending upload with dry_run=True
            writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del(self.writer)
526
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Checks ArvPutUploadJob.bytes_expected for files, trees and devices."""

    # Size of this very test file; serves as a known-size fixture.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        self.assertEqual(
            self.TEST_SIZE,
            arv_put.ArvPutUploadJob([__file__]).bytes_expected)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        for copy_name in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tree, copy_name))

        # Two copies inside the directory...
        self.assertEqual(
            self.TEST_SIZE * 2,
            arv_put.ArvPutUploadJob([tree]).bytes_expected)
        # ...plus the original file given alongside the tree.
        self.assertEqual(
            self.TEST_SIZE * 3,
            arv_put.ArvPutUploadJob([tree, __file__]).bytes_expected)

    def test_expected_bytes_for_device(self):
        # A device has no well-defined size, so no expectation is possible.
        self.assertIsNone(
            arv_put.ArvPutUploadJob(['/dev/null']).bytes_expected)
        self.assertIsNone(
            arv_put.ArvPutUploadJob([__file__, '/dev/null']).bytes_expected)
552
553
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Tests for arv_put's progress-report formatting helpers."""

    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            # An unknown total is reported as -1.
            reported_total = -1 if total is None else total
            suffix = ": {} written {} total\n".format(count, reported_total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(suffix))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            percentage = '{:.1%}'.format(1.0*count/total)
            report = arv_put.human_progress(count, total)
            # Human-readable reports rewrite the current terminal line.
            self.assertTrue(report.startswith('\r'))
            self.assertIn(percentage, report)

    def test_unknown_human_progress(self):
        # With no total, the raw count must still appear as a whole word.
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))
573
574
class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    """Command-line level tests driving arv_put.main() directly."""

    MAIN_SERVER = {}
    # Syntactically valid UUID that does not exist on the test server.
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main(args) with stdout/stderr captured on self."""
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Upload a small test file via main() and confirm it reached Keep.

        Bug fix: ``args`` previously defaulted to a mutable ``[]``, which is
        evaluated once and shared across every call -- an accidental mutation
        would leak into later calls.  A ``None`` sentinel is backward
        compatible for all existing callers.
        """
        if args is None:
            args = []
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        # Force main() to build a fresh API client per test.
        arv_put.api_client = None

    def tearDown(self):
        # Close and drop any capture buffers a test created.
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        """--version prints the version string and exits."""
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        """arv-put still works when the resume cache dir is unwritable."""
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            # Restore the class attribute and make the dir removable again.
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        """arv-put still works when the cache subdir cannot be created."""
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        """--replication N is forwarded to Keep as the 'copies' argument."""
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        """--normalize sorts files into manifest order."""
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        """--name requires a collection; --stream alone is an error."""
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_error_when_excluding_absolute_path(self):
        """--exclude only accepts relative patterns."""
        tmpdir = self.make_tmpdir()
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--exclude', '/some/absolute/path/*',
                           tmpdir])

    def test_api_error_handling(self):
        """An API error while saving exits non-zero with no manifest output."""
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())
697
698
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    """Integration tests that exercise arv-put against live test servers.

    Tests either call arv-put's module API in-process or run it as a
    subprocess, then inspect the resulting collection via the API server.
    """

    def _getKeepServerConfig():
        """Build the Keep server config from the API server's Rails settings.

        Scans application.yml (optional) then application.default.yml
        (mandatory) for a blob_signing_key; when one is found, returns a
        config with permission enforcement enabled so signed locators are
        actually checked.

        NOTE: deliberately defined without self/cls -- it is invoked once,
        at class-definition time, to initialize KEEP_SERVER below.
        """
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                # safe_load: this is plain config data, and yaml.load()
                # without an explicit Loader is unsafe and deprecated in
                # PyYAML >= 5.
                rails_config = yaml.safe_load(f)
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        # No signing key configured anywhere: run Keep unsigned.
        # (This fallback previously returned the misspelled key
        # 'blog_signing_key', which was silently ignored.)
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        # Make sure arv-put subprocesses import the same SDK under test.
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        # Reset so each test builds a fresh API client after authorizing.
        arv_put.api_client = None

    def authorize_with(self, token_name):
        """Authorize this process and future subprocesses as token_name."""
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        """Return the user record for the currently authorized token."""
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            # The error message should identify the offending UUID.
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID, 0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write(b'stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            # Decode before writing: pipe.stdout yields bytes, while
            # sys.stdout is a text stream on Python 3.
            sys.stdout.write(pipe.stdout.read().decode())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
                      pipe.stdout.read().decode())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__,
                              os.path.join(datadir, 'foo')],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegex(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=None):
        """Run arv-put with extra_args, feeding text on stdin, and return
        the resulting collection record.

        The collection is looked up by the UUID (or portable data hash, if
        --portable-data-hash was among extra_args) that arv-put printed.
        """
        # Avoid a mutable default argument; None stands in for "no args".
        extra_args = extra_args or []
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text.encode())
        self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.decode().strip()]]
        ).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_later_update(self):
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is being included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')

    def test_upload_directory_reference_without_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' was written inside a subcollection
        # OTOH, 'bar' should have been directly uploaded on the root collection
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')

    def test_upload_directory_reference_with_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory (with trailing slash) and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1 + os.sep,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' and 'bar' were written at the same level
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegex(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])

    def test_exclude_filename_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', '*2.txt',
                                                '--exclude', 'file3.*',
                                                 tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Neither file2.txt nor file3.txt should have been uploaded,
        # at any directory level.
        self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')

    def test_exclude_filepath_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', 'subdir/*2.txt',
                                                 tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Only subdir/file2.txt should have been excluded; the top-level
        # file2.txt (and everything else) should still be uploaded.
        self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
        self.assertRegex(c['manifest_text'],
                         r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'], r'^.*:file3.txt')
969
970
# Allow running this test module directly (e.g. `python test_arv_put.py`).
if __name__ == '__main__':
    unittest.main()