# 9463: Polishing the last details to make the integration tests work ok
# [arvados.git] / sdk / python / tests / test_arv_put.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import apiclient
5 import mock
6 import os
7 import pwd
8 import re
9 import shutil
10 import subprocess
11 import multiprocessing
12 import sys
13 import tempfile
14 import time
15 import unittest
16 import yaml
17
18 from cStringIO import StringIO
19
20 import arvados
21 import arvados.commands.put as arv_put
22
23 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
24 import run_test_server
25
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Tests for arv_put.ResumeCache: cache path naming, locking,
    persistence, and resume-state validation.
    """
    # Argument sets that must each map to a distinct, stable cache path.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp'],
        ['/tmp', '--max-manifest-depth', '0'],
        ['/tmp', '--max-manifest-depth', '1']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            # Not every test creates self.last_cache.
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache file path arv-put would use for arglist."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Restore the global configuration we modified above.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # Bug fix: the original line was
        #     self.assertRaises(None, resume_cache.check_cache())
        # which invoked check_cache() immediately and passed its return
        # value (plus None as the exception class) to assertRaises; in
        # Python 2.7 assertRaises(None, None) just returns an unused
        # context manager, so nothing was asserted.  The intent is that
        # check_cache() copes with the failing HEAD request without
        # propagating the exception.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        # restart() empties the cache but keeps the lock held.
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
236
237
class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers):
    """Draft integration tests for collection uploads.

    Most cases are still disabled (commented out) while resumable-upload
    support is being finished.  (The stray class-body ``import shutil``
    was removed: shutil is already imported at module level, and the
    class-level import only created a useless class attribute.)
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    # def test_write_files(self):
    #     c = arv_put.ArvPutCollection()
    #     data = 'a' * 1024 * 1024 # 1 MB
    #     tmpdir = tempfile.mkdtemp()
    #     for size in [1, 10, 64, 128]:
    #         with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
    #             for _ in range(size):
    #                 f.write(data)
    #         c.write_file(f.name, os.path.basename(f.name))
    #     shutil.rmtree(tmpdir)
    #     self.assertEqual(True, c.manifest())
    #
    # def test_write_directory(self):
    #     data = 'b' * 1024 * 1024
    #     tmpdir = tempfile.mkdtemp()
    #     for size in [1, 5, 10, 70]:
    #         with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
    #             for _ in range(size):
    #                 f.write(data)
    #     os.mkdir(os.path.join(tmpdir, 'subdir1'))
    #     for size in [2, 4, 6]:
    #         with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
    #             for _ in range(size):
    #                 f.write(data)
    #     c = arv_put.ArvPutUploader([tmpdir])
    #     shutil.rmtree(tmpdir)
    #     self.assertEqual(True, c.manifest())

    def fake_reporter(self, written, expected):
        """Progress callback that pauses so a test can interrupt an upload.

        The sleep acts as an intra-block pause, giving the test a window
        to simulate an interruption.
        """
        # Parenthesized so the statement is valid on both Python 2 and 3
        # (the original used the Python-2-only print statement).
        print("Written %d / %d bytes" % (written, expected))
        time.sleep(10)

    def bg_uploader(self, paths):
        """Entry point for a background uploader process."""
        return arv_put.ArvPutUploader(paths, reporter=self.fake_reporter)

    # def test_resume_large_file_upload(self):
    #     import multiprocessing
    #     data = 'x' * 1024 * 1024 # 1 MB
    #     _, filename = tempfile.mkstemp()
    #     fileobj = open(filename, 'w')
    #     for _ in range(200):
    #         fileobj.write(data)
    #     fileobj.close()
    #     uploader = multiprocessing.Process(target=self.bg_uploader, args=([filename],))
    #     uploader.start()
    #     time.sleep(5)
    #     uploader.terminate()
    #     time.sleep(1)
    #     # cache = arv_put.ArvPutCollectionCache([filename])
    #     # print "Collection detected: %s" % cache.collection()
    #     # c = arv_put.ArvPutCollection(locator=cache.collection(), cache=cache)
    #     # print "UPLOADED: %d" % c.collection[os.path.basename(filename)].size()
    #     # self.assertLess(c.collection[os.path.basename(filename)].size(), os.path.getsize(filename))
    #     os.unlink(filename)

    # def test_write_directory_twice(self):
    #     data = 'b' * 1024 * 1024
    #     tmpdir = tempfile.mkdtemp()
    #     for size in [1, 5, 10, 70]:
    #         with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
    #             for _ in range(size):
    #                 f.write(data)
    #     os.mkdir(os.path.join(tmpdir, 'subdir1'))
    #     for size in [2, 4, 6]:
    #         with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
    #             for _ in range(size):
    #                 f.write(data)
    #     c = arv_put.ArvPutUploader([tmpdir])
    #     d = arv_put.ArvPutUploader([tmpdir])
    #     print "ESCRIBIERON: c: %d, d: %d" % (c.bytes_written(), d.bytes_written())
    #     shutil.rmtree(tmpdir)
    #     self.assertEqual(0, d.bytes_written())
317
class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
                                     ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutCollectionWriter caching and resume behavior."""

    def setUp(self):
        super(ArvadosPutCollectionWriterTest, self).setUp()
        run_test_server.authorize_with('active')
        # delete=False matters: the ResumeCache keeps using this path
        # after the NamedTemporaryFile context exits; tearDown cleans up.
        with tempfile.NamedTemporaryFile(delete=False) as cachefile:
            self.cache = arv_put.ResumeCache(cachefile.name)
            self.cache_filename = cachefile.name

    def tearDown(self):
        super(ArvadosPutCollectionWriterTest, self).tearDown()
        # destroy() only if the cache file still exists (a test may have
        # already removed it), then release the cache in any case.
        if os.path.exists(self.cache_filename):
            self.cache.destroy()
        self.cache.close()

    def test_writer_caches(self):
        # After cache_state(), the cache must hold resumable state and
        # the writer must still produce the expected manifest.
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        cwriter.write_file('/dev/null')
        cwriter.cache_state()
        self.assertTrue(self.cache.load())
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_without_cache(self):
        # A cache argument is optional.
        cwriter = arv_put.ArvPutCollectionWriter()
        cwriter.write_file('/dev/null')
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_resumes_from_cache(self):
        # A writer rebuilt via from_cache() reproduces the same manifest.
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        with self.make_test_file() as testfile:
            cwriter.write_file(testfile.name, 'test')
            cwriter.cache_state()
            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
                self.cache)
            self.assertEqual(
                ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
                new_writer.manifest_text())

    def test_new_writer_from_stale_cache(self):
        # Without cache_state(), the cache stays stale: from_cache()
        # yields a fresh writer with none of the earlier file's state.
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        with self.make_test_file() as testfile:
            cwriter.write_file(testfile.name, 'test')
        new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
        new_writer.write_file('/dev/null')
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())

    def test_new_writer_from_empty_cache(self):
        # from_cache() on a never-saved cache behaves like a new writer.
        cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
        cwriter.write_file('/dev/null')
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_resumable_after_arbitrary_bytes(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        # These bytes are intentionally not valid UTF-8.
        with self.make_test_file('\x00\x07\xe2') as testfile:
            cwriter.write_file(testfile.name, 'test')
            cwriter.cache_state()
            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
                self.cache)
        self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text())

    def make_progress_tester(self):
        """Return (progression, reporter): reporter records each
        (written, expected) callback into the progression list."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        # The reporter must be called with the 4 bytes written, both
        # when the expected total is known (8) and unknown (None).
        for expect_count in (None, 8):
            progression, reporter = self.make_progress_tester()
            cwriter = arv_put.ArvPutCollectionWriter(
                reporter=reporter, bytes_expected=expect_count)
            with self.make_test_file() as testfile:
                cwriter.write_file(testfile.name, 'test')
            cwriter.finish_current_stream()
            self.assertIn((4, expect_count), progression)

    def test_resume_progress(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
        with self.make_test_file() as testfile:
            # Set up a writer with some flushed bytes.
            cwriter.write_file(testfile.name, 'test')
            cwriter.finish_current_stream()
            cwriter.cache_state()
            new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
            self.assertEqual(new_writer.bytes_written, 4)
404
405
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Exercise arv_put.expected_bytes_for() size estimation."""

    # Size of this test file itself: a convenient known reference length.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        estimate = arv_put.expected_bytes_for([__file__])
        self.assertEqual(self.TEST_SIZE, estimate)

    def test_expected_bytes_for_tree(self):
        tmpdir = self.make_tmpdir()
        for copy_name in ['one', 'two']:
            shutil.copyfile(__file__, os.path.join(tmpdir, copy_name))
        # The tree holds two copies; adding this file makes three.
        self.assertEqual(self.TEST_SIZE * 2,
                         arv_put.expected_bytes_for([tmpdir]))
        self.assertEqual(self.TEST_SIZE * 3,
                         arv_put.expected_bytes_for([tmpdir, __file__]))

    def test_expected_bytes_for_device(self):
        # Character devices have no meaningful size, so any argument
        # list containing one yields an unknown (None) estimate.
        for arglist in [['/dev/null'], [__file__, '/dev/null']]:
            self.assertIsNone(arv_put.expected_bytes_for(arglist))
425
426
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Check the progress-report formatting helpers in arv_put."""

    def test_machine_progress(self):
        for written, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            # An unknown total is reported as -1.
            reported_total = -1 if total is None else total
            suffix = ": {} written {} total\n".format(written, reported_total)
            self.assertTrue(
                arv_put.machine_progress(written, total).endswith(suffix))

    def test_known_human_progress(self):
        for written, total in [(0, 1), (2, 4), (45, 60)]:
            percentage = '{:.1%}'.format(float(written) / total)
            line = arv_put.human_progress(written, total)
            # Human-readable output rewrites the current line in place.
            self.assertTrue(line.startswith('\r'))
            self.assertIn(percentage, line)

    def test_unknown_human_progress(self):
        # With no total, the raw byte count must still appear.
        for written in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(written),
                                      arv_put.human_progress(written, None)))
446
447
class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
    """In-process tests of arv_put.main() against a test API server."""
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main(args) with captured stdout/stderr."""
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Upload a small test file and assert its block reached Keep.

        args: optional extra command-line arguments.  (Fixed the
        mutable-default antipattern: the original used args=[], which is
        shared across calls.)
        """
        if args is None:
            args = []
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        # Force arv_put to build a fresh API client per test.
        arv_put.api_client = None

    def tearDown(self):
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        # arv-put should fall back gracefully when it can't write the
        # resume cache directory.
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        # --replication N must be forwarded as copies=N to Keep puts.
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
             mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
            cache_mock.side_effect = ValueError
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        # An API error during collection creation must exit non-zero
        # without emitting a manifest on stdout.
        collections_mock = mock.Mock(name='arv.collections()')
        coll_create_mock = collections_mock().create().execute
        coll_create_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        arv_put.api_client = arvados.api('v1')
        arv_put.api_client.collections = collections_mock
        with self.assertRaises(SystemExit) as exc_test:
            self.call_main_with_args(['/dev/null'])
        self.assertLess(0, exc_test.exception.args[0])
        self.assertLess(0, coll_create_mock.call_count)
        self.assertEqual("", self.main_stdout.getvalue())
556
557
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    """Subprocess-level integration tests: run arv-put as a command and
    verify the resulting collections on the API server.
    """

    def _getKeepServerConfig():
        """Read the API server's blob signing configuration.

        Called as a plain function at class-definition time (hence no
        self/cls) to build KEEP_SERVER below.
        """
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                # safe_load: the Rails config is plain data; yaml.load
                # without an explicit Loader is unsafe and deprecated.
                rails_config = yaml.safe_load(f)
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        # Bug fix: the fallback key was misspelled 'blog_signing_key',
        # so the no-signing default never exposed 'blob_signing_key'.
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        # Subprocess arv-put runs must import the same arvados SDK.
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        """Authorize both this process and the subprocess environment."""
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            # str(error) instead of the Python-2-only error.message.
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write('stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE, env=self.ENVIRON)
        (arvout, arverr) = p.communicate()
        self.assertEqual(arverr, None)
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegexpMatches(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=None):
        """Pipe text into an arv-put subprocess; return the collection
        record it created.

        extra_args: optional extra command-line arguments.  (Fixed the
        mutable-default antipattern: the original used extra_args=[].)
        """
        if extra_args is None:
            extra_args = []
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text)
        # arv-put prints a portable data hash with that flag, a UUID otherwise.
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegexpMatches(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])
732
733
# Allow running this module directly: python test_arv_put.py
if __name__ == '__main__':
    unittest.main()