10584: When evicting a Collection from the cache, ensure that get/put worker
[arvados.git] / sdk / python / tests / test_arv_put.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 import apiclient
12 import mock
13 import os
14 import pwd
15 import re
16 import shutil
17 import subprocess
18 import sys
19 import tempfile
20 import time
21 import unittest
22 import yaml
23 import threading
24 import hashlib
25 import random
26 import uuid
27
28 import arvados
29 import arvados.commands.put as arv_put
30 from . import arvados_testutil as tutil
31
32 from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
33 from . import run_test_server
34
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Tests for arv_put.ResumeCache: cache path naming, locking, and persistence."""

    # Argument lists that should each map to a distinct yet stable cache path.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            # Not every test assigns self.last_cache.
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache path ResumeCache derives from parsed arguments."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        # The same argument list must always yield the same cache path.
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        # Different argument lists must not collide on one cache path.
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        # Temporarily point the client at a different API host and make
        # sure the derived cache name changes with it.
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # Bug fix: this used to read `self.assertRaises(None, resume_cache.check_cache())`,
        # which called check_cache() eagerly and then passed its return value (and None
        # as the "exception class") to assertRaises -- not a valid use of that API.
        # The intent is that check_cache() must swallow the HEAD failure internally;
        # calling it directly makes the test fail if it raises.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            # After destroy(), the lock must be released...
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            # ...and the old handle must no longer hold usable data.
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        # restart() empties the cache but keeps it locked.
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
244
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutUploadJob: uploads, resume/cache behavior, dry runs."""

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        fileobj = open(self.large_file_name, 'w')
        # Make sure to write just a little more than one block
        for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
            fileobj.write(data)
        fileobj.close()
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        # Keep a reference to the real write() so mock side_effects can
        # delegate to it for the blocks they don't interrupt.
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
        # Temp dir to hold a symlink to other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        # Remove every fixture created in setUp().
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def test_symlinks_are_followed_by_default(self):
        # Both the linked dir and linked file should appear in the manifest.
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        # With follow_links=False, symlinked entries are skipped entirely.
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistant_path_raise_exception(self):
        # Random UUID in the path guarantees it does not exist.
        uuid_str = str(uuid.uuid4())
        cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
        with self.assertRaises(arv_put.PathDoesNotExistError):
            cwriter.start(save_collection=False)

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            # -- the second writer should skip everything already uploaded.
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

    def make_progress_tester(self):
        # Returns (progression, record_func): record_func appends each
        # (written, expected) progress callback into progression.
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            # Exercise both an unknown (None) and a known expected byte count.
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                    reporter=reporter, bytes_expected=expect_count)
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        # Files 1..4 in tempdir plus the 5 KB subdir/otherfile.
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        # Wrapper aborts the upload mid-way (on the final, short block)
        # after forcing a checkpoint, simulating an interrupted arv-put.
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting. Ensure block commit.
                self.writer._update(final=True)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        # First attempt's bytes plus the resumed attempt's new bytes
        # should add up to the whole file.
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del(self.writer)

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block, wrapped_commit will make the first block
            # commit to fail.
            # arv-put should not exit with an exception by trying to commit the collection
            # as it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        # Same interrupted-upload setup as test_resume_large_file_upload...
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        # ...but with resume=False nothing should be skipped.
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del(self.writer)

    def test_no_resume_when_no_cache(self):
        # Same interrupted-upload setup; here the cache itself is disabled.
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del(self.writer)

    def test_dry_run_feature(self):
        # Interrupt an upload, then use dry_run=True to detect the pending
        # upload without performing it.
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload using dry_run to check if there is a pending upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            writer2.start(save_collection=False)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        # Confirm there's no pending upload with dry_run=True
        writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            writer4.start(save_collection=False)
        writer4.destroy_cache()
        # Test obvious cases
        # A dry run with no cache/resume always reports pending work.
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del(self.writer)
529
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Tests for arv_put.expected_bytes_for() upload-size estimation."""

    # Use this test file itself as a fixture of known size.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        self.assertEqual(self.TEST_SIZE,
                         arv_put.expected_bytes_for([__file__]))

    def test_expected_bytes_for_tree(self):
        # A directory's estimate is the sum of the files inside it.
        tmpdir = self.make_tmpdir()
        for name in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tmpdir, name))
        self.assertEqual(self.TEST_SIZE * 2,
                         arv_put.expected_bytes_for([tmpdir]))
        self.assertEqual(self.TEST_SIZE * 3,
                         arv_put.expected_bytes_for([tmpdir, __file__]))

    def test_expected_bytes_for_device(self):
        # Device files have no meaningful size, so no estimate is given.
        self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
        self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))
549
550
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Tests for the machine- and human-readable progress report helpers."""

    def test_machine_progress(self):
        for written, expected in [(0, 1), (0, None), (1, None), (235, 9283)]:
            # An unknown total is reported as -1 in machine output.
            total_field = -1 if expected is None else expected
            suffix = ": {} written {} total\n".format(written, total_field)
            self.assertTrue(
                arv_put.machine_progress(written, expected).endswith(suffix))

    def test_known_human_progress(self):
        for written, expected in [(0, 1), (2, 4), (45, 60)]:
            # With a known total, the report shows a percentage and
            # starts with a carriage return to redraw in place.
            frac = '{:.1%}'.format(1.0 * written / expected)
            report = arv_put.human_progress(written, expected)
            self.assertTrue(report.startswith('\r'))
            self.assertIn(frac, report)

    def test_unknown_human_progress(self):
        for written in [1, 20, 300, 4000, 50000]:
            # Without a total, the byte count itself must appear as a word.
            pattern = r'\b{}\b'.format(written)
            self.assertTrue(
                re.search(pattern, arv_put.human_progress(written, None)))
570
571
class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    """End-to-end tests driving the arv-put command line entry point."""

    MAIN_SERVER = {}
    # A syntactically valid UUID that matches no real object.
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main() with captured stdout/stderr; return its result."""
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Upload a small test file through main() and assert it reached Keep.

        Bug fix: `args` previously defaulted to a mutable list (`args=[]`),
        the classic shared-default pitfall; use a None sentinel instead.
        """
        if args is None:
            args = []
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        # md5("test") -- the block hash the test file's data should produce.
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        # Force main() to build a fresh API client per run.
        arv_put.api_client = None

    def tearDown(self):
        # Close and drop any stream buffers a test created.
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            # argparse exits after printing the version.
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        # arv-put should still work when the cache dir is unwritable.
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        # Same, but the unwritable dir blocks creating the cache subdir.
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            # Each --replication N should reach Keep as copies=N.
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        # --name requires saving a collection; --stream doesn't save one.
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        # Make Collection.save_new() fail with a 403; main() must exit
        # nonzero without printing a manifest.
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())
688
689 class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
690                             ArvadosBaseTestCase):
691     def _getKeepServerConfig():
692         for config_file, mandatory in [
693                 ['application.yml', False], ['application.default.yml', True]]:
694             path = os.path.join(run_test_server.SERVICES_SRC_DIR,
695                                 "api", "config", config_file)
696             if not mandatory and not os.path.exists(path):
697                 continue
698             with open(path) as f:
699                 rails_config = yaml.load(f.read())
700                 for config_section in ['test', 'common']:
701                     try:
702                         key = rails_config[config_section]["blob_signing_key"]
703                     except (KeyError, TypeError):
704                         pass
705                     else:
706                         return {'blob_signing_key': key,
707                                 'enforce_permissions': True}
708         return {'blog_signing_key': None, 'enforce_permissions': False}
709
    # Server configuration consumed by run_test_server.TestCaseWithServers
    # (see base class); KEEP_SERVER carries the blob-signing settings read
    # from the Rails config above.
    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    # UUID of the 'aproject' group fixture, used as an upload destination.
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']
713
714     @classmethod
715     def setUpClass(cls):
716         super(ArvPutIntegrationTest, cls).setUpClass()
717         cls.ENVIRON = os.environ.copy()
718         cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
719
720     def setUp(self):
721         super(ArvPutIntegrationTest, self).setUp()
722         arv_put.api_client = None
723
724     def authorize_with(self, token_name):
725         run_test_server.authorize_with(token_name)
726         for v in ["ARVADOS_API_HOST",
727                   "ARVADOS_API_HOST_INSECURE",
728                   "ARVADOS_API_TOKEN"]:
729             self.ENVIRON[v] = arvados.config.settings()[v]
730         arv_put.api_client = arvados.api('v1')
731
732     def current_user(self):
733         return arv_put.api_client.users().current().execute()
734
735     def test_check_real_project_found(self):
736         self.authorize_with('active')
737         self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
738                         "did not correctly find test fixture project")
739
740     def test_check_error_finding_nonexistent_uuid(self):
741         BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
742         self.authorize_with('active')
743         try:
744             result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
745                                                   0)
746         except ValueError as error:
747             self.assertIn(BAD_UUID, str(error))
748         else:
749             self.assertFalse(result, "incorrectly found nonexistent project")
750
751     def test_check_error_finding_nonexistent_project(self):
752         BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
753         self.authorize_with('active')
754         with self.assertRaises(apiclient.errors.HttpError):
755             result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
756                                                   0)
757
758     def test_short_put_from_stdin(self):
759         # Have to run this as an integration test since arv-put can't
760         # read from the tests' stdin.
761         # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
762         # case, because the /proc entry is already gone by the time it tries.
763         pipe = subprocess.Popen(
764             [sys.executable, arv_put.__file__, '--stream'],
765             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
766             stderr=subprocess.STDOUT, env=self.ENVIRON)
767         pipe.stdin.write(b'stdin test\n')
768         pipe.stdin.close()
769         deadline = time.time() + 5
770         while (pipe.poll() is None) and (time.time() < deadline):
771             time.sleep(.1)
772         returncode = pipe.poll()
773         if returncode is None:
774             pipe.terminate()
775             self.fail("arv-put did not PUT from stdin within 5 seconds")
776         elif returncode != 0:
777             sys.stdout.write(pipe.stdout.read())
778             self.fail("arv-put returned exit code {}".format(returncode))
779         self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
780                       pipe.stdout.read().decode())
781
    def test_ArvPutSignedManifest(self):
        """Verify arv-put stores manifests whose block locators are signed."""
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        # Upload a one-file collection through the real CLI in a subprocess.
        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__,
                              os.path.join(datadir, 'foo')],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators
        # (hash+size+A<signature>@<expiry>).
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegex(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        # Clean up the scratch directory.
        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)
816
817     def run_and_find_collection(self, text, extra_args=[]):
818         self.authorize_with('active')
819         pipe = subprocess.Popen(
820             [sys.executable, arv_put.__file__] + extra_args,
821             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
822             stderr=subprocess.PIPE, env=self.ENVIRON)
823         stdout, stderr = pipe.communicate(text.encode())
824         self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
825         search_key = ('portable_data_hash'
826                       if '--portable-data-hash' in extra_args else 'uuid')
827         collection_list = arvados.api('v1').collections().list(
828             filters=[[search_key, '=', stdout.decode().strip()]]
829         ).execute().get('items', [])
830         self.assertEqual(1, len(collection_list))
831         return collection_list[0]
832
833     def test_put_collection_with_later_update(self):
834         tmpdir = self.make_tmpdir()
835         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
836             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
837         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
838         self.assertNotEqual(None, col['uuid'])
839         # Add a new file to the directory
840         with open(os.path.join(tmpdir, 'file2'), 'w') as f:
841             f.write('The quick brown fox jumped over the lazy dog')
842         updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
843         self.assertEqual(col['uuid'], updated_col['uuid'])
844         # Get the manifest and check that the new file is being included
845         c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
846         self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')
847
848     def test_upload_directory_reference_without_trailing_slash(self):
849         tmpdir1 = self.make_tmpdir()
850         tmpdir2 = self.make_tmpdir()
851         with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
852             f.write('This is foo')
853         with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
854             f.write('This is not foo')
855         # Upload one directory and one file
856         col = self.run_and_find_collection("", ['--no-progress',
857                                                 tmpdir1,
858                                                 os.path.join(tmpdir2, 'bar')])
859         self.assertNotEqual(None, col['uuid'])
860         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
861         # Check that 'foo' was written inside a subcollection
862         # OTOH, 'bar' should have been directly uploaded on the root collection
863         self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')
864
865     def test_upload_directory_reference_with_trailing_slash(self):
866         tmpdir1 = self.make_tmpdir()
867         tmpdir2 = self.make_tmpdir()
868         with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
869             f.write('This is foo')
870         with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
871             f.write('This is not foo')
872         # Upload one directory (with trailing slash) and one file
873         col = self.run_and_find_collection("", ['--no-progress',
874                                                 tmpdir1 + os.sep,
875                                                 os.path.join(tmpdir2, 'bar')])
876         self.assertNotEqual(None, col['uuid'])
877         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
878         # Check that 'foo' and 'bar' were written at the same level
879         self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')
880
881     def test_put_collection_with_high_redundancy(self):
882         # Write empty data: we're not testing CollectionWriter, just
883         # making sure collections.create tells the API server what our
884         # desired replication level is.
885         collection = self.run_and_find_collection("", ['--replication', '4'])
886         self.assertEqual(4, collection['replication_desired'])
887
888     def test_put_collection_with_default_redundancy(self):
889         collection = self.run_and_find_collection("")
890         self.assertEqual(None, collection['replication_desired'])
891
892     def test_put_collection_with_unnamed_project_link(self):
893         link = self.run_and_find_collection(
894             "Test unnamed collection",
895             ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
896         username = pwd.getpwuid(os.getuid()).pw_name
897         self.assertRegex(
898             link['name'],
899             r'^Saved at .* by {}@'.format(re.escape(username)))
900
901     def test_put_collection_with_name_and_no_project(self):
902         link_name = 'Test Collection Link in home project'
903         collection = self.run_and_find_collection(
904             "Test named collection in home project",
905             ['--portable-data-hash', '--name', link_name])
906         self.assertEqual(link_name, collection['name'])
907         my_user_uuid = self.current_user()['uuid']
908         self.assertEqual(my_user_uuid, collection['owner_uuid'])
909
910     def test_put_collection_with_named_project_link(self):
911         link_name = 'Test auto Collection Link'
912         collection = self.run_and_find_collection("Test named collection",
913                                       ['--portable-data-hash',
914                                        '--name', link_name,
915                                        '--project-uuid', self.PROJECT_UUID])
916         self.assertEqual(link_name, collection['name'])
917
918
# Allow running this test module directly (e.g. python test_arv_put.py).
if __name__ == '__main__':
    unittest.main()