11002: Added missing assertion to test.
[arvados.git] / sdk / python / tests / test_arv_put.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import apiclient
5 import io
6 import mock
7 import os
8 import pwd
9 import re
10 import shutil
11 import subprocess
12 import sys
13 import tempfile
14 import time
15 import unittest
16 import yaml
17 import threading
18 import hashlib
19 import random
20
21 from cStringIO import StringIO
22
23 import arvados
24 import arvados.commands.put as arv_put
25 import arvados_testutil as tutil
26
27 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
28 import run_test_server
29
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Tests for arv_put.ResumeCache: cache path naming, storage and locking."""

    # Argument lists that must each map to a distinct, stable cache path.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        # Tests that want their cache removed store it in self.last_cache;
        # tests that never set it hit the AttributeError path.
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache path ResumeCache would use for these CLI args."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def _reopen_saved_cache(self, state):
        """Save `state` to a fresh cache file, close it, and reopen that file.

        The first cache object is kept in self.last_cache so tearDown
        destroys it.  Returns the newly opened ResumeCache.
        """
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(state)
        self.last_cache.close()
        return arv_put.ResumeCache(self.last_cache.filename)

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Restore the global settings exactly as they were.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        resume_cache = self._reopen_saved_cache(thing)
        self.assertIsNotNone(resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        resume_cache = self._reopen_saved_cache(thing)
        self.assertIsNotNone(resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        resume_cache = self._reopen_saved_cache(thing)
        self.assertIsNotNone(resume_cache)
        # check_cache() is called directly, not passed to assertRaises, so
        # that an exception escaping it (e.g. from the failing HEAD
        # request) is reported as a test failure.
        try:
            resume_cache.check_cache()
        except Exception as error:
            self.fail("check_cache() raised {!r} when HEAD failed".format(error))

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
238
239
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutUploadJob against the test servers."""

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation: files "1".."4" of 1..4 KB, plus a 5 KB file
        # in a subdirectory.
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test.  mkstemp() returns an open
        # OS-level descriptor; write through it so it gets closed instead
        # of leaked.
        fd, self.large_file_name = tempfile.mkstemp()
        with os.fdopen(fd, 'w') as fileobj:
            # Make sure to write just a little more than one block.  Floor
            # division keeps the range() argument an int on Python 3 too.
            for _ in range((arvados.config.KEEP_BLOCK_SIZE // (1024 * 1024)) + 1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
                fileobj.write(data)
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        # The unpatched write(), so mocked versions can delegate to it.
        self.arvfile_write = arvados.arvfile.ArvadosFileWriter.write

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)

    def _start_interrupted_large_file_upload(self):
        """Upload self.large_file_name, aborting with SystemExit on the last block.

        ArvadosFileWriter.write is patched so that writes shorter than
        KEEP_BLOCK_SIZE raise SystemExit, while full-size writes go to the
        real method.  Asserts that only part of the file was written and
        returns the interrupted ArvPutUploadJob.
        """
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        return writer

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)

    def make_progress_tester(self):
        """Return (progression, reporter): reporter appends (written, expected) to progression."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                    reporter=reporter, bytes_expected=expect_count)
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        writer = self._start_interrupted_large_file_upload()
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block, wrapped_commit will make the first block
            # commit to fail.
            # arv-put should not exit with an exception by trying to commit the collection
            # as it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except AttributeError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        self._start_interrupted_large_file_upload()
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    def test_no_resume_when_no_cache(self):
        self._start_interrupted_large_file_upload()
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    def test_dry_run_feature(self):
        self._start_interrupted_large_file_upload()
        # Retry the upload using dry_run to check if there is a pending upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            writer2.start(save_collection=False)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        # Confirm there's no pending upload with dry_run=True
        writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            writer4.start(save_collection=False)
        writer4.destroy_cache()
        # Test obvious cases: without resume, a dry run is always pending.
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
476
477
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Checks arv_put.expected_bytes_for() on a file, a tree and a device."""

    # Size of this test module itself, used as a known-size input file.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        total = arv_put.expected_bytes_for([__file__])
        self.assertEqual(self.TEST_SIZE, total)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        for copy_name in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tree, copy_name))
        self.assertEqual(self.TEST_SIZE * 2,
                         arv_put.expected_bytes_for([tree]))
        self.assertEqual(self.TEST_SIZE * 3,
                         arv_put.expected_bytes_for([tree, __file__]))

    def test_expected_bytes_for_device(self):
        # Including /dev/null, alone or with a regular file, must make the
        # expected total unknown (None).
        for paths in (['/dev/null'], [__file__, '/dev/null']):
            self.assertIsNone(arv_put.expected_bytes_for(paths))
497
498
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Checks arv_put's machine- and human-readable progress strings."""

    def test_machine_progress(self):
        cases = [(0, 1), (0, None), (1, None), (235, 9283)]
        for count, total in cases:
            # An unknown total is reported as -1.
            if total is None:
                shown_total = -1
            else:
                shown_total = total
            suffix = ": {} written {} total\n".format(count, shown_total)
            report = arv_put.machine_progress(count, total)
            self.assertTrue(report.endswith(suffix))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            percentage = '{:.1%}'.format(float(count) / total)
            report = arv_put.human_progress(count, total)
            self.assertTrue(report.startswith('\r'))
            self.assertIn(percentage, report)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            pattern = r'\b{}\b'.format(count)
            report = arv_put.human_progress(count, None)
            self.assertTrue(re.search(pattern, report))
518
519
class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
    """Tests that run arv_put.main() in-process against the test servers."""

    MAIN_SERVER = {}
    # Placeholder UUID used by the project-not-found tests below.
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main(args) with stdout/stderr captured in StringIO buffers.

        The buffers are kept as self.main_stdout / self.main_stderr for
        inspection; tearDown closes them.  Returns main()'s return value.
        """
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Upload a fresh test file with `--stream --no-progress` plus `args`.

        Afterwards, asserts that the expected data block exists in the
        local Keep store directory named by $KEEP_LOCAL_STORE.

        args: extra command-line arguments (default: none).  A None
        default avoids sharing one mutable list between calls.
        """
        if args is None:
            args = []
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None

    def tearDown(self):
        # Close and forget the capture buffers created by call_main_with_args.
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        err = io.BytesIO()
        out = io.BytesIO()
        with tutil.redirected_streams(stdout=out, stderr=err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertEqual(out.getvalue(), '')
        # Raw string: the regex escapes must not be read as string escapes.
        self.assertRegexpMatches(err.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        # Manage the test files with `with` so they are closed when the
        # upload is done instead of being left open.
        with self.make_test_file() as testfile1, \
                self.make_test_file() as testfile2:
            test_paths = [testfile1.name, testfile2.name]
            # Reverse-sort the paths, so normalization must change their order.
            test_paths.sort(reverse=True)
            self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                     test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())
635
636
637 class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
638                             ArvadosBaseTestCase):
639     def _getKeepServerConfig():
640         for config_file, mandatory in [
641                 ['application.yml', False], ['application.default.yml', True]]:
642             path = os.path.join(run_test_server.SERVICES_SRC_DIR,
643                                 "api", "config", config_file)
644             if not mandatory and not os.path.exists(path):
645                 continue
646             with open(path) as f:
647                 rails_config = yaml.load(f.read())
648                 for config_section in ['test', 'common']:
649                     try:
650                         key = rails_config[config_section]["blob_signing_key"]
651                     except (KeyError, TypeError):
652                         pass
653                     else:
654                         return {'blob_signing_key': key,
655                                 'enforce_permissions': True}
656         return {'blog_signing_key': None, 'enforce_permissions': False}
657
658     MAIN_SERVER = {}
659     KEEP_SERVER = _getKeepServerConfig()
660     PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']
661
662     @classmethod
663     def setUpClass(cls):
664         super(ArvPutIntegrationTest, cls).setUpClass()
665         cls.ENVIRON = os.environ.copy()
666         cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
667
    def setUp(self):
        """Per-test setup: clear arv_put's module-level API client."""
        super(ArvPutIntegrationTest, self).setUp()
        # Drop any client left by an earlier test; tests that need one
        # install it via authorize_with().
        arv_put.api_client = None
671
672     def authorize_with(self, token_name):
673         run_test_server.authorize_with(token_name)
674         for v in ["ARVADOS_API_HOST",
675                   "ARVADOS_API_HOST_INSECURE",
676                   "ARVADOS_API_TOKEN"]:
677             self.ENVIRON[v] = arvados.config.settings()[v]
678         arv_put.api_client = arvados.api('v1')
679
680     def current_user(self):
681         return arv_put.api_client.users().current().execute()
682
683     def test_check_real_project_found(self):
684         self.authorize_with('active')
685         self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
686                         "did not correctly find test fixture project")
687
688     def test_check_error_finding_nonexistent_uuid(self):
689         BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
690         self.authorize_with('active')
691         try:
692             result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
693                                                   0)
694         except ValueError as error:
695             self.assertIn(BAD_UUID, error.message)
696         else:
697             self.assertFalse(result, "incorrectly found nonexistent project")
698
699     def test_check_error_finding_nonexistent_project(self):
700         BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
701         self.authorize_with('active')
702         with self.assertRaises(apiclient.errors.HttpError):
703             result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
704                                                   0)
705
706     def test_short_put_from_stdin(self):
707         # Have to run this as an integration test since arv-put can't
708         # read from the tests' stdin.
709         # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
710         # case, because the /proc entry is already gone by the time it tries.
711         pipe = subprocess.Popen(
712             [sys.executable, arv_put.__file__, '--stream'],
713             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
714             stderr=subprocess.STDOUT, env=self.ENVIRON)
715         pipe.stdin.write('stdin test\n')
716         pipe.stdin.close()
717         deadline = time.time() + 5
718         while (pipe.poll() is None) and (time.time() < deadline):
719             time.sleep(.1)
720         returncode = pipe.poll()
721         if returncode is None:
722             pipe.terminate()
723             self.fail("arv-put did not PUT from stdin within 5 seconds")
724         elif returncode != 0:
725             sys.stdout.write(pipe.stdout.read())
726             self.fail("arv-put returned exit code {}".format(returncode))
727         self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())
728
729     def test_ArvPutSignedManifest(self):
730         # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
731         # the newly created manifest from the API server, testing to confirm
732         # that the block locators in the returned manifest are signed.
733         self.authorize_with('active')
734
735         # Before doing anything, demonstrate that the collection
736         # we're about to create is not present in our test fixture.
737         manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
738         with self.assertRaises(apiclient.errors.HttpError):
739             notfound = arv_put.api_client.collections().get(
740                 uuid=manifest_uuid).execute()
741
742         datadir = self.make_tmpdir()
743         with open(os.path.join(datadir, "foo"), "w") as f:
744             f.write("The quick brown fox jumped over the lazy dog")
745         p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
746                              stdout=subprocess.PIPE, env=self.ENVIRON)
747         (arvout, arverr) = p.communicate()
748         self.assertEqual(arverr, None)
749         self.assertEqual(p.returncode, 0)
750
751         # The manifest text stored in the API server under the same
752         # manifest UUID must use signed locators.
753         c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
754         self.assertRegexpMatches(
755             c['manifest_text'],
756             r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')
757
758         os.remove(os.path.join(datadir, "foo"))
759         os.rmdir(datadir)
760
761     def run_and_find_collection(self, text, extra_args=[]):
762         self.authorize_with('active')
763         pipe = subprocess.Popen(
764             [sys.executable, arv_put.__file__] + extra_args,
765             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
766             stderr=subprocess.PIPE, env=self.ENVIRON)
767         stdout, stderr = pipe.communicate(text)
768         search_key = ('portable_data_hash'
769                       if '--portable-data-hash' in extra_args else 'uuid')
770         collection_list = arvados.api('v1').collections().list(
771             filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
772         self.assertEqual(1, len(collection_list))
773         return collection_list[0]
774
775     def test_put_collection_with_later_update(self):
776         tmpdir = self.make_tmpdir()
777         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
778             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
779         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
780         self.assertNotEqual(None, col['uuid'])
781         # Add a new file to the directory
782         with open(os.path.join(tmpdir, 'file2'), 'w') as f:
783             f.write('The quick brown fox jumped over the lazy dog')
784         updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
785         self.assertEqual(col['uuid'], updated_col['uuid'])
786         # Get the manifest and check that the new file is being included
787         c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
788         self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')
789
790     def test_put_collection_with_high_redundancy(self):
791         # Write empty data: we're not testing CollectionWriter, just
792         # making sure collections.create tells the API server what our
793         # desired replication level is.
794         collection = self.run_and_find_collection("", ['--replication', '4'])
795         self.assertEqual(4, collection['replication_desired'])
796
797     def test_put_collection_with_default_redundancy(self):
798         collection = self.run_and_find_collection("")
799         self.assertEqual(None, collection['replication_desired'])
800
801     def test_put_collection_with_unnamed_project_link(self):
802         link = self.run_and_find_collection(
803             "Test unnamed collection",
804             ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
805         username = pwd.getpwuid(os.getuid()).pw_name
806         self.assertRegexpMatches(
807             link['name'],
808             r'^Saved at .* by {}@'.format(re.escape(username)))
809
810     def test_put_collection_with_name_and_no_project(self):
811         link_name = 'Test Collection Link in home project'
812         collection = self.run_and_find_collection(
813             "Test named collection in home project",
814             ['--portable-data-hash', '--name', link_name])
815         self.assertEqual(link_name, collection['name'])
816         my_user_uuid = self.current_user()['uuid']
817         self.assertEqual(my_user_uuid, collection['owner_uuid'])
818
819     def test_put_collection_with_named_project_link(self):
820         link_name = 'Test auto Collection Link'
821         collection = self.run_and_find_collection("Test named collection",
822                                       ['--portable-data-hash',
823                                        '--name', link_name,
824                                        '--project-uuid', self.PROJECT_UUID])
825         self.assertEqual(link_name, collection['name'])
826
827
# Run this module's test cases when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()