Merge branch '9857-cwl-acceptlist-re' refs #9857
[arvados.git] / sdk / python / tests / test_arv_put.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import apiclient
5 import mock
6 import os
7 import pwd
8 import re
9 import shutil
10 import subprocess
11 import sys
12 import tempfile
13 import time
14 import unittest
15 import yaml
16
17 from cStringIO import StringIO
18
19 import arvados
20 import arvados.commands.put as arv_put
21
22 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
23 import run_test_server
24
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Tests for arv_put.ResumeCache: cache path naming, locking, storage."""

    # Argument lists that must each map to a distinct, stable cache path.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp'],
        ['/tmp', '--max-manifest-depth', '0'],
        ['/tmp', '--max-manifest-depth', '1']
        ]

    # Block locators placed in the saved cache state by the check_cache tests.
    TEST_LOCATORS = ['098f6bcd4621d373cade4e832627b4f6+4',
                     '1f253c60a2306e0ee12fb6ce0c587904+6']

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        # Tests hand any cache they want cleaned up to self.last_cache;
        # tests that never set it are simply skipped here.
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the ResumeCache path arv-put would derive from `arglist`."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def save_and_reopen_cache(self, state):
        """Save `state` to a fresh cache, close it, and reopen it by name.

        The reopened cache is stored in self.last_cache (so tearDown
        destroys it and releases its file handle) and is also returned.
        """
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(state)
        self.last_cache.close()
        self.last_cache = arv_put.ResumeCache(self.last_cache.filename)
        return self.last_cache

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Restore the global config exactly as it was.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {'_current_stream_locators': list(self.TEST_LOCATORS)}
        resume_cache = self.save_and_reopen_cache(thing)
        self.assertIsNotNone(resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {'_finished_streams': [['.', list(self.TEST_LOCATORS)]]}
        resume_cache = self.save_and_reopen_cache(thing)
        self.assertIsNotNone(resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {'_finished_streams': [['.', list(self.TEST_LOCATORS)]]}
        resume_cache = self.save_and_reopen_cache(thing)
        self.assertIsNotNone(resume_cache)
        # check_cache() must cope with the failing HEAD request instead of
        # propagating it; any exception raised here fails the test.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            # Tracked in self.last_cache so tearDown releases it.
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, self.last_cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            # Hold the lock via self.last_cache so tearDown releases it.
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        # Only the file's name is needed; close our handle so it isn't leaked.
        cachefile.close()
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                reopened = arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            # Release the lock the reopened cache holds.
            reopened.close()
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        # Let tearDown release the cache (and the lock restart() re-takes).
        self.last_cache = cache
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
234                           arv_put.ResumeCache, path)
235
236
class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
                                     ArvadosBaseTestCase):
    """Tests for ArvPutCollectionWriter, including resuming from a cache."""

    def setUp(self):
        super(ArvadosPutCollectionWriterTest, self).setUp()
        run_test_server.authorize_with('active')
        # delete=False keeps the file after this `with` exits, so the
        # ResumeCache can go on using it; tearDown destroys the cache.
        with tempfile.NamedTemporaryFile(delete=False) as cachefile:
            self.cache = arv_put.ResumeCache(cachefile.name)
            self.cache_filename = cachefile.name

    def tearDown(self):
        # Release this test's cache first, and make sure the base-class
        # teardown still runs even if releasing the cache raises.
        try:
            if os.path.exists(self.cache_filename):
                self.cache.destroy()
            self.cache.close()
        finally:
            super(ArvadosPutCollectionWriterTest, self).tearDown()

    def test_writer_caches(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        cwriter.write_file('/dev/null')
        cwriter.cache_state()
        self.assertTrue(self.cache.load())
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutCollectionWriter()
        cwriter.write_file('/dev/null')
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_resumes_from_cache(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        with self.make_test_file() as testfile:
            cwriter.write_file(testfile.name, 'test')
            cwriter.cache_state()
            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
                self.cache)
            self.assertEqual(
                ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
                new_writer.manifest_text())

    def test_new_writer_from_stale_cache(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        with self.make_test_file() as testfile:
            cwriter.write_file(testfile.name, 'test')
        # cache_state() was never called, so the cache holds nothing useful.
        new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
        new_writer.write_file('/dev/null')
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())

    def test_new_writer_from_empty_cache(self):
        cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
        cwriter.write_file('/dev/null')
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_resumable_after_arbitrary_bytes(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        # These bytes are intentionally not valid UTF-8.
        with self.make_test_file('\x00\x07\xe2') as testfile:
            cwriter.write_file(testfile.name, 'test')
            cwriter.cache_state()
            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
                self.cache)
        self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text())

    def make_progress_tester(self):
        """Return (progression, reporter): reporter appends each
        (written, expected) call it receives to the progression list."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        for expect_count in (None, 8):
            progression, reporter = self.make_progress_tester()
            cwriter = arv_put.ArvPutCollectionWriter(
                reporter=reporter, bytes_expected=expect_count)
            with self.make_test_file() as testfile:
                cwriter.write_file(testfile.name, 'test')
            cwriter.finish_current_stream()
            self.assertIn((4, expect_count), progression)

    def test_resume_progress(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
        with self.make_test_file() as testfile:
            # Set up a writer with some flushed bytes.
            cwriter.write_file(testfile.name, 'test')
            cwriter.finish_current_stream()
            cwriter.cache_state()
            new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
            self.assertEqual(new_writer.bytes_written, 4)
323
324
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Tests for arv_put.expected_bytes_for()."""

    # This test module itself doubles as a file of known size.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        reported = arv_put.expected_bytes_for([__file__])
        self.assertEqual(self.TEST_SIZE, reported)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        for copy_name in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tree, copy_name))
        self.assertEqual(2 * self.TEST_SIZE,
                         arv_put.expected_bytes_for([tree]))
        self.assertEqual(3 * self.TEST_SIZE,
                         arv_put.expected_bytes_for([tree, __file__]))

    def test_expected_bytes_for_device(self):
        # A device anywhere in the list makes the total unknown (None).
        for paths in (['/dev/null'], [__file__, '/dev/null']):
            self.assertIsNone(arv_put.expected_bytes_for(paths))
344
345
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Tests for arv_put's machine- and human-readable progress strings."""

    def test_machine_progress(self):
        cases = [(0, 1), (0, None), (1, None), (235, 9283)]
        for written, total in cases:
            # An unknown total is reported as -1.
            if total is None:
                shown_total = -1
            else:
                shown_total = total
            suffix = ": {} written {} total\n".format(written, shown_total)
            line = arv_put.machine_progress(written, total)
            self.assertTrue(line.endswith(suffix))

    def test_known_human_progress(self):
        cases = [(0, 1), (2, 4), (45, 60)]
        for written, total in cases:
            percent = '{:.1%}'.format(float(written) / total)
            line = arv_put.human_progress(written, total)
            self.assertTrue(line.startswith('\r'))
            self.assertIn(percent, line)

    def test_unknown_human_progress(self):
        for written in [1, 20, 300, 4000, 50000]:
            line = arv_put.human_progress(written, None)
            pattern = r'\b{}\b'.format(written)
            self.assertTrue(re.search(pattern, line))
365
366
class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
    """Tests that run arv_put.main() in-process against the test servers."""

    MAIN_SERVER = {}
    # Syntactically valid UUID that is not expected to match any object.
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main(args) and return its result.

        Its output is captured in self.main_stdout / self.main_stderr.
        """
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Upload a small temp file with optional extra `args`.

        Then assert its block landed in the local Keep store.  `args` is an
        optional sequence of extra command-line arguments; it is not
        modified.
        """
        extra_args = [] if args is None else list(args)
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(
                ['--stream', '--no-progress'] + extra_args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def _put_with_unwritable_cache_dir(self, subdir=None):
        """Upload a test file with ResumeCache.CACHE_DIR set to a directory
        with all permissions removed (or to `subdir` inside it).

        The original CACHE_DIR and directory permissions are restored
        afterwards.
        """
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        if subdir is None:
            arv_put.ResumeCache.CACHE_DIR = cachedir
        else:
            arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, subdir)
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            # Restore permissions so the temp dir can be cleaned up.
            os.chmod(cachedir, 0o700)

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        # Force arv_put to build a fresh API client for each test.
        arv_put.api_client = None

    def tearDown(self):
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        self._put_with_unwritable_cache_dir()

    def test_put_with_unwritable_cache_subdir(self):
        self._put_with_unwritable_cache_dir('cachedir')

    def test_put_block_replication(self):
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
             mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
            cache_mock.side_effect = ValueError
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        # Keep both temp files open (and thus present) only while arv-put
        # reads them; the `with` closes and removes them afterwards.
        with self.make_test_file() as testfile1, \
             self.make_test_file() as testfile2:
            test_paths = [testfile1.name, testfile2.name]
            # Reverse-sort the paths, so normalization must change their order.
            test_paths.sort(reverse=True)
            self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                     test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        collections_mock = mock.Mock(name='arv.collections()')
        coll_create_mock = collections_mock().create().execute
        coll_create_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        arv_put.api_client = arvados.api('v1')
        arv_put.api_client.collections = collections_mock
        with self.assertRaises(SystemExit) as exc_test:
            self.call_main_with_args(['/dev/null'])
        self.assertLess(0, exc_test.exception.args[0])
        self.assertLess(0, coll_create_mock.call_count)
        self.assertEqual("", self.main_stdout.getvalue())
475
476
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    """Integration tests that run arv-put as a subprocess against test servers."""

    def _getKeepServerConfig():
        """Return the KEEP_SERVER settings dict for this test class.

        Called once from the class body below, hence no `self`.  Looks for
        blob_signing_key in the API server's Rails config, trying the
        optional application.yml before the mandatory
        application.default.yml, and the 'test' section before 'common'.
        Permission enforcement is enabled only when a key is found.
        """
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                # NOTE(review): yaml.load on a repository-local config file;
                # consider yaml.safe_load if it never uses custom YAML tags.
                rails_config = yaml.load(f.read())
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        # Same key name as the success path above (previously misspelled
        # 'blog_signing_key').
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        # Environment for arv-put subprocesses: make this interpreter's
        # import path visible to the child.
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        """Switch both this process and arv-put subprocesses to `token_name`."""
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            # str(error) rather than the deprecated error.message.
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID, 0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write('stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            # Reap the terminated child so it doesn't linger as a zombie.
            pipe.wait()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE, env=self.ENVIRON)
        (arvout, arverr) = p.communicate()
        # stderr was not piped, so communicate() reports it as None.
        self.assertIsNone(arverr)
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegexpMatches(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=None):
        """Run arv-put with `text` on stdin and return the collection it made.

        arv-put's stdout is looked up as a portable data hash when
        '--portable-data-hash' is in `extra_args`, otherwise as a UUID.
        Fails unless exactly one matching collection is found.  `extra_args`
        is an optional sequence of extra arguments; it is not modified.
        """
        extra_args = [] if extra_args is None else list(extra_args)
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text)
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
        # Include arv-put's stderr so a failed upload is diagnosable.
        self.assertEqual(
            1, len(collection_list),
            "expected 1 collection with {} {!r}, found {}; "
            "arv-put stderr: {}".format(search_key, stdout.strip(),
                                        len(collection_list), stderr))
        return collection_list[0]

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegexpMatches(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection(
            "Test named collection",
            ['--portable-data-hash',
             '--name', link_name,
             '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])
651
652
if '__main__' == __name__:
    # Run this module's tests when it is executed as a script.
    unittest.main()