# 10587: Added tests to Python commands to check for the --version argument.
# [arvados.git] / sdk / python / tests / test_arv_put.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import apiclient
5 import mock
6 import os
7 import pwd
8 import re
9 import shutil
10 import subprocess
11 import sys
12 import tempfile
13 import time
14 import unittest
15 import yaml
16 import threading
17 import hashlib
18 import random
19 import multiprocessing
20
21 from cStringIO import StringIO
22
23 import arvados
24 import arvados.commands.put as arv_put
25 import arvados_testutil as tutil
26
27 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
28 import run_test_server
29
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Unit tests for arv_put.ResumeCache.

    Covers cache path generation (stability, uniqueness, normalization of
    equivalent command lines) and cache persistence/locking behavior.
    """

    # Argument lists whose generated cache paths must be stable over time
    # and distinct from one another.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp'],
        ['/tmp', '--max-manifest-depth', '0'],
        ['/tmp', '--max-manifest-depth', '1']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            # Not every test assigns self.last_cache.
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache path ResumeCache would use for this command line."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Restore the original setting (or remove the one we injected).
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        # NOTE(review): the temp file is deleted when the `with` block exits;
        # ResumeCache apparently keeps its own handle open — confirm.
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # Bug fix: the original line was
        #     self.assertRaises(None, resume_cache.check_cache())
        # which invokes check_cache() immediately and hands its *return value*
        # to assertRaises with None as the exception class, so it asserted
        # nothing.  The intent is that check_cache() tolerates a failing HEAD
        # request without raising, so just call it.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
240
241
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutUploadJob run against the test API server."""

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation: files named 1..4 of 1..4 KB plus a 5 KB file
        # in a subdirectory.
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test.
        # Bug fix: mkstemp() returns an open OS-level descriptor that the
        # original code discarded, leaking it for the life of the test.
        fd, self.large_file_name = tempfile.mkstemp()
        os.close(fd)
        with open(self.large_file_name, 'w') as fileobj:
            # Make sure to write just a little more than one block
            for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
                fileobj.write(data)
        # Keep a reference to the real write method so the resume test's
        # wrapper can delegate to it.
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start()
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start()
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload: the second
            # job should resume and write nothing new.
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start()
            cwriter_new.destroy_cache()
            self.assertEqual(0, cwriter_new.bytes_written)

    def make_progress_tester(self):
        """Return (progression, reporter): reporter records calls in progression."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                    reporter=reporter, bytes_expected=expect_count)
                cwriter.start()
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start()
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start()
            # Bug fix: this assertion used to sit inside the assertRaises
            # block, *after* the raising call, so it never executed.
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload; together the two runs must cover the whole file.
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start()
        self.assertEqual(writer.bytes_written + writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
339
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Checks arv_put.expected_bytes_for() against known file sizes."""

    # This test file's own size serves as a fixture of known length.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        self.assertEqual(self.TEST_SIZE,
                         arv_put.expected_bytes_for([__file__]))

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        for basename in ['one', 'two']:
            shutil.copyfile(__file__, os.path.join(tree, basename))
        self.assertEqual(self.TEST_SIZE * 2,
                         arv_put.expected_bytes_for([tree]))
        self.assertEqual(self.TEST_SIZE * 3,
                         arv_put.expected_bytes_for([tree, __file__]))

    def test_expected_bytes_for_device(self):
        # Devices have no meaningful size, so the estimate is None.
        for arglist in (['/dev/null'], [__file__, '/dev/null']):
            self.assertIsNone(arv_put.expected_bytes_for(arglist))
359
360
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Checks the progress-report formatting helpers in arv_put."""

    def test_machine_progress(self):
        for written, expected in [(0, 1), (0, None), (1, None), (235, 9283)]:
            # An unknown total is reported as -1.
            suffix = ": {} written {} total\n".format(
                written, expected if expected is not None else -1)
            self.assertTrue(
                arv_put.machine_progress(written, expected).endswith(suffix))

    def test_known_human_progress(self):
        for written, expected in [(0, 1), (2, 4), (45, 60)]:
            percentage = '{:.1%}'.format(float(written) / expected)
            report = arv_put.human_progress(written, expected)
            # Human-readable reports overwrite the current terminal line.
            self.assertTrue(report.startswith('\r'))
            self.assertIn(percentage, report)

    def test_unknown_human_progress(self):
        for written in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(written),
                                      arv_put.human_progress(written, None)))
380
381
class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
    """Tests for arv_put.main() run in-process and in a subprocess."""

    MAIN_SERVER = {}
    # A syntactically valid UUID that matches no real object.
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main(args) in-process, capturing stdout/stderr."""
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def run_main_process(self, args):
        """Run arv_put.main(args) in a child process.

        Returns (exitcode, stdout_text, stderr_text).  A separate process is
        required because options like --version make argparse exit, and
        because main() may write to the process-wide sys streams.
        """
        # Bug fix: mkstemp() returns open OS-level descriptors; the original
        # code discarded them (and also never closed the read handles below),
        # leaking file descriptors on every call.
        stdout_fd, stdout_path = tempfile.mkstemp()
        os.close(stdout_fd)
        stderr_fd, stderr_path = tempfile.mkstemp()
        os.close(stderr_fd)
        def child(*cargs, **ckwargs):
            # Runs only in the forked child: point its sys streams at the
            # temp files so the parent can read them afterwards.
            sys.stdout = open(stdout_path, 'w')
            sys.stderr = open(stderr_path, 'w')
            arv_put.main(*cargs, **ckwargs)
        p = multiprocessing.Process(target=child,
                                    args=(args, sys.stdout, sys.stderr))
        p.start()
        p.join()
        try:
            with open(stdout_path, 'r') as f:
                out = f.read()
            with open(stderr_path, 'r') as f:
                err = f.read()
        finally:
            os.unlink(stdout_path)
            os.unlink(stderr_path)
        return p.exitcode, out, err

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None

    def tearDown(self):
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        exitcode, out, err = self.run_main_process(['--version'])
        self.assertEqual(0, exitcode)
        self.assertEqual('', out)
        self.assertNotEqual('', err)
        # Raw string so the escaped dots are unambiguous regex metacharacters.
        self.assertRegexpMatches(err, r"[0-9]+\.[0-9]+\.[0-9]+")

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            # Restore permissions so the tmpdir can be cleaned up.
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            # Each put call must have forwarded its --replication value.
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            # A failed save must exit nonzero without emitting a manifest.
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())
513
514
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    """End-to-end tests that run the arv-put executable against test servers."""

    def _getKeepServerConfig():
        # Called as a plain function at class-definition time (no self):
        # read the Rails config to find the blob signing key for the test
        # Keep server.
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                # NOTE(review): yaml.load without an explicit Loader; the
                # config files are trusted test fixtures, but safe_load
                # would be preferable if it parses them identically.
                rails_config = yaml.load(f.read())
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        # Bug fix: the fallback key was misspelled 'blog_signing_key'.
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        # Subprocesses must inherit our Python path to find the SDK.
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        """Authorize both this process and future subprocesses as token_name."""
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            # str(error) instead of the deprecated error.message attribute.
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                         0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write('stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE, env=self.ENVIRON)
        (arvout, arverr) = p.communicate()
        self.assertEqual(arverr, None)
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegexpMatches(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=[]):
        """Pipe text into arv-put and return the collection record it created."""
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text)
        # arv-put prints either a UUID or a portable data hash, depending.
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegexpMatches(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])
690
# Allow running this test module directly, e.g. "python test_arv_put.py".
if __name__ == '__main__':
    unittest.main()