10383: Merge branch 'master' into 10383-arv-put-incremental-upload
[arvados.git] / sdk / python / tests / test_arv_put.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import apiclient
5 import mock
6 import os
7 import pwd
8 import re
9 import shutil
10 import subprocess
11 import sys
12 import tempfile
13 import time
14 import unittest
15 import yaml
16 import threading
17 import hashlib
18 import random
19
20 from cStringIO import StringIO
21
22 import arvados
23 import arvados.commands.put as arv_put
24 import arvados_testutil as tutil
25
26 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
27 import run_test_server
28
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Unit tests for arv_put.ResumeCache.

    Covers cache path derivation from command-line arguments, cache
    persistence and truncation, file locking, and validation of cached
    block locators against Keep.
    """

    # Argument lists that must each map to a distinct, stable cache path.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp'],
        ['/tmp', '--max-manifest-depth', '0'],
        ['/tmp', '--max-manifest-depth', '1']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            # Tests that create a cache they want cleaned up save it as
            # self.last_cache; tests without one just skip this.
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache file path ResumeCache derives from arglist."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        # The same arguments must always produce the same cache path.
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        # Different argument sets must not collide on a cache path.
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        # The cache path must incorporate the API host, so caches for
        # different clusters never collide.
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        # NamedTemporaryFile is deleted on 'with' exit; ResumeCache manages
        # the path itself afterward.
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # Bug fix: the original line was
        #   self.assertRaises(None, resume_cache.check_cache())
        # which invoked check_cache() *before* assertRaises and passed None
        # as the exception class -- a misuse that asserts nothing useful.
        # The intent is that check_cache() copes with the failing HEAD
        # request without propagating the exception.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        # A saved object round-trips through load().
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        # Data saved by one ResumeCache instance is readable by another.
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        # A second ResumeCache on the same path must fail while the first
        # still holds the lock.
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        # The lock must survive past the NamedTemporaryFile context.
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            # After destroy(), the path must be free for a new cache...
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            # ...and the old handle must no longer yield data.
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        # restart() empties the cache but keeps the lock held.
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
239
240
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutUploadJob: cache reuse, progress reporting,
    directory upload, and resuming an interrupted large-file upload.
    """

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation: four files of 1-4 KB plus a 5 KB file in a
        # subdirectory, for the directory-upload test.
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        # Use a context manager so the file is closed even if a write fails
        # (the original opened/closed it manually).
        with open(self.large_file_name, 'w') as fileobj:
            # Make sure to write just a little more than one block
            for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
                fileobj.write(data)
        # Keep a reference to the real write method so the resume test can
        # delegate to it from inside its mock.
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload; the second
            # run should find everything already uploaded.
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(0, cwriter_new.bytes_written)

    def make_progress_tester(self):
        """Return (progression, record_func): the callback appends each
        (written, expected) report to the progression list."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                    reporter=reporter, bytes_expected=expect_count)
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Bug fix: this assertion used to sit inside the assertRaises
            # block *after* the raising call, so it never executed.  The
            # interrupted writer must have uploaded less than the full file.
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload; the resumed run uploads only the remainder.
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
338
339
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Tests for arv_put.expected_bytes_for() size estimation."""

    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        # A single regular file reports its exact on-disk size.
        self.assertEqual(self.TEST_SIZE,
                         arv_put.expected_bytes_for([__file__]))

    def test_expected_bytes_for_tree(self):
        # A directory reports the sum of the sizes of the files inside it,
        # and mixing a directory with a file adds both together.
        tree = self.make_tmpdir()
        for copy_name in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tree, copy_name))
        self.assertEqual(self.TEST_SIZE * 2,
                         arv_put.expected_bytes_for([tree]))
        self.assertEqual(self.TEST_SIZE * 3,
                         arv_put.expected_bytes_for([tree, __file__]))

    def test_expected_bytes_for_device(self):
        # Device files have no meaningful size, so no estimate is made.
        self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
        self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))
359
360
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Tests for the progress-report formatting helpers in arv_put."""

    def test_machine_progress(self):
        # machine_progress() ends with "<count> written <total> total",
        # where an unknown total is reported as -1.
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            total_field = -1 if total is None else total
            suffix = ": {} written {} total\n".format(count, total_field)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(suffix))

    def test_known_human_progress(self):
        # With a known total, the report starts with a carriage return (so
        # it overwrites the previous console line) and shows a percentage.
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            report = arv_put.human_progress(count, total)
            self.assertTrue(report.startswith('\r'))
            self.assertIn('{:.1%}'.format(float(count) / total), report)

    def test_unknown_human_progress(self):
        # Without a total, the raw byte count still appears in the report.
        for count in [1, 20, 300, 4000, 50000]:
            pattern = r'\b{}\b'.format(count)
            self.assertTrue(
                re.search(pattern, arv_put.human_progress(count, None)))
380
381
class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
    """Tests that drive arv_put.main() in-process: argument handling,
    cache-directory fallbacks, replication, normalization, and errors.
    """

    MAIN_SERVER = {}
    # A syntactically valid but nonexistent UUID for error-path tests.
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main(args), capturing stdout/stderr on self."""
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Stream a small test file through arv-put with extra args, and
        assert its data block landed in the local Keep store."""
        # Bug fix: the default was the mutable literal [], which is shared
        # across calls; use the None sentinel instead.
        if args is None:
            args = []
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None

    def tearDown(self):
        # Close and drop any output buffers created by call_main_with_args.
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        # arv-put should still work when the cache dir is unwritable.
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        # ...and also when the cache dir can't even be created.
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            # Each --replication value must be passed through as 'copies'.
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        # --name is only meaningful when a collection is created.
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        # An API error while saving must exit nonzero and print nothing
        # to stdout (the manifest channel).
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())
488
489
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    """End-to-end tests that run arv-put as a subprocess against the
    test API/Keep servers and inspect the collections it creates.
    """

    def _getKeepServerConfig():
        # NOTE: deliberately has no self/cls parameter -- it is invoked as
        # a plain function at class-body evaluation time (see KEEP_SERVER
        # below) to pull the blob signing key out of the Rails config.
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                rails_config = yaml.load(f.read())
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        # Bug fix: this fallback previously returned the misspelled key
        # 'blog_signing_key', so consumers looking up 'blob_signing_key'
        # would get a KeyError instead of None in the no-key case.
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        # Subprocesses must inherit our Python path and credentials.
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        """Authorize this process and the subprocess environment as the
        named test token."""
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, error.message)
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write('stdin test\n')
        pipe.stdin.close()
        # Poll rather than block so a hung arv-put can't hang the suite.
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE, env=self.ENVIRON)
        (arvout, arverr) = p.communicate()
        self.assertEqual(arverr, None)
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegexpMatches(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=None):
        """Run arv-put with extra_args, feed it text on stdin, and return
        the collection record the API server shows for its output."""
        # Bug fix: the default was the mutable literal [], which is shared
        # across calls; use the None sentinel instead.
        if extra_args is None:
            extra_args = []
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text)
        # arv-put prints a portable data hash with --portable-data-hash,
        # otherwise a collection UUID; look the record up by whichever.
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_later_update(self):
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is being included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        # Unnamed collections get an auto-generated "Saved at ... by user@"
        # name.
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegexpMatches(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        # Without --project-uuid the collection lands in the user's home.
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])
679
680
# Allow running this test module directly, outside the project test runner.
if __name__ == '__main__':
    unittest.main()