Merge branch 'master' of git.curoverse.com:arvados into 11876-r-sdk
[arvados.git] / sdk / python / tests / test_arv_put.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 import apiclient
12 import datetime
13 import hashlib
14 import json
15 import logging
16 import mock
17 import os
18 import pwd
19 import random
20 import re
21 import shutil
22 import subprocess
23 import sys
24 import tempfile
25 import threading
26 import time
27 import unittest
28 import uuid
29 import yaml
30
31 import arvados
32 import arvados.commands.put as arv_put
33 from . import arvados_testutil as tutil
34
35 from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
36 from . import run_test_server
37
38 class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
39     CACHE_ARGSET = [
40         [],
41         ['/dev/null'],
42         ['/dev/null', '--filename', 'empty'],
43         ['/tmp']
44         ]
45
46     def tearDown(self):
47         super(ArvadosPutResumeCacheTest, self).tearDown()
48         try:
49             self.last_cache.destroy()
50         except AttributeError:
51             pass
52
53     def cache_path_from_arglist(self, arglist):
54         return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))
55
56     def test_cache_names_stable(self):
57         for argset in self.CACHE_ARGSET:
58             self.assertEqual(self.cache_path_from_arglist(argset),
59                               self.cache_path_from_arglist(argset),
60                               "cache name changed for {}".format(argset))
61
62     def test_cache_names_unique(self):
63         results = []
64         for argset in self.CACHE_ARGSET:
65             path = self.cache_path_from_arglist(argset)
66             self.assertNotIn(path, results)
67             results.append(path)
68
69     def test_cache_names_simple(self):
70         # The goal here is to make sure the filename doesn't use characters
71         # reserved by the filesystem.  Feel free to adjust this regexp as
72         # long as it still does that.
73         bad_chars = re.compile(r'[^-\.\w]')
74         for argset in self.CACHE_ARGSET:
75             path = self.cache_path_from_arglist(argset)
76             self.assertFalse(bad_chars.search(os.path.basename(path)),
77                              "path too exotic: {}".format(path))
78
79     def test_cache_names_ignore_argument_order(self):
80         self.assertEqual(
81             self.cache_path_from_arglist(['a', 'b', 'c']),
82             self.cache_path_from_arglist(['c', 'a', 'b']))
83         self.assertEqual(
84             self.cache_path_from_arglist(['-', '--filename', 'stdin']),
85             self.cache_path_from_arglist(['--filename', 'stdin', '-']))
86
87     def test_cache_names_differ_for_similar_paths(self):
88         # This test needs names at / that don't exist on the real filesystem.
89         self.assertNotEqual(
90             self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
91             self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))
92
93     def test_cache_names_ignore_irrelevant_arguments(self):
94         # Workaround: parse_arguments bails on --filename with a directory.
95         path1 = self.cache_path_from_arglist(['/tmp'])
96         args = arv_put.parse_arguments(['/tmp'])
97         args.filename = 'tmp'
98         path2 = arv_put.ResumeCache.make_path(args)
99         self.assertEqual(path1, path2,
100                          "cache path considered --filename for directory")
101         self.assertEqual(
102             self.cache_path_from_arglist(['-']),
103             self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
104             "cache path considered --max-manifest-depth for file")
105
106     def test_cache_names_treat_negative_manifest_depths_identically(self):
107         base_args = ['/tmp', '--max-manifest-depth']
108         self.assertEqual(
109             self.cache_path_from_arglist(base_args + ['-1']),
110             self.cache_path_from_arglist(base_args + ['-2']))
111
112     def test_cache_names_treat_stdin_consistently(self):
113         self.assertEqual(
114             self.cache_path_from_arglist(['-', '--filename', 'test']),
115             self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))
116
117     def test_cache_names_identical_for_synonymous_names(self):
118         self.assertEqual(
119             self.cache_path_from_arglist(['.']),
120             self.cache_path_from_arglist([os.path.realpath('.')]))
121         testdir = self.make_tmpdir()
122         looplink = os.path.join(testdir, 'loop')
123         os.symlink(testdir, looplink)
124         self.assertEqual(
125             self.cache_path_from_arglist([testdir]),
126             self.cache_path_from_arglist([looplink]))
127
128     def test_cache_names_different_by_api_host(self):
129         config = arvados.config.settings()
130         orig_host = config.get('ARVADOS_API_HOST')
131         try:
132             name1 = self.cache_path_from_arglist(['.'])
133             config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
134             self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
135         finally:
136             if orig_host is None:
137                 del config['ARVADOS_API_HOST']
138             else:
139                 config['ARVADOS_API_HOST'] = orig_host
140
141     @mock.patch('arvados.keep.KeepClient.head')
142     def test_resume_cache_with_current_stream_locators(self, keep_client_head):
143         keep_client_head.side_effect = [True]
144         thing = {}
145         thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
146         with tempfile.NamedTemporaryFile() as cachefile:
147             self.last_cache = arv_put.ResumeCache(cachefile.name)
148         self.last_cache.save(thing)
149         self.last_cache.close()
150         resume_cache = arv_put.ResumeCache(self.last_cache.filename)
151         self.assertNotEqual(None, resume_cache)
152
153     @mock.patch('arvados.keep.KeepClient.head')
154     def test_resume_cache_with_finished_streams(self, keep_client_head):
155         keep_client_head.side_effect = [True]
156         thing = {}
157         thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
158         with tempfile.NamedTemporaryFile() as cachefile:
159             self.last_cache = arv_put.ResumeCache(cachefile.name)
160         self.last_cache.save(thing)
161         self.last_cache.close()
162         resume_cache = arv_put.ResumeCache(self.last_cache.filename)
163         self.assertNotEqual(None, resume_cache)
164
165     @mock.patch('arvados.keep.KeepClient.head')
166     def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
167         keep_client_head.side_effect = Exception('Locator not found')
168         thing = {}
169         thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
170         with tempfile.NamedTemporaryFile() as cachefile:
171             self.last_cache = arv_put.ResumeCache(cachefile.name)
172         self.last_cache.save(thing)
173         self.last_cache.close()
174         resume_cache = arv_put.ResumeCache(self.last_cache.filename)
175         self.assertNotEqual(None, resume_cache)
176         resume_cache.check_cache()
177
178     def test_basic_cache_storage(self):
179         thing = ['test', 'list']
180         with tempfile.NamedTemporaryFile() as cachefile:
181             self.last_cache = arv_put.ResumeCache(cachefile.name)
182         self.last_cache.save(thing)
183         self.assertEqual(thing, self.last_cache.load())
184
185     def test_empty_cache(self):
186         with tempfile.NamedTemporaryFile() as cachefile:
187             cache = arv_put.ResumeCache(cachefile.name)
188         self.assertRaises(ValueError, cache.load)
189
190     def test_cache_persistent(self):
191         thing = ['test', 'list']
192         path = os.path.join(self.make_tmpdir(), 'cache')
193         cache = arv_put.ResumeCache(path)
194         cache.save(thing)
195         cache.close()
196         self.last_cache = arv_put.ResumeCache(path)
197         self.assertEqual(thing, self.last_cache.load())
198
199     def test_multiple_cache_writes(self):
200         thing = ['short', 'list']
201         with tempfile.NamedTemporaryFile() as cachefile:
202             self.last_cache = arv_put.ResumeCache(cachefile.name)
203         # Start writing an object longer than the one we test, to make
204         # sure the cache file gets truncated.
205         self.last_cache.save(['long', 'long', 'list'])
206         self.last_cache.save(thing)
207         self.assertEqual(thing, self.last_cache.load())
208
209     def test_cache_is_locked(self):
210         with tempfile.NamedTemporaryFile() as cachefile:
211             cache = arv_put.ResumeCache(cachefile.name)
212             self.assertRaises(arv_put.ResumeCacheConflict,
213                               arv_put.ResumeCache, cachefile.name)
214
215     def test_cache_stays_locked(self):
216         with tempfile.NamedTemporaryFile() as cachefile:
217             self.last_cache = arv_put.ResumeCache(cachefile.name)
218             path = cachefile.name
219         self.last_cache.save('test')
220         self.assertRaises(arv_put.ResumeCacheConflict,
221                           arv_put.ResumeCache, path)
222
223     def test_destroy_cache(self):
224         cachefile = tempfile.NamedTemporaryFile(delete=False)
225         try:
226             cache = arv_put.ResumeCache(cachefile.name)
227             cache.save('test')
228             cache.destroy()
229             try:
230                 arv_put.ResumeCache(cachefile.name)
231             except arv_put.ResumeCacheConflict:
232                 self.fail("could not load cache after destroying it")
233             self.assertRaises(ValueError, cache.load)
234         finally:
235             if os.path.exists(cachefile.name):
236                 os.unlink(cachefile.name)
237
238     def test_restart_cache(self):
239         path = os.path.join(self.make_tmpdir(), 'cache')
240         cache = arv_put.ResumeCache(path)
241         cache.save('test')
242         cache.restart()
243         self.assertRaises(ValueError, cache.load)
244         self.assertRaises(arv_put.ResumeCacheConflict,
245                           arv_put.ResumeCache, path)
246
247
248 class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
249                           ArvadosBaseTestCase):
250
251     def setUp(self):
252         super(ArvPutUploadJobTest, self).setUp()
253         run_test_server.authorize_with('active')
254         # Temp files creation
255         self.tempdir = tempfile.mkdtemp()
256         subdir = os.path.join(self.tempdir, 'subdir')
257         os.mkdir(subdir)
258         data = "x" * 1024 # 1 KB
259         for i in range(1, 5):
260             with open(os.path.join(self.tempdir, str(i)), 'w') as f:
261                 f.write(data * i)
262         with open(os.path.join(subdir, 'otherfile'), 'w') as f:
263             f.write(data * 5)
264         # Large temp file for resume test
265         _, self.large_file_name = tempfile.mkstemp()
266         fileobj = open(self.large_file_name, 'w')
267         # Make sure to write just a little more than one block
268         for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
269             data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
270             fileobj.write(data)
271         fileobj.close()
272         # Temp dir containing small files to be repacked
273         self.small_files_dir = tempfile.mkdtemp()
274         data = 'y' * 1024 * 1024 # 1 MB
275         for i in range(1, 70):
276             with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
277                 f.write(data + str(i))
278         self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
279         # Temp dir to hold a symlink to other temp dir
280         self.tempdir_with_symlink = tempfile.mkdtemp()
281         os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
282         os.symlink(os.path.join(self.tempdir, '1'),
283                    os.path.join(self.tempdir_with_symlink, 'linkedfile'))
284
285     def tearDown(self):
286         super(ArvPutUploadJobTest, self).tearDown()
287         shutil.rmtree(self.tempdir)
288         os.unlink(self.large_file_name)
289         shutil.rmtree(self.small_files_dir)
290         shutil.rmtree(self.tempdir_with_symlink)
291
292     def test_symlinks_are_followed_by_default(self):
293         cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
294         cwriter.start(save_collection=False)
295         self.assertIn('linkeddir', cwriter.manifest_text())
296         self.assertIn('linkedfile', cwriter.manifest_text())
297         cwriter.destroy_cache()
298
299     def test_symlinks_are_not_followed_when_requested(self):
300         cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
301                                           follow_links=False)
302         cwriter.start(save_collection=False)
303         self.assertNotIn('linkeddir', cwriter.manifest_text())
304         self.assertNotIn('linkedfile', cwriter.manifest_text())
305         cwriter.destroy_cache()
306
307     def test_passing_nonexistant_path_raise_exception(self):
308         uuid_str = str(uuid.uuid4())
309         with self.assertRaises(arv_put.PathDoesNotExistError):
310             cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
311
312     def test_writer_works_without_cache(self):
313         cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
314         cwriter.start(save_collection=False)
315         self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
316
317     def test_writer_works_with_cache(self):
318         with tempfile.NamedTemporaryFile() as f:
319             f.write(b'foo')
320             f.flush()
321             cwriter = arv_put.ArvPutUploadJob([f.name])
322             cwriter.start(save_collection=False)
323             self.assertEqual(0, cwriter.bytes_skipped)
324             self.assertEqual(3, cwriter.bytes_written)
325             # Don't destroy the cache, and start another upload
326             cwriter_new = arv_put.ArvPutUploadJob([f.name])
327             cwriter_new.start(save_collection=False)
328             cwriter_new.destroy_cache()
329             self.assertEqual(3, cwriter_new.bytes_skipped)
330             self.assertEqual(3, cwriter_new.bytes_written)
331
332     def make_progress_tester(self):
333         progression = []
334         def record_func(written, expected):
335             progression.append((written, expected))
336         return progression, record_func
337
338     def test_progress_reporting(self):
339         with tempfile.NamedTemporaryFile() as f:
340             f.write(b'foo')
341             f.flush()
342             for expect_count in (None, 8):
343                 progression, reporter = self.make_progress_tester()
344                 cwriter = arv_put.ArvPutUploadJob([f.name],
345                                                   reporter=reporter)
346                 cwriter.bytes_expected = expect_count
347                 cwriter.start(save_collection=False)
348                 cwriter.destroy_cache()
349                 self.assertIn((3, expect_count), progression)
350
351     def test_writer_upload_directory(self):
352         cwriter = arv_put.ArvPutUploadJob([self.tempdir])
353         cwriter.start(save_collection=False)
354         cwriter.destroy_cache()
355         self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
356
357     def test_resume_large_file_upload(self):
358         def wrapped_write(*args, **kwargs):
359             data = args[1]
360             # Exit only on last block
361             if len(data) < arvados.config.KEEP_BLOCK_SIZE:
362                 # Simulate a checkpoint before quitting. Ensure block commit.
363                 self.writer._update(final=True)
364                 raise SystemExit("Simulated error")
365             return self.arvfile_write(*args, **kwargs)
366
367         with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
368                         autospec=True) as mocked_write:
369             mocked_write.side_effect = wrapped_write
370             writer = arv_put.ArvPutUploadJob([self.large_file_name],
371                                              replication_desired=1)
372             # We'll be accessing from inside the wrapper
373             self.writer = writer
374             with self.assertRaises(SystemExit):
375                 writer.start(save_collection=False)
376             # Confirm that the file was partially uploaded
377             self.assertGreater(writer.bytes_written, 0)
378             self.assertLess(writer.bytes_written,
379                             os.path.getsize(self.large_file_name))
380         # Retry the upload
381         writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
382                                           replication_desired=1)
383         writer2.start(save_collection=False)
384         self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
385                          os.path.getsize(self.large_file_name))
386         writer2.destroy_cache()
387         del(self.writer)
388
389     # Test for bug #11002
390     def test_graceful_exit_while_repacking_small_blocks(self):
391         def wrapped_commit(*args, **kwargs):
392             raise SystemExit("Simulated error")
393
394         with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
395                         autospec=True) as mocked_commit:
396             mocked_commit.side_effect = wrapped_commit
397             # Upload a little more than 1 block, wrapped_commit will make the first block
398             # commit to fail.
399             # arv-put should not exit with an exception by trying to commit the collection
400             # as it's in an inconsistent state.
401             writer = arv_put.ArvPutUploadJob([self.small_files_dir],
402                                              replication_desired=1)
403             try:
404                 with self.assertRaises(SystemExit):
405                     writer.start(save_collection=False)
406             except arvados.arvfile.UnownedBlockError:
407                 self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
408         writer.destroy_cache()
409
410     def test_no_resume_when_asked(self):
411         def wrapped_write(*args, **kwargs):
412             data = args[1]
413             # Exit only on last block
414             if len(data) < arvados.config.KEEP_BLOCK_SIZE:
415                 # Simulate a checkpoint before quitting.
416                 self.writer._update()
417                 raise SystemExit("Simulated error")
418             return self.arvfile_write(*args, **kwargs)
419
420         with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
421                         autospec=True) as mocked_write:
422             mocked_write.side_effect = wrapped_write
423             writer = arv_put.ArvPutUploadJob([self.large_file_name],
424                                              replication_desired=1)
425             # We'll be accessing from inside the wrapper
426             self.writer = writer
427             with self.assertRaises(SystemExit):
428                 writer.start(save_collection=False)
429             # Confirm that the file was partially uploaded
430             self.assertGreater(writer.bytes_written, 0)
431             self.assertLess(writer.bytes_written,
432                             os.path.getsize(self.large_file_name))
433         # Retry the upload, this time without resume
434         writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
435                                           replication_desired=1,
436                                           resume=False)
437         writer2.start(save_collection=False)
438         self.assertEqual(writer2.bytes_skipped, 0)
439         self.assertEqual(writer2.bytes_written,
440                          os.path.getsize(self.large_file_name))
441         writer2.destroy_cache()
442         del(self.writer)
443
444     def test_no_resume_when_no_cache(self):
445         def wrapped_write(*args, **kwargs):
446             data = args[1]
447             # Exit only on last block
448             if len(data) < arvados.config.KEEP_BLOCK_SIZE:
449                 # Simulate a checkpoint before quitting.
450                 self.writer._update()
451                 raise SystemExit("Simulated error")
452             return self.arvfile_write(*args, **kwargs)
453
454         with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
455                         autospec=True) as mocked_write:
456             mocked_write.side_effect = wrapped_write
457             writer = arv_put.ArvPutUploadJob([self.large_file_name],
458                                              replication_desired=1)
459             # We'll be accessing from inside the wrapper
460             self.writer = writer
461             with self.assertRaises(SystemExit):
462                 writer.start(save_collection=False)
463             # Confirm that the file was partially uploaded
464             self.assertGreater(writer.bytes_written, 0)
465             self.assertLess(writer.bytes_written,
466                             os.path.getsize(self.large_file_name))
467         # Retry the upload, this time without cache usage
468         writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
469                                           replication_desired=1,
470                                           resume=False,
471                                           use_cache=False)
472         writer2.start(save_collection=False)
473         self.assertEqual(writer2.bytes_skipped, 0)
474         self.assertEqual(writer2.bytes_written,
475                          os.path.getsize(self.large_file_name))
476         writer2.destroy_cache()
477         del(self.writer)
478
479     def test_dry_run_feature(self):
480         def wrapped_write(*args, **kwargs):
481             data = args[1]
482             # Exit only on last block
483             if len(data) < arvados.config.KEEP_BLOCK_SIZE:
484                 # Simulate a checkpoint before quitting.
485                 self.writer._update()
486                 raise SystemExit("Simulated error")
487             return self.arvfile_write(*args, **kwargs)
488
489         with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
490                         autospec=True) as mocked_write:
491             mocked_write.side_effect = wrapped_write
492             writer = arv_put.ArvPutUploadJob([self.large_file_name],
493                                              replication_desired=1)
494             # We'll be accessing from inside the wrapper
495             self.writer = writer
496             with self.assertRaises(SystemExit):
497                 writer.start(save_collection=False)
498             # Confirm that the file was partially uploaded
499             self.assertGreater(writer.bytes_written, 0)
500             self.assertLess(writer.bytes_written,
501                             os.path.getsize(self.large_file_name))
502         with self.assertRaises(arv_put.ArvPutUploadIsPending):
503             # Retry the upload using dry_run to check if there is a pending upload
504             writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
505                                               replication_desired=1,
506                                               dry_run=True)
507         # Complete the pending upload
508         writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
509                                           replication_desired=1)
510         writer3.start(save_collection=False)
511         with self.assertRaises(arv_put.ArvPutUploadNotPending):
512             # Confirm there's no pending upload with dry_run=True
513             writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
514                                               replication_desired=1,
515                                               dry_run=True)
516         # Test obvious cases
517         with self.assertRaises(arv_put.ArvPutUploadIsPending):
518             arv_put.ArvPutUploadJob([self.large_file_name],
519                                     replication_desired=1,
520                                     dry_run=True,
521                                     resume=False,
522                                     use_cache=False)
523         with self.assertRaises(arv_put.ArvPutUploadIsPending):
524             arv_put.ArvPutUploadJob([self.large_file_name],
525                                     replication_desired=1,
526                                     dry_run=True,
527                                     resume=False)
528         del(self.writer)
529
530 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
531     TEST_SIZE = os.path.getsize(__file__)
532
533     def test_expected_bytes_for_file(self):
534         writer = arv_put.ArvPutUploadJob([__file__])
535         self.assertEqual(self.TEST_SIZE,
536                          writer.bytes_expected)
537
538     def test_expected_bytes_for_tree(self):
539         tree = self.make_tmpdir()
540         shutil.copyfile(__file__, os.path.join(tree, 'one'))
541         shutil.copyfile(__file__, os.path.join(tree, 'two'))
542
543         writer = arv_put.ArvPutUploadJob([tree])
544         self.assertEqual(self.TEST_SIZE * 2,
545                          writer.bytes_expected)
546         writer = arv_put.ArvPutUploadJob([tree, __file__])
547         self.assertEqual(self.TEST_SIZE * 3,
548                          writer.bytes_expected)
549
550     def test_expected_bytes_for_device(self):
551         writer = arv_put.ArvPutUploadJob(['/dev/null'])
552         self.assertIsNone(writer.bytes_expected)
553         writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
554         self.assertIsNone(writer.bytes_expected)
555
556
557 class ArvadosPutReportTest(ArvadosBaseTestCase):
558     def test_machine_progress(self):
559         for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
560             expect = ": {} written {} total\n".format(
561                 count, -1 if (total is None) else total)
562             self.assertTrue(
563                 arv_put.machine_progress(count, total).endswith(expect))
564
565     def test_known_human_progress(self):
566         for count, total in [(0, 1), (2, 4), (45, 60)]:
567             expect = '{:.1%}'.format(1.0*count/total)
568             actual = arv_put.human_progress(count, total)
569             self.assertTrue(actual.startswith('\r'))
570             self.assertIn(expect, actual)
571
572     def test_unknown_human_progress(self):
573         for count in [1, 20, 300, 4000, 50000]:
574             self.assertTrue(re.search(r'\b{}\b'.format(count),
575                                       arv_put.human_progress(count, None)))
576
577
578 class ArvadosPutTest(run_test_server.TestCaseWithServers,
579                      ArvadosBaseTestCase,
580                      tutil.VersionChecker):
581     MAIN_SERVER = {}
582     Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
583
584     def call_main_with_args(self, args):
585         self.main_stdout.seek(0, 0)
586         self.main_stdout.truncate(0)
587         self.main_stderr.seek(0, 0)
588         self.main_stderr.truncate(0)
589         return arv_put.main(args, self.main_stdout, self.main_stderr)
590
591     def call_main_on_test_file(self, args=[]):
592         with self.make_test_file() as testfile:
593             path = testfile.name
594             self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
595         self.assertTrue(
596             os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
597                                         '098f6bcd4621d373cade4e832627b4f6')),
598             "did not find file stream in Keep store")
599
600     def setUp(self):
601         super(ArvadosPutTest, self).setUp()
602         run_test_server.authorize_with('active')
603         arv_put.api_client = None
604         self.main_stdout = tutil.StringIO()
605         self.main_stderr = tutil.StringIO()
606         self.loggingHandler = logging.StreamHandler(self.main_stderr)
607         self.loggingHandler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
608         logging.getLogger().addHandler(self.loggingHandler)
609
610     def tearDown(self):
611         logging.getLogger().removeHandler(self.loggingHandler)
612         for outbuf in ['main_stdout', 'main_stderr']:
613             if hasattr(self, outbuf):
614                 getattr(self, outbuf).close()
615                 delattr(self, outbuf)
616         super(ArvadosPutTest, self).tearDown()
617
618     def test_version_argument(self):
619         with tutil.redirected_streams(
620                 stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
621             with self.assertRaises(SystemExit):
622                 self.call_main_with_args(['--version'])
623         self.assertVersionOutput(out, err)
624
625     def test_simple_file_put(self):
626         self.call_main_on_test_file()
627
628     def test_put_with_unwriteable_cache_dir(self):
629         orig_cachedir = arv_put.ResumeCache.CACHE_DIR
630         cachedir = self.make_tmpdir()
631         os.chmod(cachedir, 0o0)
632         arv_put.ResumeCache.CACHE_DIR = cachedir
633         try:
634             self.call_main_on_test_file()
635         finally:
636             arv_put.ResumeCache.CACHE_DIR = orig_cachedir
637             os.chmod(cachedir, 0o700)
638
639     def test_put_with_unwritable_cache_subdir(self):
640         orig_cachedir = arv_put.ResumeCache.CACHE_DIR
641         cachedir = self.make_tmpdir()
642         os.chmod(cachedir, 0o0)
643         arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
644         try:
645             self.call_main_on_test_file()
646         finally:
647             arv_put.ResumeCache.CACHE_DIR = orig_cachedir
648             os.chmod(cachedir, 0o700)
649
650     def test_put_block_replication(self):
651         self.call_main_on_test_file()
652         with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
653             put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
654             self.call_main_on_test_file(['--replication', '1'])
655             self.call_main_on_test_file(['--replication', '4'])
656             self.call_main_on_test_file(['--replication', '5'])
657             self.assertEqual(
658                 [x[-1].get('copies') for x in put_mock.call_args_list],
659                 [1, 4, 5])
660
661     def test_normalize(self):
662         testfile1 = self.make_test_file()
663         testfile2 = self.make_test_file()
664         test_paths = [testfile1.name, testfile2.name]
665         # Reverse-sort the paths, so normalization must change their order.
666         test_paths.sort(reverse=True)
667         self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
668                                  test_paths)
669         manifest = self.main_stdout.getvalue()
670         # Assert the second file we specified appears first in the manifest.
671         file_indices = [manifest.find(':' + os.path.basename(path))
672                         for path in test_paths]
673         self.assertGreater(*file_indices)
674
675     def test_error_name_without_collection(self):
676         self.assertRaises(SystemExit, self.call_main_with_args,
677                           ['--name', 'test without Collection',
678                            '--stream', '/dev/null'])
679
680     def test_error_when_project_not_found(self):
681         self.assertRaises(SystemExit,
682                           self.call_main_with_args,
683                           ['--project-uuid', self.Z_UUID])
684
685     def test_error_bad_project_uuid(self):
686         self.assertRaises(SystemExit,
687                           self.call_main_with_args,
688                           ['--project-uuid', self.Z_UUID, '--stream'])
689
690     def test_error_when_excluding_absolute_path(self):
691         tmpdir = self.make_tmpdir()
692         self.assertRaises(SystemExit,
693                           self.call_main_with_args,
694                           ['--exclude', '/some/absolute/path/*',
695                            tmpdir])
696
697     def test_api_error_handling(self):
698         coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
699         coll_save_mock.side_effect = arvados.errors.ApiError(
700             fake_httplib2_response(403), b'{}')
701         with mock.patch('arvados.collection.Collection.save_new',
702                         new=coll_save_mock):
703             with self.assertRaises(SystemExit) as exc_test:
704                 self.call_main_with_args(['/dev/null'])
705             self.assertLess(0, exc_test.exception.args[0])
706             self.assertLess(0, coll_save_mock.call_count)
707             self.assertEqual("", self.main_stdout.getvalue())
708
709     def test_request_id_logging(self):
710         matcher = r'INFO: X-Request-Id: req-[a-z0-9]{20}\n'
711
712         self.call_main_on_test_file()
713         self.assertRegex(self.main_stderr.getvalue(), matcher)
714
715         self.call_main_on_test_file(['--silent'])
716         self.assertNotRegex(self.main_stderr.getvalue(), matcher)
717
718
719 class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
720                             ArvadosBaseTestCase):
721     def _getKeepServerConfig():
722         for config_file, mandatory in [
723                 ['application.yml', False], ['application.default.yml', True]]:
724             path = os.path.join(run_test_server.SERVICES_SRC_DIR,
725                                 "api", "config", config_file)
726             if not mandatory and not os.path.exists(path):
727                 continue
728             with open(path) as f:
729                 rails_config = yaml.load(f.read())
730                 for config_section in ['test', 'common']:
731                     try:
732                         key = rails_config[config_section]["blob_signing_key"]
733                     except (KeyError, TypeError):
734                         pass
735                     else:
736                         return {'blob_signing_key': key,
737                                 'enforce_permissions': True}
738         return {'blog_signing_key': None, 'enforce_permissions': False}
739
740     MAIN_SERVER = {}
741     KEEP_SERVER = _getKeepServerConfig()
742     PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']
743
744     @classmethod
745     def setUpClass(cls):
746         super(ArvPutIntegrationTest, cls).setUpClass()
747         cls.ENVIRON = os.environ.copy()
748         cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
749
750     def datetime_to_hex(self, dt):
751         return hex(int(time.mktime(dt.timetuple())))[2:]
752
753     def setUp(self):
754         super(ArvPutIntegrationTest, self).setUp()
755         arv_put.api_client = None
756
757     def authorize_with(self, token_name):
758         run_test_server.authorize_with(token_name)
759         for v in ["ARVADOS_API_HOST",
760                   "ARVADOS_API_HOST_INSECURE",
761                   "ARVADOS_API_TOKEN"]:
762             self.ENVIRON[v] = arvados.config.settings()[v]
763         arv_put.api_client = arvados.api('v1')
764
765     def current_user(self):
766         return arv_put.api_client.users().current().execute()
767
768     def test_check_real_project_found(self):
769         self.authorize_with('active')
770         self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
771                         "did not correctly find test fixture project")
772
773     def test_check_error_finding_nonexistent_uuid(self):
774         BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
775         self.authorize_with('active')
776         try:
777             result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
778                                                   0)
779         except ValueError as error:
780             self.assertIn(BAD_UUID, str(error))
781         else:
782             self.assertFalse(result, "incorrectly found nonexistent project")
783
784     def test_check_error_finding_nonexistent_project(self):
785         BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
786         self.authorize_with('active')
787         with self.assertRaises(apiclient.errors.HttpError):
788             result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
789                                                   0)
790
791     def test_short_put_from_stdin(self):
792         # Have to run this as an integration test since arv-put can't
793         # read from the tests' stdin.
794         # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
795         # case, because the /proc entry is already gone by the time it tries.
796         pipe = subprocess.Popen(
797             [sys.executable, arv_put.__file__, '--stream'],
798             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
799             stderr=subprocess.STDOUT, env=self.ENVIRON)
800         pipe.stdin.write(b'stdin test\n')
801         pipe.stdin.close()
802         deadline = time.time() + 5
803         while (pipe.poll() is None) and (time.time() < deadline):
804             time.sleep(.1)
805         returncode = pipe.poll()
806         if returncode is None:
807             pipe.terminate()
808             self.fail("arv-put did not PUT from stdin within 5 seconds")
809         elif returncode != 0:
810             sys.stdout.write(pipe.stdout.read())
811             self.fail("arv-put returned exit code {}".format(returncode))
812         self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
813                       pipe.stdout.read().decode())
814
815     def test_ArvPutSignedManifest(self):
816         # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
817         # the newly created manifest from the API server, testing to confirm
818         # that the block locators in the returned manifest are signed.
819         self.authorize_with('active')
820
821         # Before doing anything, demonstrate that the collection
822         # we're about to create is not present in our test fixture.
823         manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
824         with self.assertRaises(apiclient.errors.HttpError):
825             notfound = arv_put.api_client.collections().get(
826                 uuid=manifest_uuid).execute()
827
828         datadir = self.make_tmpdir()
829         with open(os.path.join(datadir, "foo"), "w") as f:
830             f.write("The quick brown fox jumped over the lazy dog")
831         p = subprocess.Popen([sys.executable, arv_put.__file__,
832                               os.path.join(datadir, 'foo')],
833                              stdout=subprocess.PIPE,
834                              stderr=subprocess.PIPE,
835                              env=self.ENVIRON)
836         (out, err) = p.communicate()
837         self.assertRegex(err.decode(), r'INFO: Collection saved as ')
838         self.assertEqual(p.returncode, 0)
839
840         # The manifest text stored in the API server under the same
841         # manifest UUID must use signed locators.
842         c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
843         self.assertRegex(
844             c['manifest_text'],
845             r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')
846
847         os.remove(os.path.join(datadir, "foo"))
848         os.rmdir(datadir)
849
850     def run_and_find_collection(self, text, extra_args=[]):
851         self.authorize_with('active')
852         pipe = subprocess.Popen(
853             [sys.executable, arv_put.__file__] + extra_args,
854             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
855             stderr=subprocess.PIPE, env=self.ENVIRON)
856         stdout, stderr = pipe.communicate(text.encode())
857         self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
858         search_key = ('portable_data_hash'
859                       if '--portable-data-hash' in extra_args else 'uuid')
860         collection_list = arvados.api('v1').collections().list(
861             filters=[[search_key, '=', stdout.decode().strip()]]
862         ).execute().get('items', [])
863         self.assertEqual(1, len(collection_list))
864         return collection_list[0]
865
866     def test_expired_token_invalidates_cache(self):
867         self.authorize_with('active')
868         tmpdir = self.make_tmpdir()
869         with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
870             f.write('foo')
871         # Upload a directory and get the cache file name
872         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
873                              stdout=subprocess.PIPE,
874                              stderr=subprocess.PIPE,
875                              env=self.ENVIRON)
876         (out, err) = p.communicate()
877         self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
878         self.assertEqual(p.returncode, 0)
879         cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
880                                    err.decode()).groups()[0]
881         self.assertTrue(os.path.isfile(cache_filepath))
882         # Load the cache file contents and modify the manifest to simulate
883         # an expired access token
884         with open(cache_filepath, 'r') as c:
885             cache = json.load(c)
886         self.assertRegex(cache['manifest'], r'\+A\S+\@')
887         a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
888         cache['manifest'] = re.sub(
889             r'\@.*? ',
890             "@{} ".format(self.datetime_to_hex(a_month_ago)),
891             cache['manifest'])
892         with open(cache_filepath, 'w') as c:
893             c.write(json.dumps(cache))
894         # Re-run the upload and expect to get an invalid cache message
895         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
896                              stdout=subprocess.PIPE,
897                              stderr=subprocess.PIPE,
898                              env=self.ENVIRON)
899         (out, err) = p.communicate()
900         self.assertRegex(
901             err.decode(),
902             r'WARNING: Uploaded file .* access token expired, will re-upload it from scratch')
903         self.assertEqual(p.returncode, 0)
904         # Confirm that the resulting cache is different from the last run.
905         with open(cache_filepath, 'r') as c2:
906             new_cache = json.load(c2)
907         self.assertNotEqual(cache['manifest'], new_cache['manifest'])
908
909     def test_put_collection_with_later_update(self):
910         tmpdir = self.make_tmpdir()
911         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
912             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
913         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
914         self.assertNotEqual(None, col['uuid'])
915         # Add a new file to the directory
916         with open(os.path.join(tmpdir, 'file2'), 'w') as f:
917             f.write('The quick brown fox jumped over the lazy dog')
918         updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
919         self.assertEqual(col['uuid'], updated_col['uuid'])
920         # Get the manifest and check that the new file is being included
921         c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
922         self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')
923
924     def test_upload_directory_reference_without_trailing_slash(self):
925         tmpdir1 = self.make_tmpdir()
926         tmpdir2 = self.make_tmpdir()
927         with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
928             f.write('This is foo')
929         with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
930             f.write('This is not foo')
931         # Upload one directory and one file
932         col = self.run_and_find_collection("", ['--no-progress',
933                                                 tmpdir1,
934                                                 os.path.join(tmpdir2, 'bar')])
935         self.assertNotEqual(None, col['uuid'])
936         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
937         # Check that 'foo' was written inside a subcollection
938         # OTOH, 'bar' should have been directly uploaded on the root collection
939         self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')
940
941     def test_upload_directory_reference_with_trailing_slash(self):
942         tmpdir1 = self.make_tmpdir()
943         tmpdir2 = self.make_tmpdir()
944         with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
945             f.write('This is foo')
946         with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
947             f.write('This is not foo')
948         # Upload one directory (with trailing slash) and one file
949         col = self.run_and_find_collection("", ['--no-progress',
950                                                 tmpdir1 + os.sep,
951                                                 os.path.join(tmpdir2, 'bar')])
952         self.assertNotEqual(None, col['uuid'])
953         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
954         # Check that 'foo' and 'bar' were written at the same level
955         self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')
956
957     def test_put_collection_with_high_redundancy(self):
958         # Write empty data: we're not testing CollectionWriter, just
959         # making sure collections.create tells the API server what our
960         # desired replication level is.
961         collection = self.run_and_find_collection("", ['--replication', '4'])
962         self.assertEqual(4, collection['replication_desired'])
963
964     def test_put_collection_with_default_redundancy(self):
965         collection = self.run_and_find_collection("")
966         self.assertEqual(None, collection['replication_desired'])
967
968     def test_put_collection_with_unnamed_project_link(self):
969         link = self.run_and_find_collection(
970             "Test unnamed collection",
971             ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
972         username = pwd.getpwuid(os.getuid()).pw_name
973         self.assertRegex(
974             link['name'],
975             r'^Saved at .* by {}@'.format(re.escape(username)))
976
977     def test_put_collection_with_name_and_no_project(self):
978         link_name = 'Test Collection Link in home project'
979         collection = self.run_and_find_collection(
980             "Test named collection in home project",
981             ['--portable-data-hash', '--name', link_name])
982         self.assertEqual(link_name, collection['name'])
983         my_user_uuid = self.current_user()['uuid']
984         self.assertEqual(my_user_uuid, collection['owner_uuid'])
985
986     def test_put_collection_with_named_project_link(self):
987         link_name = 'Test auto Collection Link'
988         collection = self.run_and_find_collection("Test named collection",
989                                       ['--portable-data-hash',
990                                        '--name', link_name,
991                                        '--project-uuid', self.PROJECT_UUID])
992         self.assertEqual(link_name, collection['name'])
993
994     def test_exclude_filename_pattern(self):
995         tmpdir = self.make_tmpdir()
996         tmpsubdir = os.path.join(tmpdir, 'subdir')
997         os.mkdir(tmpsubdir)
998         for fname in ['file1', 'file2', 'file3']:
999             with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
1000                 f.write("This is %s" % fname)
1001             with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
1002                 f.write("This is %s" % fname)
1003         col = self.run_and_find_collection("", ['--no-progress',
1004                                                 '--exclude', '*2.txt',
1005                                                 '--exclude', 'file3.*',
1006                                                  tmpdir])
1007         self.assertNotEqual(None, col['uuid'])
1008         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1009         # None of the file2.txt & file3.txt should have been uploaded
1010         self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
1011         self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
1012         self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')
1013
1014     def test_exclude_filepath_pattern(self):
1015         tmpdir = self.make_tmpdir()
1016         tmpsubdir = os.path.join(tmpdir, 'subdir')
1017         os.mkdir(tmpsubdir)
1018         for fname in ['file1', 'file2', 'file3']:
1019             with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
1020                 f.write("This is %s" % fname)
1021             with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
1022                 f.write("This is %s" % fname)
1023         col = self.run_and_find_collection("", ['--no-progress',
1024                                                 '--exclude', 'subdir/*2.txt',
1025                                                 '--exclude', './file1.*',
1026                                                  tmpdir])
1027         self.assertNotEqual(None, col['uuid'])
1028         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1029         # Only tmpdir/file1.txt & tmpdir/subdir/file2.txt should have been excluded
1030         self.assertNotRegex(c['manifest_text'],
1031                             r'^\./%s.*:file1.txt' % os.path.basename(tmpdir))
1032         self.assertNotRegex(c['manifest_text'],
1033                             r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
1034         self.assertRegex(c['manifest_text'],
1035                          r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
1036         self.assertRegex(c['manifest_text'], r'^.*:file3.txt')
1037
1038     def test_silent_mode_no_errors(self):
1039         self.authorize_with('active')
1040         tmpdir = self.make_tmpdir()
1041         with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
1042             f.write('hello world')
1043         pipe = subprocess.Popen(
1044             [sys.executable, arv_put.__file__] + ['--silent', tmpdir],
1045             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1046             stderr=subprocess.PIPE, env=self.ENVIRON)
1047         stdout, stderr = pipe.communicate()
1048         # No console output should occur on normal operations
1049         self.assertNotRegex(stderr.decode(), r'.+')
1050         self.assertNotRegex(stdout.decode(), r'.+')
1051
1052     def test_silent_mode_does_not_avoid_error_messages(self):
1053         self.authorize_with('active')
1054         pipe = subprocess.Popen(
1055             [sys.executable, arv_put.__file__] + ['--silent',
1056                                                   '/path/not/existant'],
1057             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1058             stderr=subprocess.PIPE, env=self.ENVIRON)
1059         stdout, stderr = pipe.communicate()
1060         # Error message should be displayed when errors happen
1061         self.assertRegex(stderr.decode(), r'.*ERROR:.*')
1062         self.assertNotRegex(stdout.decode(), r'.+')
1063
1064
1065 if __name__ == '__main__':
1066     unittest.main()