9463: Finished first draft on arv-put command use of Collection class. Also, partial...
[arvados.git] / sdk / python / tests / test_arv_put.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import apiclient
5 import mock
6 import os
7 import pwd
8 import re
9 import shutil
10 import subprocess
11 import sys
12 import tempfile
13 import time
14 import unittest
15 import yaml
16 import multiprocessing
17 import shutil
18 import hashlib
19 import random
20
21 from cStringIO import StringIO
22
23 import arvados
24 import arvados.commands.put as arv_put
25
26 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
27 import run_test_server
28
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Unit tests for arv_put.ResumeCache.

    Covers cache path naming (stability, uniqueness, argument
    normalization), persistence, file locking, and destruction.
    """
    # Argument lists whose derived cache paths must be stable and
    # pairwise distinct.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp'],
        ['/tmp', '--max-manifest-depth', '0'],
        ['/tmp', '--max-manifest-depth', '1']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            # The test never created a cache; nothing to clean up.
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache file path ResumeCache derives from arglist."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Restore the global config exactly as we found it.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # Fix: this was `self.assertRaises(None, resume_cache.check_cache())`,
        # which invoked check_cache() eagerly and passed None as the expected
        # exception class -- an assertion that can never test anything.  The
        # intent is that checking a cache whose locators fail HEAD requests
        # raises, so pass the callable itself and a real exception class.
        self.assertRaises(Exception, resume_cache.check_cache)

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        # restart() must both empty the cache and keep it locked.
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
239
240
class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers):
    """Integration tests for arv_put.ArvPutCollection with live test servers.

    Covers naming, unsaved writes, data-block locator counts, resuming an
    interrupted upload, and re-writing an already-uploaded directory.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    def setUp(self):
        # NOTE(review): does not call super(...).setUp(); confirm that
        # TestCaseWithServers performs its per-test setup elsewhere.
        # The lock coordinates this process with bg_uploader's child process.
        self.lock = multiprocessing.Lock()

    def fake_reporter(self, written, expected):
        # Progress callback used by bg_uploader: releasing the lock signals
        # the parent that at least one flush has happened.
        self.lock.release() # Allow caller process to terminate() us...

    def bg_uploader(self, filename):
        # Runs in a child process: upload filename with flushing on every
        # block so the parent can kill us mid-upload and later resume from
        # the on-disk cache.
        cache = arv_put.ArvPutCollectionCache([filename])
        c = arv_put.ArvPutCollection(reporter=self.fake_reporter, cache=cache)
        c.collection_flush_time = 0 # flush collection on every block flush, just for this test
        c.write_file(filename, os.path.basename(filename))

    def test_write_collection_with_name(self):
        name = 'This is a collection'
        c = arv_put.ArvPutCollection(name=name)
        self.assertEqual(name, c.name())

    def test_write_file_on_collection_without_save(self):
        # With should_save=False the collection is never committed, so it
        # must not acquire a manifest locator.
        c = arv_put.ArvPutCollection(should_save=False)
        with tempfile.NamedTemporaryFile(delete=False) as f:
            f.write("The quick brown fox jumped over the lazy dog")
        c.write_file(f.name, os.path.basename(f.name))
        self.assertEqual(None, c.manifest_locator())
        os.unlink(f.name)

    def test_write_file_and_check_data_locators(self):
        c = arv_put.ArvPutCollection(should_save=False)
        with tempfile.NamedTemporaryFile(delete=False) as f:
            # Writing ~90 MB, so that it writes 2 data blocks
            # (presumably one locator per 64 MB Keep block -- confirm
            # against the block size if this assertion ever breaks).
            for _ in range(2 * 1024 * 1024):
                f.write("The quick brown fox jumped over the lazy dog\n")
        c.write_file(f.name, os.path.basename(f.name))
        self.assertEqual(2, len(c.data_locators()))
        os.unlink(f.name)

    def test_write_directory_and_check_data_locators(self):
        # Build a tree with 86 MB (1+5+10+70) in the top dir and 12 MB
        # (2+4+6) in subdir1, then count the data blocks produced.
        data = 'b' * 1024 * 1024 # 1 MB
        tmpdir = tempfile.mkdtemp()
        for size in [1, 5, 10, 70]:
            with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
                for _ in range(size):
                    f.write(data)
        os.mkdir(os.path.join(tmpdir, 'subdir1'))
        for size in [2, 4, 6]:
            with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
                for _ in range(size):
                    f.write(data)
        c = arv_put.ArvPutCollection()
        c.write_directory_tree(tmpdir)
        shutil.rmtree(tmpdir)
        self.assertEqual(8, len(c.data_locators()))

    def test_resume_large_file_upload(self):
        """Upload a 70 MB file in a child process, kill it after the first
        flush, resume in this process, and verify the uploaded content's
        MD5 matches the original file's."""
        _, filename = tempfile.mkstemp()
        md5_original = hashlib.md5()
        md5_uploaded = hashlib.md5()
        fileobj = open(filename, 'w')
        for _ in range(70):
            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
            fileobj.write(data)
            md5_original.update(data)
        fileobj.close()
        # Hold the lock so the child's first fake_reporter call blocks until
        # it has actually flushed something.
        self.lock.acquire()
        uploader = multiprocessing.Process(target=self.bg_uploader, args=(filename,))
        uploader.start()
        self.lock.acquire() # We can now proceed, because one block and collection flush()ed
        self.lock.release()
        uploader.terminate()
        uploader.join()
        cache = arv_put.ArvPutCollectionCache([filename])
        c = arv_put.ArvPutCollection(cache=cache)
        # The interrupted upload must be strictly partial before resuming.
        self.assertLess(c.collection[os.path.basename(filename)].size(), os.path.getsize(filename))
        c.write_file(filename, os.path.basename(filename))
        uploaded = c.collection.open(os.path.basename(filename), 'r')
        while True:
            data = uploaded.read(1024*1024)
            if not data:
                break
            md5_uploaded.update(data)
        os.unlink(filename)
        cache.destroy()
        self.assertEqual(md5_original.hexdigest(), md5_uploaded.hexdigest())

    def test_write_directory_twice(self):
        # Upload the same tree twice with separate caches; the byte counts
        # of the two passes are expected to differ.
        data = 'b' * 1024 * 1024
        tmpdir = tempfile.mkdtemp()
        for size in [1, 5, 10, 70]:
            with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
                for _ in range(size):
                    f.write(data)
        os.mkdir(os.path.join(tmpdir, 'subdir1'))
        for size in [2, 4, 6]:
            with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
                for _ in range(size):
                    f.write(data)
        c_cache = arv_put.ArvPutCollectionCache([tmpdir])
        c = arv_put.ArvPutCollection(cache=c_cache)
        c.write_directory_tree(tmpdir)
        c_cache.close()
        d_cache = arv_put.ArvPutCollectionCache([tmpdir])
        d = arv_put.ArvPutCollection(cache=d_cache)
        d.write_directory_tree(tmpdir)
        d_cache.close()
        c_cache.destroy()
        d_cache.destroy()
        shutil.rmtree(tmpdir)
        self.assertNotEqual(c.bytes_written, d.bytes_written)
        # NOTE(review): the stronger assertions below are disabled; the
        # intended final behavior (second pass writes 0 bytes) apparently
        # isn't settled yet -- confirm before re-enabling.
        # self.assertGreater(c.bytes_written, 0)
        # self.assertEqual(d.bytes_written, 0)
354         
355
class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
                                     ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutCollectionWriter, including resume-from-cache."""

    def setUp(self):
        super(ArvadosPutCollectionWriterTest, self).setUp()
        run_test_server.authorize_with('active')
        with tempfile.NamedTemporaryFile(delete=False) as cachefile:
            self.cache = arv_put.ResumeCache(cachefile.name)
            self.cache_filename = cachefile.name

    def tearDown(self):
        super(ArvadosPutCollectionWriterTest, self).tearDown()
        # Destroy the cache only if its file still exists, then release it.
        if os.path.exists(self.cache_filename):
            self.cache.destroy()
        self.cache.close()

    def test_writer_caches(self):
        writer = arv_put.ArvPutCollectionWriter(self.cache)
        writer.write_file('/dev/null')
        writer.cache_state()
        # Saved state must be loadable, and the manifest must list the file.
        self.assertTrue(self.cache.load())
        expected = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n"
        self.assertEqual(expected, writer.manifest_text())

    def test_writer_works_without_cache(self):
        writer = arv_put.ArvPutCollectionWriter()
        writer.write_file('/dev/null')
        expected = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n"
        self.assertEqual(expected, writer.manifest_text())

    def test_writer_resumes_from_cache(self):
        writer = arv_put.ArvPutCollectionWriter(self.cache)
        with self.make_test_file() as testfile:
            writer.write_file(testfile.name, 'test')
            writer.cache_state()
            resumed = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
            self.assertEqual(
                ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
                resumed.manifest_text())

    def test_new_writer_from_stale_cache(self):
        # State that was never cache_state()d must not leak into a new writer.
        writer = arv_put.ArvPutCollectionWriter(self.cache)
        with self.make_test_file() as testfile:
            writer.write_file(testfile.name, 'test')
        fresh = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
        fresh.write_file('/dev/null')
        expected = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n"
        self.assertEqual(expected, fresh.manifest_text())

    def test_new_writer_from_empty_cache(self):
        writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
        writer.write_file('/dev/null')
        expected = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n"
        self.assertEqual(expected, writer.manifest_text())

    def test_writer_resumable_after_arbitrary_bytes(self):
        writer = arv_put.ArvPutCollectionWriter(self.cache)
        # These bytes are intentionally not valid UTF-8.
        with self.make_test_file('\x00\x07\xe2') as testfile:
            writer.write_file(testfile.name, 'test')
            writer.cache_state()
            resumed = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
        self.assertEqual(writer.manifest_text(), resumed.manifest_text())

    def make_progress_tester(self):
        """Return (events, reporter); reporter appends (written, expected)."""
        events = []
        def reporter(written, expected):
            events.append((written, expected))
        return events, reporter

    def test_progress_reporting(self):
        for expect_count in (None, 8):
            events, reporter = self.make_progress_tester()
            writer = arv_put.ArvPutCollectionWriter(
                reporter=reporter, bytes_expected=expect_count)
            with self.make_test_file() as testfile:
                writer.write_file(testfile.name, 'test')
            writer.finish_current_stream()
            self.assertIn((4, expect_count), events)

    def test_resume_progress(self):
        writer = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
        with self.make_test_file() as testfile:
            # Set up a writer with some flushed bytes.
            writer.write_file(testfile.name, 'test')
            writer.finish_current_stream()
            writer.cache_state()
            resumed = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
            self.assertEqual(resumed.bytes_written, 4)
442
443
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Tests for arv_put.expected_bytes_for()."""
    # Size of this test file itself: a convenient, known byte count.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        self.assertEqual(self.TEST_SIZE,
                          arv_put.expected_bytes_for([__file__]))

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        for name in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tree, name))
        self.assertEqual(self.TEST_SIZE * 2,
                          arv_put.expected_bytes_for([tree]))
        self.assertEqual(self.TEST_SIZE * 3,
                          arv_put.expected_bytes_for([tree, __file__]))

    def test_expected_bytes_for_device(self):
        # Device files have no meaningful size, so the total is unknown.
        self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
        self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))
463
464
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Tests for the arv_put progress-report formatting helpers."""

    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            # An unknown total is reported as -1.
            reported_total = -1 if total is None else total
            suffix = ": {} written {} total\n".format(count, reported_total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(suffix))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            percentage = '{:.1%}'.format(float(count) / total)
            actual = arv_put.human_progress(count, total)
            # Human-readable progress redraws in place, hence the leading CR.
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(percentage, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))
484
485
class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
    """Tests for the arv-put command line entry point (arv_put.main)."""
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Invoke arv_put.main(args), capturing stdout/stderr on self."""
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Stream a small test file through arv-put with optional extra args.

        Fix: the default for `args` was a shared mutable list literal
        (args=[]), a classic Python pitfall; use the None-sentinel idiom
        instead.  Callers passing a list explicitly are unaffected.
        """
        if args is None:
            args = []
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        # Force main() to construct a fresh API client for each test.
        arv_put.api_client = None

    def tearDown(self):
        # Close and drop any output buffers a test left behind.
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        """arv-put should still work when the resume cache dir is unwritable."""
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            # Restore permissions so the tmpdir can be cleaned up.
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        """Same as above, but the unwritable dir is the cache dir's parent."""
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        """--replication N must be forwarded to Keep puts as copies=N."""
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
             mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
            # Make each run start from scratch instead of resuming.
            cache_mock.side_effect = ValueError
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        """A 403 from collections().create() exits non-zero with no stdout."""
        collections_mock = mock.Mock(name='arv.collections()')
        coll_create_mock = collections_mock().create().execute
        coll_create_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        arv_put.api_client = arvados.api('v1')
        arv_put.api_client.collections = collections_mock
        with self.assertRaises(SystemExit) as exc_test:
            self.call_main_with_args(['/dev/null'])
        self.assertLess(0, exc_test.exception.args[0])
        self.assertLess(0, coll_create_mock.call_count)
        self.assertEqual("", self.main_stdout.getvalue())
594
595
596 class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
597                             ArvadosBaseTestCase):
598     def _getKeepServerConfig():
599         for config_file, mandatory in [
600                 ['application.yml', False], ['application.default.yml', True]]:
601             path = os.path.join(run_test_server.SERVICES_SRC_DIR,
602                                 "api", "config", config_file)
603             if not mandatory and not os.path.exists(path):
604                 continue
605             with open(path) as f:
606                 rails_config = yaml.load(f.read())
607                 for config_section in ['test', 'common']:
608                     try:
609                         key = rails_config[config_section]["blob_signing_key"]
610                     except (KeyError, TypeError):
611                         pass
612                     else:
613                         return {'blob_signing_key': key,
614                                 'enforce_permissions': True}
615         return {'blog_signing_key': None, 'enforce_permissions': False}
616
617     MAIN_SERVER = {}
618     KEEP_SERVER = _getKeepServerConfig()
619     PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']
620
621     @classmethod
622     def setUpClass(cls):
623         super(ArvPutIntegrationTest, cls).setUpClass()
624         cls.ENVIRON = os.environ.copy()
625         cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
626
    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        # Force each test to build a fresh API client (see authorize_with).
        arv_put.api_client = None
630
631     def authorize_with(self, token_name):
632         run_test_server.authorize_with(token_name)
633         for v in ["ARVADOS_API_HOST",
634                   "ARVADOS_API_HOST_INSECURE",
635                   "ARVADOS_API_TOKEN"]:
636             self.ENVIRON[v] = arvados.config.settings()[v]
637         arv_put.api_client = arvados.api('v1')
638
    def current_user(self):
        # Ask the API server which user the current token belongs to.
        return arv_put.api_client.users().current().execute()
641
642     def test_check_real_project_found(self):
643         self.authorize_with('active')
644         self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
645                         "did not correctly find test fixture project")
646
647     def test_check_error_finding_nonexistent_uuid(self):
648         BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
649         self.authorize_with('active')
650         try:
651             result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
652                                                   0)
653         except ValueError as error:
654             self.assertIn(BAD_UUID, error.message)
655         else:
656             self.assertFalse(result, "incorrectly found nonexistent project")
657
658     def test_check_error_finding_nonexistent_project(self):
659         BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
660         self.authorize_with('active')
661         with self.assertRaises(apiclient.errors.HttpError):
662             result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
663                                                   0)
664
    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write('stdin test\n')
        pipe.stdin.close()
        # Poll for up to 5 seconds rather than block forever on a hung child.
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        # Presumably the md5+size Keep locator of 'stdin test\n' -- confirm
        # against Keep's locator format if this ever changes.
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())
687
    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        # Run arv-put as a subprocess so it builds its own API client from
        # self.ENVIRON.
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE, env=self.ENVIRON)
        (arvout, arverr) = p.communicate()
        self.assertEqual(arverr, None)  # stderr is not captured, so None
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegexpMatches(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)
719
720     def run_and_find_collection(self, text, extra_args=[]):
721         self.authorize_with('active')
722         pipe = subprocess.Popen(
723             [sys.executable, arv_put.__file__] + extra_args,
724             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
725             stderr=subprocess.PIPE, env=self.ENVIRON)
726         stdout, stderr = pipe.communicate(text)
727         search_key = ('portable_data_hash'
728                       if '--portable-data-hash' in extra_args else 'uuid')
729         collection_list = arvados.api('v1').collections().list(
730             filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
731         self.assertEqual(1, len(collection_list))
732         return collection_list[0]
733
734     def test_put_collection_with_high_redundancy(self):
735         # Write empty data: we're not testing CollectionWriter, just
736         # making sure collections.create tells the API server what our
737         # desired replication level is.
738         collection = self.run_and_find_collection("", ['--replication', '4'])
739         self.assertEqual(4, collection['replication_desired'])
740
741     def test_put_collection_with_default_redundancy(self):
742         collection = self.run_and_find_collection("")
743         self.assertEqual(None, collection['replication_desired'])
744
745     def test_put_collection_with_unnamed_project_link(self):
746         link = self.run_and_find_collection(
747             "Test unnamed collection",
748             ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
749         username = pwd.getpwuid(os.getuid()).pw_name
750         self.assertRegexpMatches(
751             link['name'],
752             r'^Saved at .* by {}@'.format(re.escape(username)))
753
754     def test_put_collection_with_name_and_no_project(self):
755         link_name = 'Test Collection Link in home project'
756         collection = self.run_and_find_collection(
757             "Test named collection in home project",
758             ['--portable-data-hash', '--name', link_name])
759         self.assertEqual(link_name, collection['name'])
760         my_user_uuid = self.current_user()['uuid']
761         self.assertEqual(my_user_uuid, collection['owner_uuid'])
762
763     def test_put_collection_with_named_project_link(self):
764         link_name = 'Test auto Collection Link'
765         collection = self.run_and_find_collection("Test named collection",
766                                       ['--portable-data-hash',
767                                        '--name', link_name,
768                                        '--project-uuid', self.PROJECT_UUID])
769         self.assertEqual(link_name, collection['name'])
770
771
# Allow running this test file directly, e.g. "python test_arv_put.py".
if __name__ == '__main__':
    unittest.main()