10383: Added 2 tests to check that no resuming takes place when using either --no...
[arvados.git] / sdk / python / tests / test_arv_put.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import apiclient
5 import mock
6 import os
7 import pwd
8 import re
9 import shutil
10 import subprocess
11 import sys
12 import tempfile
13 import time
14 import unittest
15 import yaml
16 import threading
17 import hashlib
18 import random
19
20 from cStringIO import StringIO
21
22 import arvados
23 import arvados.commands.put as arv_put
24 import arvados_testutil as tutil
25
26 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
27 import run_test_server
28
29 class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
30     CACHE_ARGSET = [
31         [],
32         ['/dev/null'],
33         ['/dev/null', '--filename', 'empty'],
34         ['/tmp']
35         ]
36
37     def tearDown(self):
38         super(ArvadosPutResumeCacheTest, self).tearDown()
39         try:
40             self.last_cache.destroy()
41         except AttributeError:
42             pass
43
44     def cache_path_from_arglist(self, arglist):
45         return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))
46
47     def test_cache_names_stable(self):
48         for argset in self.CACHE_ARGSET:
49             self.assertEqual(self.cache_path_from_arglist(argset),
50                               self.cache_path_from_arglist(argset),
51                               "cache name changed for {}".format(argset))
52
53     def test_cache_names_unique(self):
54         results = []
55         for argset in self.CACHE_ARGSET:
56             path = self.cache_path_from_arglist(argset)
57             self.assertNotIn(path, results)
58             results.append(path)
59
60     def test_cache_names_simple(self):
61         # The goal here is to make sure the filename doesn't use characters
62         # reserved by the filesystem.  Feel free to adjust this regexp as
63         # long as it still does that.
64         bad_chars = re.compile(r'[^-\.\w]')
65         for argset in self.CACHE_ARGSET:
66             path = self.cache_path_from_arglist(argset)
67             self.assertFalse(bad_chars.search(os.path.basename(path)),
68                              "path too exotic: {}".format(path))
69
70     def test_cache_names_ignore_argument_order(self):
71         self.assertEqual(
72             self.cache_path_from_arglist(['a', 'b', 'c']),
73             self.cache_path_from_arglist(['c', 'a', 'b']))
74         self.assertEqual(
75             self.cache_path_from_arglist(['-', '--filename', 'stdin']),
76             self.cache_path_from_arglist(['--filename', 'stdin', '-']))
77
78     def test_cache_names_differ_for_similar_paths(self):
79         # This test needs names at / that don't exist on the real filesystem.
80         self.assertNotEqual(
81             self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
82             self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))
83
84     def test_cache_names_ignore_irrelevant_arguments(self):
85         # Workaround: parse_arguments bails on --filename with a directory.
86         path1 = self.cache_path_from_arglist(['/tmp'])
87         args = arv_put.parse_arguments(['/tmp'])
88         args.filename = 'tmp'
89         path2 = arv_put.ResumeCache.make_path(args)
90         self.assertEqual(path1, path2,
91                          "cache path considered --filename for directory")
92         self.assertEqual(
93             self.cache_path_from_arglist(['-']),
94             self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
95             "cache path considered --max-manifest-depth for file")
96
97     def test_cache_names_treat_negative_manifest_depths_identically(self):
98         base_args = ['/tmp', '--max-manifest-depth']
99         self.assertEqual(
100             self.cache_path_from_arglist(base_args + ['-1']),
101             self.cache_path_from_arglist(base_args + ['-2']))
102
103     def test_cache_names_treat_stdin_consistently(self):
104         self.assertEqual(
105             self.cache_path_from_arglist(['-', '--filename', 'test']),
106             self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))
107
108     def test_cache_names_identical_for_synonymous_names(self):
109         self.assertEqual(
110             self.cache_path_from_arglist(['.']),
111             self.cache_path_from_arglist([os.path.realpath('.')]))
112         testdir = self.make_tmpdir()
113         looplink = os.path.join(testdir, 'loop')
114         os.symlink(testdir, looplink)
115         self.assertEqual(
116             self.cache_path_from_arglist([testdir]),
117             self.cache_path_from_arglist([looplink]))
118
119     def test_cache_names_different_by_api_host(self):
120         config = arvados.config.settings()
121         orig_host = config.get('ARVADOS_API_HOST')
122         try:
123             name1 = self.cache_path_from_arglist(['.'])
124             config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
125             self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
126         finally:
127             if orig_host is None:
128                 del config['ARVADOS_API_HOST']
129             else:
130                 config['ARVADOS_API_HOST'] = orig_host
131
132     @mock.patch('arvados.keep.KeepClient.head')
133     def test_resume_cache_with_current_stream_locators(self, keep_client_head):
134         keep_client_head.side_effect = [True]
135         thing = {}
136         thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
137         with tempfile.NamedTemporaryFile() as cachefile:
138             self.last_cache = arv_put.ResumeCache(cachefile.name)
139         self.last_cache.save(thing)
140         self.last_cache.close()
141         resume_cache = arv_put.ResumeCache(self.last_cache.filename)
142         self.assertNotEqual(None, resume_cache)
143
144     @mock.patch('arvados.keep.KeepClient.head')
145     def test_resume_cache_with_finished_streams(self, keep_client_head):
146         keep_client_head.side_effect = [True]
147         thing = {}
148         thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
149         with tempfile.NamedTemporaryFile() as cachefile:
150             self.last_cache = arv_put.ResumeCache(cachefile.name)
151         self.last_cache.save(thing)
152         self.last_cache.close()
153         resume_cache = arv_put.ResumeCache(self.last_cache.filename)
154         self.assertNotEqual(None, resume_cache)
155
156     @mock.patch('arvados.keep.KeepClient.head')
157     def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
158         keep_client_head.side_effect = Exception('Locator not found')
159         thing = {}
160         thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
161         with tempfile.NamedTemporaryFile() as cachefile:
162             self.last_cache = arv_put.ResumeCache(cachefile.name)
163         self.last_cache.save(thing)
164         self.last_cache.close()
165         resume_cache = arv_put.ResumeCache(self.last_cache.filename)
166         self.assertNotEqual(None, resume_cache)
167         self.assertRaises(None, resume_cache.check_cache())
168
169     def test_basic_cache_storage(self):
170         thing = ['test', 'list']
171         with tempfile.NamedTemporaryFile() as cachefile:
172             self.last_cache = arv_put.ResumeCache(cachefile.name)
173         self.last_cache.save(thing)
174         self.assertEqual(thing, self.last_cache.load())
175
176     def test_empty_cache(self):
177         with tempfile.NamedTemporaryFile() as cachefile:
178             cache = arv_put.ResumeCache(cachefile.name)
179         self.assertRaises(ValueError, cache.load)
180
181     def test_cache_persistent(self):
182         thing = ['test', 'list']
183         path = os.path.join(self.make_tmpdir(), 'cache')
184         cache = arv_put.ResumeCache(path)
185         cache.save(thing)
186         cache.close()
187         self.last_cache = arv_put.ResumeCache(path)
188         self.assertEqual(thing, self.last_cache.load())
189
190     def test_multiple_cache_writes(self):
191         thing = ['short', 'list']
192         with tempfile.NamedTemporaryFile() as cachefile:
193             self.last_cache = arv_put.ResumeCache(cachefile.name)
194         # Start writing an object longer than the one we test, to make
195         # sure the cache file gets truncated.
196         self.last_cache.save(['long', 'long', 'list'])
197         self.last_cache.save(thing)
198         self.assertEqual(thing, self.last_cache.load())
199
200     def test_cache_is_locked(self):
201         with tempfile.NamedTemporaryFile() as cachefile:
202             cache = arv_put.ResumeCache(cachefile.name)
203             self.assertRaises(arv_put.ResumeCacheConflict,
204                               arv_put.ResumeCache, cachefile.name)
205
206     def test_cache_stays_locked(self):
207         with tempfile.NamedTemporaryFile() as cachefile:
208             self.last_cache = arv_put.ResumeCache(cachefile.name)
209             path = cachefile.name
210         self.last_cache.save('test')
211         self.assertRaises(arv_put.ResumeCacheConflict,
212                           arv_put.ResumeCache, path)
213
214     def test_destroy_cache(self):
215         cachefile = tempfile.NamedTemporaryFile(delete=False)
216         try:
217             cache = arv_put.ResumeCache(cachefile.name)
218             cache.save('test')
219             cache.destroy()
220             try:
221                 arv_put.ResumeCache(cachefile.name)
222             except arv_put.ResumeCacheConflict:
223                 self.fail("could not load cache after destroying it")
224             self.assertRaises(ValueError, cache.load)
225         finally:
226             if os.path.exists(cachefile.name):
227                 os.unlink(cachefile.name)
228
229     def test_restart_cache(self):
230         path = os.path.join(self.make_tmpdir(), 'cache')
231         cache = arv_put.ResumeCache(path)
232         cache.save('test')
233         cache.restart()
234         self.assertRaises(ValueError, cache.load)
235         self.assertRaises(arv_put.ResumeCacheConflict,
236                           arv_put.ResumeCache, path)
237
238
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutUploadJob: cache usage, progress reporting,
    directory uploads, and resuming interrupted uploads.
    """

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        # Context manager so the handle is closed even if a write fails.
        with open(self.large_file_name, 'w') as fileobj:
            # Make sure to write just a little more than one block
            for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
                fileobj.write(data)
        # Keep a reference to the real write() so the mocked resume tests
        # below can delegate to it.
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)

    def _write_fail_on_last_block(self, *args, **kwargs):
        """side_effect for a mocked ArvadosFileWriter.write, simulating an
        interrupted upload.

        With autospec=True, args[0] is the ArvadosFileWriter instance and
        args[1] the data being written.  Only the final chunk is smaller
        than one Keep block, so raise there and delegate every other write
        to the real method.  (Previously this closure was copy-pasted into
        each of the three resume tests.)
        """
        data = args[1]
        # Exit only on last block
        if len(data) < arvados.config.KEEP_BLOCK_SIZE:
            raise SystemExit("Simulated error")
        return self.arvfile_write(*args, **kwargs)

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            # Everything was already uploaded, so nothing new is written.
            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)

    def make_progress_tester(self):
        """Return (progression, record_func): record_func appends each
        (written, expected) report to the progression list."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                    reporter=reporter, bytes_expected=expect_count)
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = self._write_fail_on_last_block
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        # Both runs together should cover the whole file exactly once.
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    def test_no_resume_when_asked(self):
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = self._write_fail_on_last_block
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        # With resume disabled, nothing is skipped: the whole file is re-sent.
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    def test_no_resume_when_no_cache(self):
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = self._write_fail_on_last_block
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        # No cache means nothing to skip: the whole file is re-sent.
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
397
398
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Tests for arv_put.expected_bytes_for()."""
    # Size of this very test file, used as a known byte count.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        # A single regular file reports its exact on-disk size.
        self.assertEqual(arv_put.expected_bytes_for([__file__]),
                         self.TEST_SIZE)

    def test_expected_bytes_for_tree(self):
        # A directory's expected bytes is the sum of its files' sizes.
        tree = self.make_tmpdir()
        for leaf in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tree, leaf))
        self.assertEqual(arv_put.expected_bytes_for([tree]),
                         self.TEST_SIZE * 2)
        self.assertEqual(arv_put.expected_bytes_for([tree, __file__]),
                         self.TEST_SIZE * 3)

    def test_expected_bytes_for_device(self):
        # Device files have no meaningful size, so the total is unknown.
        for paths in (['/dev/null'], [__file__, '/dev/null']):
            self.assertIsNone(arv_put.expected_bytes_for(paths))
418
419
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Tests for the machine- and human-readable progress formatters."""

    def test_machine_progress(self):
        # Unknown totals are reported as -1 in the machine format.
        for written, expected in [(0, 1), (0, None), (1, None), (235, 9283)]:
            suffix = ": {} written {} total\n".format(
                written, expected if expected is not None else -1)
            self.assertTrue(
                arv_put.machine_progress(written, expected).endswith(suffix))

    def test_known_human_progress(self):
        # With a known total, the report carries a percentage and starts
        # with a carriage return so it overwrites the previous line.
        for written, expected in [(0, 1), (2, 4), (45, 60)]:
            pct = '{:.1%}'.format(float(written) / expected)
            report = arv_put.human_progress(written, expected)
            self.assertTrue(report.startswith('\r'))
            self.assertIn(pct, report)

    def test_unknown_human_progress(self):
        # Without a total, the byte count itself must appear in the report.
        for written in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(written),
                                      arv_put.human_progress(written, None)))
440
class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
    """Tests that drive arv_put.main() in-process against test servers."""
    MAIN_SERVER = {}
    # A syntactically valid UUID that matches no real object.
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main(args), capturing stdout/stderr on self."""
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Upload a small test file via main() and confirm the stream
        reached the local Keep store.

        Fixed a mutable default argument (was ``args=[]``, shared across
        calls); None is now used as the sentinel.
        """
        if args is None:
            args = []
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None

    def tearDown(self):
        # Close and drop any capture buffers left by call_main_with_args().
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            # arv-put should still work without a usable resume cache.
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            # Each --replication value must reach Keep as 'copies'.
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            # main() must exit non-zero, after actually attempting the save,
            # and without printing a manifest.
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())
547
548
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    """Integration tests that run arv-put as a subprocess (or through its
    API) against real test API/Keep servers.
    """

    def _getKeepServerConfig():
        # NOTE: deliberately has no `self`; this runs as a plain function at
        # class-definition time to compute KEEP_SERVER below.
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                # NOTE(review): yaml.load() without an explicit Loader is
                # unsafe on untrusted input.  This reads our own test
                # fixtures, but yaml.safe_load() would be preferable if the
                # config stays plain YAML -- confirm before switching.
                rails_config = yaml.load(f.read())
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        # Bug fix: this fallback key was misspelled 'blog_signing_key',
        # so consumers looking up 'blob_signing_key' never saw the None.
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        # Subprocesses must see the same interpreter search path as we do.
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        """Authorize both this process and future subprocesses as token_name."""
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            # str(error) instead of the deprecated .message attribute.
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write('stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE, env=self.ENVIRON)
        (arvout, arverr) = p.communicate()
        self.assertEqual(arverr, None)
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegexpMatches(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=None):
        """Run arv-put with extra_args, feeding `text` on stdin, and return
        the collection record it created (found by UUID or, when
        --portable-data-hash was given, by PDH).

        Fixed a mutable default argument (was ``extra_args=[]``).
        """
        if extra_args is None:
            extra_args = []
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text)
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_later_update(self):
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is being included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegexpMatches(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])
738
739
# Run the full suite when this file is executed directly.
if __name__ == '__main__':
    unittest.main()