#!/usr/bin/env python
# -*- coding: utf-8 -*-

import apiclient
import hashlib
import io
import mock
import os
import pwd
import random
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import unittest
import uuid
import yaml

from cStringIO import StringIO

import arvados
import arvados.commands.put as arv_put
import arvados_testutil as tutil

from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
import run_test_server

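# ResumeCache gives arv-put a place to checkpoint upload state so an
# interrupted run can pick up where it left off.  A minimal sketch of the
# idea, using only calls exercised by the tests below (the on-disk cache
# location is an implementation detail of ResumeCache.make_path):
#
#   args = arv_put.parse_arguments(['/tmp/data'])
#   path = arv_put.ResumeCache.make_path(args)  # stable for identical args
#   cache = arv_put.ResumeCache(path)           # opens and locks the file
#   cache.save({'some': 'state'})
#   state = cache.load()
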
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

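    # The literal locators below use the Keep format "<md5 hexdigest>+<size>";
    # for example, 098f6bcd4621d373cade4e832627b4f6+4 names the 4-byte block
    # "test".  KeepClient.head() is mocked out, so no Keep server is contacted.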
    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # check_cache() should handle the failed HEAD request without raising.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)


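# ArvPutUploadJob drives the actual upload: it walks the given paths, writes
# their data into a collection, and checkpoints progress so a later run over
# the same arguments can resume.  Typical use, mirroring the tests below:
#
#   job = arv_put.ArvPutUploadJob(['/some/dir'], replication_desired=1)
#   job.start(save_collection=False)  # upload blocks; skip saving a collection
#   manifest = job.manifest_text()
#   job.destroy_cache()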
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Create temp files and directories used by the tests
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        # Make sure to write just a little more than one block
        with open(self.large_file_name, 'w') as fileobj:
            for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
                fileobj.write(data)
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
        # Temp dir to hold symlinks to the other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir1'))
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir2'))

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def test_symlinks_are_followed_by_default(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir1', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_followed_only_once(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=True)
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir1', cwriter.manifest_text())
        self.assertNotIn('linkeddir2', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir1', cwriter.manifest_text())
        self.assertNotIn('linkeddir2', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistent_path_raises_exception(self):
        uuid_str = str(uuid.uuid4())
        cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
        with self.assertRaises(arv_put.PathDoesNotExistError):
            cwriter.start(save_collection=False)

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)

    def make_progress_tester(self):
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

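    # The reporter callback is invoked as reporter(bytes_written,
    # bytes_expected); bytes_expected is None when the total size could not
    # be determined up front (see ArvadosExpectedBytesTest below).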
    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                    reporter=reporter, bytes_expected=expect_count)
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

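    # The next several tests share one pattern: wrap
    # ArvadosFileWriter.write() so it checkpoints and raises SystemExit
    # before the last block, simulating an arv-put run that dies mid-upload,
    # then start a second job over the same file to inspect resume behavior.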
    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting. Ensure block commit.
                self.writer._update(final=True)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block; wrapped_commit makes the
            # first block commit fail.
            # arv-put should not exit with an exception from trying to commit
            # the collection while it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_no_resume_when_no_cache(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_dry_run_feature(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload using dry_run to check if there is a pending upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            writer2.start(save_collection=False)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        # Confirm there's no pending upload with dry_run=True
        writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            writer4.start(save_collection=False)
        writer4.destroy_cache()
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del self.writer

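# expected_bytes_for() sizes up an upload before it starts: regular files and
# directory trees report their total byte count, while sources whose size
# can't be determined in advance (device files like /dev/null) yield None,
# which in turn makes progress reports fall back to a plain byte count.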
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        self.assertEqual(self.TEST_SIZE,
                         arv_put.expected_bytes_for([__file__]))

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))
        self.assertEqual(self.TEST_SIZE * 2,
                         arv_put.expected_bytes_for([tree]))
        self.assertEqual(self.TEST_SIZE * 3,
                         arv_put.expected_bytes_for([tree, __file__]))

    def test_expected_bytes_for_device(self):
        self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
        self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))


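# arv-put reports progress in two formats: machine_progress() emits
# "<written> written <total> total" lines (total is -1 when unknown), and
# human_progress() redraws a single console line, showing a percentage when
# the total is known and a plain count otherwise.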
class ArvadosPutReportTest(ArvadosBaseTestCase):
    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            expect = ": {} written {} total\n".format(
                count, -1 if (total is None) else total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(float(count) / total)
            actual = arv_put.human_progress(count, total)
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))


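# ArvadosPutTest calls arv_put.main() in-process with stdout/stderr captured,
# so it can assert on exit codes and manifest output without spawning a
# subprocess.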
class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        # 098f6bcd... is the md5 of the default test file's contents ('test').
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None

    def tearDown(self):
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        err = io.BytesIO()
        out = io.BytesIO()
        with tutil.redirected_streams(stdout=out, stderr=err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertEqual(out.getvalue(), '')
        self.assertRegexpMatches(err.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwritable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())


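# The integration tests below run arv-put as a real subprocess against the
# test API and Keep servers, exercising argument parsing, environment
# handling, and block signing end to end.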
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    def _getKeepServerConfig():
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                rails_config = yaml.load(f.read())
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, error.message)
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                         0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write('stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE, env=self.ENVIRON)
        (arvout, arverr) = p.communicate()
        self.assertEqual(arverr, None)
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # portable data hash must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegexpMatches(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=[]):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text)
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_later_update(self):
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegexpMatches(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])


if __name__ == '__main__':
    unittest.main()