10587: Removed use of multiprocessing module on --version tests. Added a common stder...
[arvados.git] / sdk / python / tests / test_arv_put.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import apiclient
5 import io
6 import mock
7 import os
8 import pwd
9 import re
10 import shutil
11 import subprocess
12 import sys
13 import tempfile
14 import time
15 import unittest
16 import yaml
17 import threading
18 import hashlib
19 import random
20
21 from cStringIO import StringIO
22
23 import arvados
24 import arvados.commands.put as arv_put
25 import arvados_testutil as tutil
26
27 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
28 import run_test_server
29
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Tests for arv_put.ResumeCache.

    Covers cache path derivation from command lines, uniqueness and
    stability of those paths, file locking, persistence across
    instances, and validation of resumable upload state.
    """
    # Argument lists whose derived cache paths must be stable across
    # repeated calls and mutually unique.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp'],
        ['/tmp', '--max-manifest-depth', '0'],
        ['/tmp', '--max-manifest-depth', '1']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            # Not every test creates self.last_cache.
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache file path arv-put would use for this command line."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Restore whatever configuration we found, even on failure.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        # The context manager is used only to get a unique temporary file
        # name; the cache object keeps its own handle on the path.
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # Bug fix: the original line was
        #     self.assertRaises(None, resume_cache.check_cache())
        # which invoked check_cache() immediately and handed its return value
        # (not a callable) to assertRaises with no exception class, so the
        # assertion verified nothing.  The apparent intent is that
        # check_cache() tolerates HEAD failures without propagating; calling
        # it directly makes any unexpected exception fail the test.
        # NOTE(review): confirm check_cache() is expected not to raise here.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
240
241
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutUploadJob: caching, progress reporting,
    directory uploads, and resuming interrupted large-file uploads.
    """
    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation: four numbered files plus one in a subdir.
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test.
        # Bug fix: mkstemp() returns an open OS-level file descriptor that
        # the original code discarded (fd leak) before reopening the path
        # with open().  Wrap the descriptor instead, and let the context
        # manager close it.
        fd, self.large_file_name = tempfile.mkstemp()
        with os.fdopen(fd, 'w') as fileobj:
            # Make sure to write just a little more than one block
            for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
                fileobj.write(data)
        # Keep a reference to the unpatched write method for the resume test.
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start()
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start()
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start()
            cwriter_new.destroy_cache()
            self.assertEqual(0, cwriter_new.bytes_written)

    def make_progress_tester(self):
        """Return (progression, record_func) where record_func appends
        (written, expected) tuples to progression."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                    reporter=reporter, bytes_expected=expect_count)
                cwriter.start()
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start()
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start()
                self.assertLess(writer.bytes_written,
                                os.path.getsize(self.large_file_name))
        # Retry the upload; the resumed writer should only send the rest.
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start()
        self.assertEqual(writer.bytes_written + writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
338
339
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Checks arv_put.expected_bytes_for()'s upload size estimates."""
    # Size of this very file, used as a known quantity in the tests below.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        estimated = arv_put.expected_bytes_for([__file__])
        self.assertEqual(self.TEST_SIZE, estimated)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        # Two copies of this file inside the tree.
        for copy_name in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tree, copy_name))
        self.assertEqual(self.TEST_SIZE * 2,
                          arv_put.expected_bytes_for([tree]))
        # The tree plus the file itself: three copies' worth of bytes.
        self.assertEqual(self.TEST_SIZE * 3,
                          arv_put.expected_bytes_for([tree, __file__]))

    def test_expected_bytes_for_device(self):
        # Device files have no meaningful size estimate.
        self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
        self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))
359
360
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Checks the machine- and human-readable progress report formatters."""
    def test_machine_progress(self):
        cases = [(0, 1), (0, None), (1, None), (235, 9283)]
        for written, total in cases:
            # An unknown total is reported as -1.
            shown_total = -1 if (total is None) else total
            suffix = ": {} written {} total\n".format(written, shown_total)
            report = arv_put.machine_progress(written, total)
            self.assertTrue(report.endswith(suffix))

    def test_known_human_progress(self):
        for written, total in [(0, 1), (2, 4), (45, 60)]:
            actual = arv_put.human_progress(written, total)
            # Reports rewrite the same terminal line, so start with \r.
            self.assertTrue(actual.startswith('\r'))
            percentage = '{:.1%}'.format(float(written) / total)
            self.assertIn(percentage, actual)

    def test_unknown_human_progress(self):
        for written in [1, 20, 300, 4000, 50000]:
            # Without a total, the byte count itself must still appear.
            pattern = r'\b{}\b'.format(written)
            self.assertTrue(re.search(pattern,
                                      arv_put.human_progress(written, None)))
380
381
class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
    """Tests for the arv-put command line driver (arv_put.main)."""
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main(args) with fresh captured stdout/stderr."""
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Upload a small test file with extra args, then check Keep has it.

        Bug fix: the default was a mutable list (args=[]), which is shared
        across calls; use None as the sentinel instead.
        """
        if args is None:
            args = []
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None

    def tearDown(self):
        # Close and discard any stream captures a test created.
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        err = io.BytesIO()
        out = io.BytesIO()
        with tutil.redirected_streams(stdout=out, stderr=err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertEqual(out.getvalue(), '')
        # Bug fix: raw string so the backslashes reach the regexp engine
        # intact instead of being interpreted as string escapes.
        self.assertRegexpMatches(err.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            # Restore the class attribute and make the dir removable again.
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            # arv-put must exit nonzero after the API rejects the save,
            # having attempted it, and print nothing to stdout.
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())
498
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    """End-to-end tests that run arv-put as a subprocess against the
    test API and Keep servers."""
    def _getKeepServerConfig():
        # Called during class body execution (see KEEP_SERVER below), before
        # it would ever become a bound method -- hence no self parameter.
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                # NOTE(review): yaml.load() executes arbitrary YAML tags;
                # acceptable only because this reads a repo-local config
                # file.  Prefer yaml.safe_load() if the source ever changes.
                rails_config = yaml.load(f.read())
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        # Bug fix: the fallback dict's key was misspelled 'blog_signing_key',
        # so consumers looking up 'blob_signing_key' never saw the None.
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        # Subprocesses must see the same import path as this test process.
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        """Authorize this process and the subprocess environment as token_name."""
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            # Use str(error) rather than the deprecated error.message.
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write('stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE, env=self.ENVIRON)
        (arvout, arverr) = p.communicate()
        self.assertEqual(arverr, None)
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegexpMatches(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=[]):
        """Pipe text into arv-put with extra_args; return the collection record.

        Note: extra_args is never mutated here, so the shared default list
        is safe; it is also part of the established call signature.
        """
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text)
        # arv-put prints either a portable data hash or a UUID; search on
        # whichever one the arguments requested.
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegexpMatches(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])
673
674
# Allow running this module directly as a script.
if __name__ == '__main__':
    unittest.main()