#!/usr/bin/env python
# -*- coding: utf-8 -*-
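"""Tests for the arv-put command line tool (arvados.commands.put)."""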

import apiclient
import mock
import os
import pwd
import re
import shutil
import subprocess
import sys
import tempfile
import time
import unittest
import yaml
import multiprocessing
import hashlib
import random

from cStringIO import StringIO

import arvados
import arvados.commands.put as arv_put

from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
import run_test_server

class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
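    """Tests for arv_put.ResumeCache: cache path naming, locking,
    persistence, and resume state validation."""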
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp'],
        ['/tmp', '--max-manifest-depth', '0'],
        ['/tmp', '--max-manifest-depth', '1']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # check_cache() should handle the failed HEAD request without raising.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)


class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
                                     ArvadosBaseTestCase):
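    """Tests for arv_put.ArvPutCollectionWriter, including resuming
    an interrupted upload from a ResumeCache."""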
    def setUp(self):
        super(ArvadosPutCollectionWriterTest, self).setUp()
        run_test_server.authorize_with('active')
        with tempfile.NamedTemporaryFile(delete=False) as cachefile:
            self.cache = arv_put.ResumeCache(cachefile.name)
            self.cache_filename = cachefile.name

    def tearDown(self):
        super(ArvadosPutCollectionWriterTest, self).tearDown()
        if os.path.exists(self.cache_filename):
            self.cache.destroy()
        self.cache.close()

    def test_writer_caches(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        cwriter.write_file('/dev/null')
        cwriter.cache_state()
        self.assertTrue(self.cache.load())
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutCollectionWriter()
        cwriter.write_file('/dev/null')
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_resumes_from_cache(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        with self.make_test_file() as testfile:
            cwriter.write_file(testfile.name, 'test')
            cwriter.cache_state()
            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
                self.cache)
            self.assertEqual(
                ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
                new_writer.manifest_text())

    def test_new_writer_from_stale_cache(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        with self.make_test_file() as testfile:
            cwriter.write_file(testfile.name, 'test')
        new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
        new_writer.write_file('/dev/null')
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())

    def test_new_writer_from_empty_cache(self):
        cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
        cwriter.write_file('/dev/null')
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_resumable_after_arbitrary_bytes(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        # These bytes are intentionally not valid UTF-8.
        with self.make_test_file('\x00\x07\xe2') as testfile:
            cwriter.write_file(testfile.name, 'test')
            cwriter.cache_state()
            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
                self.cache)
        self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text())

    def make_progress_tester(self):
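        # Return a (progression, record_func) pair; record_func appends each
        # (bytes_written, bytes_expected) report to the progression list.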
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        for expect_count in (None, 8):
            progression, reporter = self.make_progress_tester()
            cwriter = arv_put.ArvPutCollectionWriter(
                reporter=reporter, bytes_expected=expect_count)
            with self.make_test_file() as testfile:
                cwriter.write_file(testfile.name, 'test')
            cwriter.finish_current_stream()
            self.assertIn((4, expect_count), progression)

    def test_resume_progress(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
        with self.make_test_file() as testfile:
            # Set up a writer with some flushed bytes.
            cwriter.write_file(testfile.name, 'test')
            cwriter.finish_current_stream()
            cwriter.cache_state()
            new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
            self.assertEqual(new_writer.bytes_written, 4)


class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
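    """Tests for arv_put.expected_bytes_for() size estimates on files,
    directory trees, and devices."""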
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        self.assertEqual(self.TEST_SIZE,
                          arv_put.expected_bytes_for([__file__]))

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))
        self.assertEqual(self.TEST_SIZE * 2,
                          arv_put.expected_bytes_for([tree]))
        self.assertEqual(self.TEST_SIZE * 3,
                          arv_put.expected_bytes_for([tree, __file__]))

    def test_expected_bytes_for_device(self):
        self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
        self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))


class ArvadosPutReportTest(ArvadosBaseTestCase):
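    """Tests for the arv_put.machine_progress() and arv_put.human_progress()
    progress report formatters."""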
    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            expect = ": {} written {} total\n".format(
                count, -1 if (total is None) else total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(float(count) / total)
            actual = arv_put.human_progress(count, total)
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))


class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
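    """Tests that call arv_put.main() in-process against a test API server."""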
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
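        # Run arv-put on a freshly created test file and verify that the
        # file's data block shows up in the local Keep store.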
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None

    def tearDown(self):
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
             mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
            cache_mock.side_effect = ValueError
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        collections_mock = mock.Mock(name='arv.collections()')
        coll_create_mock = collections_mock().create().execute
        coll_create_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        arv_put.api_client = arvados.api('v1')
        arv_put.api_client.collections = collections_mock
        with self.assertRaises(SystemExit) as exc_test:
            self.call_main_with_args(['/dev/null'])
        self.assertLess(0, exc_test.exception.args[0])
        self.assertLess(0, coll_create_mock.call_count)
        self.assertEqual("", self.main_stdout.getvalue())


class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
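    """End-to-end tests that run the arv-put script as a subprocess against
    test API and Keep servers."""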
    def _getKeepServerConfig():
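        # Called at class-definition time (not as a bound method) to build
        # KEEP_SERVER from the API server's Rails configuration files.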
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                rails_config = yaml.load(f.read())
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, error.message)
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write('stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE, env=self.ENVIRON)
        (arvout, arverr) = p.communicate()
        self.assertEqual(arverr, None)
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegexpMatches(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=[]):
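        # Pipe `text` into arv-put with the given extra arguments, then look
        # up the resulting collection via the API and return its record.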
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text)
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegexpMatches(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])


if __name__ == '__main__':
    unittest.main()