#!/usr/bin/env python
# -*- coding: utf-8 -*-

import apiclient
import mock
import os
import pwd
import re
import shutil
import subprocess
import sys
import tempfile
import time
import unittest
import yaml
import threading
import hashlib
import random

from cStringIO import StringIO

import arvados
import arvados.commands.put as arv_put

from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
import run_test_server

class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp'],
        ['/tmp', '--max-manifest-depth', '0'],
        ['/tmp', '--max-manifest-depth', '1']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
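        # Note: the NamedTemporaryFile here (and in similar tests below) is
        # apparently used only to obtain a unique path; ResumeCache keeps its
        # own handle on that path, so it stays usable after the 'with' block.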
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # check_cache() is expected to handle the HEAD failure internally
        # rather than propagate it.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)


class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
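        # These locks coordinate setUp with the simulated-crash upload in
        # test_resume_large_file_upload() below.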
        self.exit_lock = threading.Lock()
        self.save_manifest_lock = threading.Lock()
        # Temp files creation
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # For large file resuming test
        fd, self.large_file_name = tempfile.mkstemp()
        os.close(fd)
        with open(self.large_file_name, 'w') as fileobj:
            # Make sure to write just a little more than one block
            for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
                fileobj.write(data)

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start()
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start()
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start()
            self.assertEqual(0, cwriter_new.bytes_written)
            cwriter_new.destroy_cache()

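    # Helper: returns a list that accumulates (bytes_written, bytes_expected)
    # tuples, plus a reporter callback that appends to it.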
    def make_progress_tester(self):
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                    reporter=reporter, bytes_expected=expect_count)
                cwriter.start()
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start()
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        # Wrap ArvadosFile.writeto() and ArvPutUploadJob._update() so the
        # test can synchronize the upload with partial manifest saves
        orig_writeto_func = getattr(arvados.arvfile.ArvadosFile, 'writeto')
        orig_update_func = getattr(arv_put.ArvPutUploadJob, '_update')
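        # Synchronization sketch for the wrappers below: the main thread
        # acquires both locks before start().  Each full-block writeto()
        # releases save_manifest_lock; wrapped_update() then saves a manifest
        # and, once the collection holds exactly one block, releases
        # exit_lock; the final short writeto() waits on exit_lock and raises
        # SystemExit, simulating a crash after a checkpointed partial upload.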
        def wrapped_update(*args, **kwargs):
            job_instance = args[0]
            orig_update_func(*args, **kwargs)
            with self.save_manifest_lock:
                # Allow abnormal termination when first block written
                if job_instance._collection_size(job_instance._my_collection()) == arvados.config.KEEP_BLOCK_SIZE:
                    self.exit_lock.release()
        def wrapped_writeto(*args, **kwargs):
            data = args[2]
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Lock on the last block write call, waiting for the
                # manifest to be saved
                with self.exit_lock:
                    raise SystemExit('Test exception')
            ret = orig_writeto_func(*args, **kwargs)
            self.save_manifest_lock.release()
            return ret
        setattr(arvados.arvfile.ArvadosFile, 'writeto', wrapped_writeto)
        setattr(arv_put.ArvPutUploadJob, '_update', wrapped_update)
        # MD5 hash of random data to be uploaded
        md5_original = hashlib.md5()
        with open(self.large_file_name, 'r') as f:
            data = f.read()
            md5_original.update(data)
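        # Hold both locks up front so the wrapped methods above control when
        # the simulated crash happens.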
        self.exit_lock.acquire()
        self.save_manifest_lock.acquire()
        writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                         update_time=0.1)
        # First upload: partially completed with simulated error
        with self.assertRaises(SystemExit):
            writer.start()
        # Avoid getting a ResumeCacheConflict on the 2nd run
        writer._cache_file.close()
        self.assertGreater(writer.bytes_written, 0)
        self.assertLess(writer.bytes_written,
                        os.path.getsize(self.large_file_name))

        # Restore the original ArvadosFile.writeto() method before retrying
        setattr(arvados.arvfile.ArvadosFile, 'writeto', orig_writeto_func)
        # Restore the original ArvPutUploadJob._update() method before retrying
        setattr(arv_put.ArvPutUploadJob, '_update', orig_update_func)
        writer_new = arv_put.ArvPutUploadJob([self.large_file_name])
        writer_new.start()
        writer_new.destroy_cache()
        self.assertEqual(os.path.getsize(self.large_file_name),
                         writer.bytes_written + writer_new.bytes_written)
        # Read the uploaded file to compare its md5 hash
        md5_uploaded = hashlib.md5()
        c = arvados.collection.Collection(writer_new.manifest_text())
        with c.open(os.path.basename(self.large_file_name), 'r') as f:
            new_data = f.read()
            md5_uploaded.update(new_data)
        self.assertEqual(md5_original.hexdigest(), md5_uploaded.hexdigest())


class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        self.assertEqual(self.TEST_SIZE,
                         arv_put.expected_bytes_for([__file__]))

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))
        self.assertEqual(self.TEST_SIZE * 2,
                         arv_put.expected_bytes_for([tree]))
        self.assertEqual(self.TEST_SIZE * 3,
                         arv_put.expected_bytes_for([tree, __file__]))

    def test_expected_bytes_for_device(self):
        self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
        self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))


class ArvadosPutReportTest(ArvadosBaseTestCase):
    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            expect = ": {} written {} total\n".format(
                count, -1 if (total is None) else total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(float(count) / total)
            actual = arv_put.human_progress(count, total)
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))


class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None

    def tearDown(self):
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
             mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
            cache_mock.side_effect = ValueError
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        arvados.collection.Collection.save_new = coll_save_mock
        with self.assertRaises(SystemExit) as exc_test:
            self.call_main_with_args(['/dev/null'])
        self.assertLess(0, exc_test.exception.args[0])
        self.assertLess(0, coll_save_mock.call_count)
        self.assertEqual("", self.main_stdout.getvalue())


class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    def _getKeepServerConfig():
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                rails_config = yaml.load(f.read())
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
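    # Note: _getKeepServerConfig() above is called here while the class body
    # is being executed, which is why it is a plain function with no
    # self/cls argument.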
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, error.message)
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write('stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE, env=self.ENVIRON)
        (arvout, arverr) = p.communicate()
        self.assertEqual(arverr, None)
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegexpMatches(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

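    # Helper: run arv-put as a subprocess with the given stdin text and
    # extra arguments, then fetch and return the resulting collection
    # record from the API server.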
    def run_and_find_collection(self, text, extra_args=[]):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text)
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegexpMatches(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])


if __name__ == '__main__':
    unittest.main()