9463: Finished writing tests for ArvPutUploadJob
[arvados.git] / sdk / python / tests / test_arv_put.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import apiclient
5 import mock
6 import os
7 import pwd
8 import re
9 import shutil
10 import subprocess
11 import sys
12 import tempfile
13 import time
14 import unittest
15 import yaml
16 import threading
17 import hashlib
18 import random
19
20 from cStringIO import StringIO
21
22 import arvados
23 import arvados.commands.put as arv_put
24
25 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
26 import run_test_server
27
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Tests for arv_put.ResumeCache: cache path naming, persistence,
    locking, and validation of resumed upload state."""

    # Argument sets that must each map to a stable, distinct cache path.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp'],
        ['/tmp', '--max-manifest-depth', '0'],
        ['/tmp', '--max-manifest-depth', '1']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            # Not every test creates self.last_cache.
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache file path ResumeCache would use for arglist."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Restore the original configuration even if the assertion fails.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # check_cache() must cope with the failing HEAD request instead of
        # propagating the exception.  (The previous line here was
        # 'self.assertRaises(None, resume_cache.check_cache())', which
        # called check_cache() eagerly and then asserted nothing.)
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
238
239
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutUploadJob: cached and uncached uploads,
    progress reporting, and resuming an interrupted large-file upload."""

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Locks used by test_resume_large_file_upload to synchronize the
        # simulated failure with the partial manifest save.
        self.exit_lock = threading.Lock()
        self.save_manifest_lock = threading.Lock()

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start()
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start()
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload: the second
            # job should find everything already uploaded.
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start()
            self.assertEqual(0, cwriter_new.bytes_written)
            cwriter_new.destroy_cache()

    def make_progress_tester(self):
        """Return (progression, reporter); reporter appends every
        (written, expected) call to the progression list."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                    reporter=reporter, bytes_expected=expect_count)
                cwriter.start()
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        tempdir = tempfile.mkdtemp()
        subdir = os.path.join(tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        cwriter = arv_put.ArvPutUploadJob([tempdir])
        cwriter.start()
        cwriter.destroy_cache()
        shutil.rmtree(tempdir)
        # 4 files of 1..4 KB at the top level plus 5 KB in the subdir.
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        """Simulate an upload failure after one full block is written, then
        resume from the cached state and verify both uploads together cover
        the whole file with the correct content."""
        # Proxying ArvadosFile.writeto() method to be able to synchronize it
        # with partial manifest saves
        orig_func = getattr(arvados.arvfile.ArvadosFile, 'writeto')
        def wrapped_func(*args, **kwargs):
            data = args[2]
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Lock on the last block write call, waiting for the
                # manifest to be saved
                self.exit_lock.acquire()
                raise SystemExit('Test exception')
            ret = orig_func(*args, **kwargs)
            self.save_manifest_lock.release()
            return ret
        setattr(arvados.arvfile.ArvadosFile, 'writeto', wrapped_func)
        # Take advantage of the reporter feature to sync the partial
        # manifest writing with the simulated upload error.
        def fake_reporter(written, expected):
            # Wait until there's something to save
            self.save_manifest_lock.acquire()
            # Once the partial manifest is saved, allow exiting
            self.exit_lock.release()
        # Create random data to be uploaded
        md5_original = hashlib.md5()
        _, filename = tempfile.mkstemp()
        # Make sure to write just a little more than one block
        with open(filename, 'w') as fileobj:
            for _ in range((arvados.config.KEEP_BLOCK_SIZE // (1024*1024)) + 1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
                md5_original.update(data)
                fileobj.write(data)
        self.exit_lock.acquire()
        self.save_manifest_lock.acquire()
        writer = arv_put.ArvPutUploadJob([filename],
                                         reporter=fake_reporter,
                                         update_time=0.1)
        # First upload: partially completed with simulated error.
        # (Previously written as 'self.assertRaises(SystemExit,
        # writer.start())', which invoked start() eagerly so assertRaises
        # never ran; call it directly and fail explicitly if it returns.)
        try:
            writer.start()
            self.fail('start() should have raised SystemExit')
        except SystemExit:
            # Avoid getting a ResumeCacheConflict on the 2nd run
            writer._cache_file.close()
        self.assertLess(writer.bytes_written, os.path.getsize(filename))

        # Restore the ArvadosFile.writeto() method before retrying
        setattr(arvados.arvfile.ArvadosFile, 'writeto', orig_func)
        writer_new = arv_put.ArvPutUploadJob([filename])
        writer_new.start()
        writer_new.destroy_cache()
        self.assertEqual(os.path.getsize(filename),
                         writer.bytes_written + writer_new.bytes_written)
        # Read the uploaded file to compare its md5 hash
        md5_uploaded = hashlib.md5()
        c = arvados.collection.Collection(writer_new.manifest_text())
        with c.open(os.path.basename(filename), 'r') as f:
            new_data = f.read()
            md5_uploaded.update(new_data)
        self.assertEqual(md5_original.hexdigest(), md5_uploaded.hexdigest())
        # Cleaning up
        os.unlink(filename)
364
365
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Tests for arv_put.expected_bytes_for() upload-size estimation."""

    # Size of this very test file, used as a known reference quantity.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        result = arv_put.expected_bytes_for([__file__])
        self.assertEqual(self.TEST_SIZE, result)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        for copy_name in ['one', 'two']:
            shutil.copyfile(__file__, os.path.join(tree, copy_name))
        # A directory counts the total size of the files inside it...
        self.assertEqual(self.TEST_SIZE * 2,
                         arv_put.expected_bytes_for([tree]))
        # ...and listing an extra file adds its size on top.
        self.assertEqual(self.TEST_SIZE * 3,
                         arv_put.expected_bytes_for([tree, __file__]))

    def test_expected_bytes_for_device(self):
        # Devices have no meaningful size, so the estimate must be None,
        # even when mixed with regular files.
        for arglist in [['/dev/null'], [__file__, '/dev/null']]:
            self.assertIsNone(arv_put.expected_bytes_for(arglist))
385
386
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Tests for the progress-report formatting helpers in arv_put."""

    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            # An unknown total is reported as -1.
            reported_total = -1 if total is None else total
            expect = ": {} written {} total\n".format(count, reported_total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(float(count) / total)
            actual = arv_put.human_progress(count, total)
            # Human-readable output rewrites the current terminal line...
            self.assertTrue(actual.startswith('\r'))
            # ...and includes the completion percentage.
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            match = re.search(r'\b{}\b'.format(count),
                              arv_put.human_progress(count, None))
            self.assertTrue(match)
406
407
class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
    """Tests that drive arv_put.main() directly with various argument sets."""

    MAIN_SERVER = {}
    # Syntactically valid UUID that matches no real object on the server.
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main(args) with captured stdout/stderr buffers."""
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Stream a known test file through main() and verify it landed in
        the local Keep store.  (Default changed from a mutable [] literal
        to None; behavior for callers is unchanged.)"""
        if args is None:
            args = []
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        # Force main() to build a fresh API client per test.
        arv_put.api_client = None

    def tearDown(self):
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        # Remove all permissions so the cache directory is unusable.
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        # Point at a subdirectory that can't even be created.
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
             mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
            cache_mock.side_effect = ValueError
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            # Each put call must have received the requested copies count.
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        # Patch inside a context manager so the real save_new() is restored
        # afterwards.  (Previously this was a bare attribute assignment,
        # which leaked the mock into every later test in the process.)
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
        self.assertLess(0, exc_test.exception.args[0])
        self.assertLess(0, coll_save_mock.call_count)
        self.assertEqual("", self.main_stdout.getvalue())
514
515
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    """End-to-end tests that run the arv-put command against test servers."""

    def _getKeepServerConfig():
        # NOTE: deliberately not a method -- it is called below, while the
        # class body is still being evaluated, to compute KEEP_SERVER.
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                # safe_load is sufficient for these plain config files and
                # avoids yaml.load's arbitrary object construction.
                rails_config = yaml.safe_load(f)
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        # No key found anywhere: run Keep without permission enforcement.
        # (Fixed typo: this key was previously 'blog_signing_key', so the
        # None value was never actually passed through.)
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        # Child arv-put processes need our sys.path to find the SDK.
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        """Authorize this process and future child processes as token_name."""
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            # Use str(error) instead of the deprecated, Python2-only
            # error.message attribute.
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write('stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE, env=self.ENVIRON)
        (arvout, arverr) = p.communicate()
        self.assertEqual(arverr, None)
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegexpMatches(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=None):
        """Pipe text into 'arv-put [extra_args]' and return the collection
        record it created.  (Default changed from a mutable [] literal to
        None; behavior for callers is unchanged.)"""
        if extra_args is None:
            extra_args = []
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text)
        # arv-put prints the portable data hash or UUID depending on flags.
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegexpMatches(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])
690
691
# Allow running this test module directly (e.g. "python test_arv_put.py").
if __name__ == '__main__':
    unittest.main()