# 9463: Lots of progress today, resume upload code written, many tests to do!
# [arvados.git] / sdk / python / tests / test_arv_put.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import apiclient
5 import mock
6 import os
7 import pwd
8 import re
9 import shutil
10 import subprocess
11 import sys
12 import tempfile
13 import time
14 import unittest
15 import yaml
16
17 from cStringIO import StringIO
18
19 import arvados
20 import arvados.commands.put as arv_put
21
22 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
23 import run_test_server
24
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Unit tests for arv_put.ResumeCache.

    Covers cache-path naming (stability, uniqueness, argument
    normalization), resume-state validation against Keep, locking,
    persistence, and destruction.
    """
    # Argument lists that must each hash to a distinct cache path.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp'],
        ['/tmp', '--max-manifest-depth', '0'],
        ['/tmp', '--max-manifest-depth', '1']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            # The test never assigned self.last_cache; nothing to clean up.
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache path arv-put would use for this command line."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Restore the shared global config exactly as we found it.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        # NamedTemporaryFile is deleted when the with-block exits; the
        # ResumeCache object apparently remains usable afterward (it records
        # the name and manages the file itself) -- same idiom throughout.
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # Bug fix: the original line was
        #     self.assertRaises(None, resume_cache.check_cache())
        # which invoked check_cache() eagerly and handed its *return value*
        # (not a callable) to assertRaises with None as the exception class
        # -- a meaningless check.  The apparent intent is that check_cache()
        # copes with the failing KeepClient.head() internally rather than
        # propagating it, so a plain call expresses that contract.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            # delete=False above, so remove the file ourselves.
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        # restart() should empty the cache but keep the lock held.
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
235
236
class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers):
    """Integration tests for ArvPutUploader against live test servers."""
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    # Removed: a redundant class-body `import shutil` (shutil is already
    # imported at module level) and a leftover debug print statement.

    # TODO: add single-run upload tests covering
    # ArvPutCollection.write_file() and ArvPutUploader on a fresh tree.

    def test_write_directory_twice(self):
        """Re-uploading an identical tree should transfer zero new bytes.

        The first ArvPutUploader performs the real upload; the second
        should find everything already uploaded (via the resume
        machinery) and report 0 bytes written.
        """
        data = 'b' * 1024 * 1024  # 1 MiB chunk
        tmpdir = tempfile.mkdtemp()
        try:
            # Lay out files of varying sizes at the top level...
            for size in [1, 5, 10, 70]:
                with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
                    for _ in range(size):
                        f.write(data)
            # ...and inside a subdirectory.
            os.mkdir(os.path.join(tmpdir, 'subdir1'))
            for size in [2, 4, 6]:
                with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
                    for _ in range(size):
                        f.write(data)
            # `first` matters only for its side effect (the initial upload).
            first = arv_put.ArvPutUploader([tmpdir])
            second = arv_put.ArvPutUploader([tmpdir])
        finally:
            # Always remove the large scratch tree, even if an upload fails.
            shutil.rmtree(tmpdir)
        self.assertEqual(0, second.bytes_written())
288
class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
                                     ArvadosBaseTestCase):
    """Tests for ArvPutCollectionWriter: state caching, resuming from a
    cache, and progress reporting."""

    def setUp(self):
        super(ArvadosPutCollectionWriterTest, self).setUp()
        run_test_server.authorize_with('active')
        # delete=False: the cache file must outlive this with-block so the
        # writer under test can keep using it; tearDown cleans it up.
        with tempfile.NamedTemporaryFile(delete=False) as cachefile:
            self.cache = arv_put.ResumeCache(cachefile.name)
            self.cache_filename = cachefile.name

    def tearDown(self):
        super(ArvadosPutCollectionWriterTest, self).tearDown()
        # Only destroy if the file still exists (a test may have removed it).
        if os.path.exists(self.cache_filename):
            self.cache.destroy()
        self.cache.close()

    def test_writer_caches(self):
        """cache_state() persists state without disturbing the manifest."""
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        cwriter.write_file('/dev/null')
        cwriter.cache_state()
        self.assertTrue(self.cache.load())
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_without_cache(self):
        """The writer must also function with no ResumeCache at all."""
        cwriter = arv_put.ArvPutCollectionWriter()
        cwriter.write_file('/dev/null')
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_resumes_from_cache(self):
        """A writer built via from_cache() reproduces the cached manifest."""
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        with self.make_test_file() as testfile:
            cwriter.write_file(testfile.name, 'test')
            cwriter.cache_state()
            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
                self.cache)
            self.assertEqual(
                ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
                new_writer.manifest_text())

    def test_new_writer_from_stale_cache(self):
        # The first writer never called cache_state(), so the cache is
        # stale; from_cache() should still yield a usable fresh writer.
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        with self.make_test_file() as testfile:
            cwriter.write_file(testfile.name, 'test')
        new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
        new_writer.write_file('/dev/null')
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())

    def test_new_writer_from_empty_cache(self):
        """from_cache() on a never-written cache starts a blank writer."""
        cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
        cwriter.write_file('/dev/null')
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_resumable_after_arbitrary_bytes(self):
        """Resume state must round-trip file content that is not valid text."""
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        # These bytes are intentionally not valid UTF-8.
        with self.make_test_file('\x00\x07\xe2') as testfile:
            cwriter.write_file(testfile.name, 'test')
            cwriter.cache_state()
            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
                self.cache)
        self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text())

    def make_progress_tester(self):
        """Return (progression_list, reporter_callback) for progress tests."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        # Exercise both a known and an unknown (None) expected byte count.
        for expect_count in (None, 8):
            progression, reporter = self.make_progress_tester()
            cwriter = arv_put.ArvPutCollectionWriter(
                reporter=reporter, bytes_expected=expect_count)
            with self.make_test_file() as testfile:
                cwriter.write_file(testfile.name, 'test')
            cwriter.finish_current_stream()
            self.assertIn((4, expect_count), progression)

    def test_resume_progress(self):
        """bytes_written must survive a cache round-trip."""
        cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
        with self.make_test_file() as testfile:
            # Set up a writer with some flushed bytes.
            cwriter.write_file(testfile.name, 'test')
            cwriter.finish_current_stream()
            cwriter.cache_state()
            new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
            self.assertEqual(new_writer.bytes_written, 4)
376
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Tests for arv_put.expected_bytes_for()."""
    # Use this test file itself as a fixture of known size.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        """A single regular file reports exactly its own size."""
        self.assertEqual(self.TEST_SIZE,
                          arv_put.expected_bytes_for([__file__]))

    def test_expected_bytes_for_tree(self):
        """A directory's total is the sum of the files inside it."""
        tmpdir = self.make_tmpdir()
        for name in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tmpdir, name))
        self.assertEqual(self.TEST_SIZE * 2,
                          arv_put.expected_bytes_for([tmpdir]))
        self.assertEqual(self.TEST_SIZE * 3,
                          arv_put.expected_bytes_for([tmpdir, __file__]))

    def test_expected_bytes_for_device(self):
        """Character devices have no predictable size, so expect None."""
        for arglist in (['/dev/null'], [__file__, '/dev/null']):
            self.assertIsNone(arv_put.expected_bytes_for(arglist))
396
397
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Tests for arv-put's progress-report formatting helpers."""

    def test_machine_progress(self):
        for written, expected in [(0, 1), (0, None), (1, None), (235, 9283)]:
            # Unknown totals appear as -1 in machine-readable output.
            suffix = ": {} written {} total\n".format(
                written, -1 if expected is None else expected)
            self.assertTrue(
                arv_put.machine_progress(written, expected).endswith(suffix))

    def test_known_human_progress(self):
        for written, expected in [(0, 1), (2, 4), (45, 60)]:
            percent = '{:.1%}'.format(float(written) / expected)
            report = arv_put.human_progress(written, expected)
            # Human-readable reports redraw the line in place with \r.
            self.assertTrue(report.startswith('\r'))
            self.assertIn(percent, report)

    def test_unknown_human_progress(self):
        # With no total, the byte count itself must appear in the report.
        for written in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(written),
                                      arv_put.human_progress(written, None)))
417
418
class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
    """Tests that run arv_put.main() in-process against test servers."""
    MAIN_SERVER = {}
    # A syntactically valid UUID that matches no object on the server.
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main(args), capturing its stdout/stderr on self."""
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Stream-upload a small test file and verify it reached Keep.

        Bug fix: `args` previously used a mutable default ([]), which is
        shared across all calls; use the None sentinel instead.
        """
        if args is None:
            args = []
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        # Force arv-put to build a fresh API client per test.
        arv_put.api_client = None

    def tearDown(self):
        # Close and drop any capture buffers created by call_main_with_args.
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        """arv-put should still work when the cache dir is unwritable."""
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            # Restore permissions so the tmpdir can be removed.
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        """Same, when the cache dir cannot even be created."""
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        """--replication N must be passed through as copies=N to Keep."""
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
             mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
            # Make the resume cache look empty so every run uploads.
            cache_mock.side_effect = ValueError
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        """An API error must exit nonzero with nothing on stdout."""
        collections_mock = mock.Mock(name='arv.collections()')
        coll_create_mock = collections_mock().create().execute
        coll_create_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        arv_put.api_client = arvados.api('v1')
        arv_put.api_client.collections = collections_mock
        with self.assertRaises(SystemExit) as exc_test:
            self.call_main_with_args(['/dev/null'])
        self.assertLess(0, exc_test.exception.args[0])
        self.assertLess(0, coll_create_mock.call_count)
        self.assertEqual("", self.main_stdout.getvalue())
527
528
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    """Integration tests that run arv-put as a subprocess against live
    test servers, with blob signing enabled when possible."""

    def _getKeepServerConfig():
        # NOTE: deliberately no self/cls -- this is called once, below,
        # at class-definition time to compute KEEP_SERVER.
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                # safe_load: the Rails config is plain YAML; yaml.load
                # would permit arbitrary Python object construction.
                rails_config = yaml.safe_load(f.read())
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        # Bug fix: this fallback previously returned the misspelled key
        # 'blog_signing_key', so lookups of 'blob_signing_key' would miss.
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        # Subprocesses need our sys.path to find the arvados SDK.
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        """Authorize and propagate Arvados settings to the subprocess env."""
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        """Return the API record for the currently authorized user."""
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            # str(error) works on Python 2 and 3; the original used the
            # Python-2-only error.message attribute.
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write('stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE, env=self.ENVIRON)
        (arvout, arverr) = p.communicate()
        self.assertEqual(arverr, None)
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegexpMatches(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=None):
        """Pipe `text` into an arv-put subprocess; return the collection
        record it created.

        Bug fix: extra_args previously defaulted to a shared mutable [];
        use the None sentinel instead.
        """
        if extra_args is None:
            extra_args = []
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text)
        # arv-put prints a PDH with --portable-data-hash, else a UUID.
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegexpMatches(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])
702         self.assertEqual(link_name, collection['name'])
703
704
# Allow running this test module directly (e.g. `python test_arv_put.py`).
if __name__ == '__main__':
    unittest.main()