# 10383: Refactored the file upload decision code so that it first skims through
# [arvados.git] / sdk / python / tests / test_arv_put.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import apiclient
5 import mock
6 import os
7 import pwd
8 import re
9 import shutil
10 import subprocess
11 import sys
12 import tempfile
13 import time
14 import unittest
15 import yaml
16 import threading
17 import hashlib
18 import random
19
20 from cStringIO import StringIO
21
22 import arvados
23 import arvados.commands.put as arv_put
24 import arvados_testutil as tutil
25
26 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
27 import run_test_server
28
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Tests for arv_put.ResumeCache.

    Covers cache path naming (stability, uniqueness, argument
    normalization), file locking, persistence across instances, and
    validation of resumed upload state.
    """

    # Argument sets whose derived cache paths must be stable and unique.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            # Not every test creates self.last_cache.
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache file path ResumeCache would use for arglist."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Restore the global config exactly as we found it.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # KeepClient.head fails for every cached locator, so check_cache()
        # should discard the stale state without propagating the exception.
        # (The original line read `self.assertRaises(None, resume_cache.check_cache())`,
        # which is not a valid assertion: the callable was already invoked
        # and None is not an exception class.)
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
237
238
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutUploadJob: uploads with and without a
    resume cache, progress reporting, directory uploads, and resuming an
    interrupted large-file upload."""

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation: four files of 1..4 KB plus a 5 KB file in
        # a subdirectory, so directory uploads have known total sizes.
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        # Make sure to write just a little more than one block.
        # (Use a context manager so the file is closed even if a write fails.)
        with open(self.large_file_name, 'w') as fileobj:
            for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
                fileobj.write(data)
        # Keep a reference to the unpatched write method so the resume
        # test's wrapper can delegate to it.
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            # Everything was already uploaded, so nothing new is written.
            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)

    def make_progress_tester(self):
        """Return (progression, record_func): record_func appends each
        (written, expected) report to the progression list."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                    reporter=reporter, bytes_expected=expect_count)
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        # 1..4 KB files at top level plus the 5 KB file in subdir/.
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # The simulated failure happened after at least one block, but
            # before the whole file was uploaded.
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
337
338
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Tests for arv_put.expected_bytes_for on files, trees, and devices."""

    # Size of this test file itself, used as a known byte count.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        self.assertEqual(self.TEST_SIZE,
                          arv_put.expected_bytes_for([__file__]))

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))
        self.assertEqual(self.TEST_SIZE * 2,
                          arv_put.expected_bytes_for([tree]))
        self.assertEqual(self.TEST_SIZE * 3,
                          arv_put.expected_bytes_for([tree, __file__]))

    def test_expected_bytes_for_device(self):
        # Device files have no meaningful size, so the answer is None.
        self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
        self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))
358
359
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Tests for arv-put's machine- and human-readable progress strings."""

    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            # An unknown total is reported as -1 in machine format.
            expect = ": {} written {} total\n".format(
                count, -1 if (total is None) else total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(float(count) / total)
            actual = arv_put.human_progress(count, total)
            # Human progress rewrites the same terminal line via \r.
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        # Without a total, the raw byte count must still appear.
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))
379
380
class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
    """Tests for the arv_put.main command-line entry point."""

    MAIN_SERVER = {}
    # A syntactically valid UUID that matches no real object.
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main(args), capturing stdout/stderr on self."""
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Upload a fresh test file with extra args and verify it reached
        the local Keep store.

        (args previously defaulted to a mutable [], a shared-state hazard;
        a None sentinel is equivalent for all callers.)
        """
        if args is None:
            args = []
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None

    def tearDown(self):
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            # Restore permissions so the tmpdir can be cleaned up.
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            # Each --replication value must be passed through as 'copies'.
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            # A failed save must exit non-zero without printing a manifest.
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())
487
488
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    """End-to-end tests that run the arv-put executable against live
    test servers."""

    def _getKeepServerConfig():
        # NOTE: deliberately not a method -- this is called as a plain
        # function at class-definition time to compute KEEP_SERVER below.
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                # NOTE(review): yaml.load on a local, trusted config file;
                # yaml.safe_load would be preferable if the config needs
                # no custom tags -- confirm before changing.
                rails_config = yaml.load(f.read())
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        # Fallback when no key is configured.  (Fixed typo: this key was
        # previously misspelled 'blog_signing_key'.)
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        # Subprocesses must be able to import the same modules we can.
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        """Authorize both this process and future subprocesses as token_name."""
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, error.message)
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write('stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE, env=self.ENVIRON)
        (arvout, arverr) = p.communicate()
        self.assertEqual(arverr, None)
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegexpMatches(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=None):
        """Pipe text into arv-put with extra_args; return the one matching
        collection record from the API server.

        (extra_args previously defaulted to a mutable []; a None sentinel
        is equivalent for all callers.)
        """
        if extra_args is None:
            extra_args = []
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text)
        # arv-put prints a PDH with --portable-data-hash, a UUID otherwise.
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_later_update(self):
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is being included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegexpMatches(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])
678
679
# Allow running this test module directly.
if __name__ == '__main__':
    unittest.main()