Merge branch 'master' into 9998-unsigned_manifest
[arvados.git] / sdk / python / tests / test_arv_put.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import apiclient
5 import io
6 import mock
7 import os
8 import pwd
9 import re
10 import shutil
11 import subprocess
12 import sys
13 import tempfile
14 import time
15 import unittest
16 import yaml
17 import threading
18 import hashlib
19 import random
20
21 from cStringIO import StringIO
22
23 import arvados
24 import arvados.commands.put as arv_put
25 import arvados_testutil as tutil
26
27 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
28 import run_test_server
29
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Tests for arv_put.ResumeCache: cache path naming, locking,
    persistence, and validation of cached upload state.
    """
    # Argument lists that must each map to a stable, distinct cache path.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            # Not every test creates self.last_cache.
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache path arv-put would use for this command line."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Restore the global config exactly as we found it.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # Bug fix: this was "self.assertRaises(None, resume_cache.check_cache())",
        # which called check_cache() immediately and handed its *return value*
        # (plus None as the "exception class") to assertRaises -- never a
        # valid assertion.  The intent is that check_cache() absorbs the HEAD
        # failure itself, so simply calling it verifies that the exception
        # does not propagate.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            # After destroy(), the lock must be released and the data gone.
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        # restart() empties the cache but keeps it locked.
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
238
239
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutUploadJob: cache use, resume behavior,
    progress reporting, and the dry-run feature.
    """

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation: files of 1-4 KB plus a file in a subdirectory.
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        # Bug fix: use a context manager so the file is closed even if a
        # write fails (the file object previously leaked on error).
        with open(self.large_file_name, 'w') as fileobj:
            # Make sure to write just a little more than one block.
            # '//' keeps this integer division under both Python 2 and 3.
            for _ in range((arvados.config.KEEP_BLOCK_SIZE // (1024*1024)) + 1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
                fileobj.write(data)
        # Keep a reference to the real write method so the mock side effect
        # below can delegate to it.
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)

    def _write_then_fail(self, *args, **kwargs):
        """Side effect for a mocked ArvadosFileWriter.write (autospec=True,
        so args[0] is the writer instance and args[1] the data): write full
        blocks normally, but raise SystemExit on the final, short block to
        simulate an interrupted upload.

        This replaces four identical nested wrapped_write() closures that
        were duplicated across the resume/no-resume/dry-run tests below.
        """
        data = args[1]
        # Exit only on last block
        if len(data) < arvados.config.KEEP_BLOCK_SIZE:
            raise SystemExit("Simulated error")
        return self.arvfile_write(*args, **kwargs)

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            # Everything should have been skipped thanks to the cache.
            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)

    def make_progress_tester(self):
        """Return (progression, reporter): reporter appends each
        (written, expected) progress callback to the progression list."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                    reporter=reporter, bytes_expected=expect_count)
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        # setUp created files totalling 1+2+3+4+5 KB under self.tempdir.
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = self._write_then_fail
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        # The second run should only upload what the first one missed.
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    def test_no_resume_when_asked(self):
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = self._write_then_fail
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        # With resume disabled, nothing is skipped: the whole file re-uploads.
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    def test_no_resume_when_no_cache(self):
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = self._write_then_fail
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        # With no cache at all, the whole file must be re-uploaded.
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    def test_dry_run_feature(self):
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = self._write_then_fail
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload using dry_run to check if there is a pending upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            writer2.start(save_collection=False)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        # Confirm there's no pending upload with dry_run=True
        writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            writer4.start(save_collection=False)
        writer4.destroy_cache()
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
449
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Tests for arv_put.expected_bytes_for()."""

    # This test file's own size serves as a known, nonzero byte count.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        self.assertEqual(arv_put.expected_bytes_for([__file__]),
                         self.TEST_SIZE)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        for copy_name in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tree, copy_name))
        # A directory's expected bytes are the sum over its contents...
        self.assertEqual(arv_put.expected_bytes_for([tree]),
                         self.TEST_SIZE * 2)
        # ...and mixing a directory with a plain file adds them together.
        self.assertEqual(arv_put.expected_bytes_for([tree, __file__]),
                         self.TEST_SIZE * 3)

    def test_expected_bytes_for_device(self):
        # A device file has no meaningful size, so the expected byte count
        # is unknown (None) whenever one appears anywhere in the input list.
        self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
        self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))
469
470
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Tests for arv-put's progress-report formatting helpers."""

    def test_machine_progress(self):
        for written, size in [(0, 1), (0, None), (1, None), (235, 9283)]:
            # An unknown total is reported as -1 in machine-readable output.
            tail = ": {} written {} total\n".format(
                written, -1 if (size is None) else size)
            self.assertTrue(
                arv_put.machine_progress(written, size).endswith(tail))

    def test_known_human_progress(self):
        for written, size in [(0, 1), (2, 4), (45, 60)]:
            percent_text = '{:.1%}'.format(float(written) / size)
            report = arv_put.human_progress(written, size)
            # Human-readable progress redraws the same terminal line via \r.
            self.assertTrue(report.startswith('\r'))
            self.assertIn(percent_text, report)

    def test_unknown_human_progress(self):
        # With no total size, the report must still show the byte count.
        for byte_count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(byte_count),
                                      arv_put.human_progress(byte_count, None)))
490
491
class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
    """Tests of arv_put.main(): argument handling, replication, and error
    reporting."""
    MAIN_SERVER = {}
    # A syntactically valid UUID that matches no real object.
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main(args), capturing stdout/stderr on self for
        later inspection (closed in tearDown)."""
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Upload a small test file via main() with the given extra args,
        then confirm its data block landed in the local Keep store.

        Bug fix: args previously defaulted to a mutable list ([]); the same
        list object is shared across calls, so use a None sentinel instead.
        """
        if args is None:
            args = []
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        # Force arv-put to build a fresh API client per test.
        arv_put.api_client = None

    def tearDown(self):
        # Close and drop any capture buffers left by call_main_with_args.
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        err = io.BytesIO()
        out = io.BytesIO()
        with tutil.redirected_streams(stdout=out, stderr=err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertEqual(out.getvalue(), '')
        # Raw string so the backslashes are regex escapes, not string ones.
        self.assertRegexpMatches(err.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        # arv-put should still work when its cache directory is unwritable.
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            # Restore permissions so the tmpdir can be cleaned up.
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        # Same as above, but the unwritable directory is the cache's parent.
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            # --replication N must be forwarded as copies=N to Keep puts.
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            # main() must exit nonzero after the (attempted) save fails,
            # and must not have emitted a manifest on stdout.
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())
607
608
609 class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
610                             ArvadosBaseTestCase):
611     def _getKeepServerConfig():
612         for config_file, mandatory in [
613                 ['application.yml', False], ['application.default.yml', True]]:
614             path = os.path.join(run_test_server.SERVICES_SRC_DIR,
615                                 "api", "config", config_file)
616             if not mandatory and not os.path.exists(path):
617                 continue
618             with open(path) as f:
619                 rails_config = yaml.load(f.read())
620                 for config_section in ['test', 'common']:
621                     try:
622                         key = rails_config[config_section]["blob_signing_key"]
623                     except (KeyError, TypeError):
624                         pass
625                     else:
626                         return {'blob_signing_key': key,
627                                 'enforce_permissions': True}
628         return {'blog_signing_key': None, 'enforce_permissions': False}
629
    MAIN_SERVER = {}
    # Keep server settings derived from the API server's Rails config.
    KEEP_SERVER = _getKeepServerConfig()
    # UUID of the 'aproject' group fixture, used as an upload destination.
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']
633
634     @classmethod
635     def setUpClass(cls):
636         super(ArvPutIntegrationTest, cls).setUpClass()
637         cls.ENVIRON = os.environ.copy()
638         cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
639
    def setUp(self):
        """Reset arv-put's cached API client so each test authorizes afresh."""
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None
643
644     def authorize_with(self, token_name):
645         run_test_server.authorize_with(token_name)
646         for v in ["ARVADOS_API_HOST",
647                   "ARVADOS_API_HOST_INSECURE",
648                   "ARVADOS_API_TOKEN"]:
649             self.ENVIRON[v] = arvados.config.settings()[v]
650         arv_put.api_client = arvados.api('v1')
651
    def current_user(self):
        """Return the API server's record for the currently authorized user."""
        return arv_put.api_client.users().current().execute()
654
655     def test_check_real_project_found(self):
656         self.authorize_with('active')
657         self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
658                         "did not correctly find test fixture project")
659
660     def test_check_error_finding_nonexistent_uuid(self):
661         BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
662         self.authorize_with('active')
663         try:
664             result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
665                                                   0)
666         except ValueError as error:
667             self.assertIn(BAD_UUID, error.message)
668         else:
669             self.assertFalse(result, "incorrectly found nonexistent project")
670
671     def test_check_error_finding_nonexistent_project(self):
672         BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
673         self.authorize_with('active')
674         with self.assertRaises(apiclient.errors.HttpError):
675             result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
676                                                   0)
677
    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write('stdin test\n')
        pipe.stdin.close()
        # Poll for completion ourselves: Python 2's subprocess has no
        # wait(timeout=...), and the suite must not hang if arv-put stalls.
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            # stderr was redirected to stdout above, so this captures both.
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        # --stream prints the manifest; this locator covers the 11-byte
        # input written above.
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())
700
    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        # Upload a one-file directory through the real arv-put command line.
        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE, env=self.ENVIRON)
        (arvout, arverr) = p.communicate()
        self.assertEqual(arverr, None)  # stderr was not captured, so always None
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegexpMatches(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        # Clean up the temporary upload directory.
        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)
732
733     def run_and_find_collection(self, text, extra_args=[]):
734         self.authorize_with('active')
735         pipe = subprocess.Popen(
736             [sys.executable, arv_put.__file__] + extra_args,
737             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
738             stderr=subprocess.PIPE, env=self.ENVIRON)
739         stdout, stderr = pipe.communicate(text)
740         search_key = ('portable_data_hash'
741                       if '--portable-data-hash' in extra_args else 'uuid')
742         collection_list = arvados.api('v1').collections().list(
743             filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
744         self.assertEqual(1, len(collection_list))
745         return collection_list[0]
746
747     def test_put_collection_with_later_update(self):
748         tmpdir = self.make_tmpdir()
749         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
750             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
751         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
752         self.assertNotEqual(None, col['uuid'])
753         # Add a new file to the directory
754         with open(os.path.join(tmpdir, 'file2'), 'w') as f:
755             f.write('The quick brown fox jumped over the lazy dog')
756         updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
757         self.assertEqual(col['uuid'], updated_col['uuid'])
758         # Get the manifest and check that the new file is being included
759         c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
760         self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')
761
762     def test_put_collection_with_high_redundancy(self):
763         # Write empty data: we're not testing CollectionWriter, just
764         # making sure collections.create tells the API server what our
765         # desired replication level is.
766         collection = self.run_and_find_collection("", ['--replication', '4'])
767         self.assertEqual(4, collection['replication_desired'])
768
769     def test_put_collection_with_default_redundancy(self):
770         collection = self.run_and_find_collection("")
771         self.assertEqual(None, collection['replication_desired'])
772
773     def test_put_collection_with_unnamed_project_link(self):
774         link = self.run_and_find_collection(
775             "Test unnamed collection",
776             ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
777         username = pwd.getpwuid(os.getuid()).pw_name
778         self.assertRegexpMatches(
779             link['name'],
780             r'^Saved at .* by {}@'.format(re.escape(username)))
781
782     def test_put_collection_with_name_and_no_project(self):
783         link_name = 'Test Collection Link in home project'
784         collection = self.run_and_find_collection(
785             "Test named collection in home project",
786             ['--portable-data-hash', '--name', link_name])
787         self.assertEqual(link_name, collection['name'])
788         my_user_uuid = self.current_user()['uuid']
789         self.assertEqual(my_user_uuid, collection['owner_uuid'])
790
791     def test_put_collection_with_named_project_link(self):
792         link_name = 'Test auto Collection Link'
793         collection = self.run_and_find_collection("Test named collection",
794                                       ['--portable-data-hash',
795                                        '--name', link_name,
796                                        '--project-uuid', self.PROJECT_UUID])
797         self.assertEqual(link_name, collection['name'])
798
799
if __name__ == '__main__':
    # Allow running this test module directly (outside a test runner).
    unittest.main()