# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import range
from functools import partial
import apiclient
import datetime
import hashlib
import json
import logging
import mock
import os
import pwd
import random
import re
import select
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import unittest
import uuid
import yaml

import arvados
import arvados.commands.put as arv_put
from . import arvados_testutil as tutil

from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
from . import run_test_server

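# ResumeCache keeps arv-put's upload state on disk so an interrupted upload
# can pick up where it left off.  The tests below exercise how the cache
# file path is derived from the (normalized) command line arguments and the
# API host, and how the cache behaves across save/load, locking, and
# destroy/restart.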
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
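        # Keep locators have the form '<md5 of content>+<size in bytes>';
        # e.g., 098f6bcd4621d373cade4e832627b4f6+4 is the locator for b'test'.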
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)


class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KiB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        fileobj = open(self.large_file_name, 'w')
        # Make sure to write just a little more than one block
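        # KEEP_BLOCK_SIZE is in bytes; >>20 converts it to a MiB count, so
        # the loop writes one more 1 MiB chunk than fits in a single block.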
        for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
            fileobj.write(data)
        fileobj.close()
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MiB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
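        # Save a reference to the real (unpatched) ArvadosFileWriter.write so
        # the mocked-write tests below can delegate to it.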
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
        # Temp dir to hold a symlink to other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def test_symlinks_are_followed_by_default(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistant_path_raise_exception(self):
        uuid_str = str(uuid.uuid4())
        with self.assertRaises(arv_put.PathDoesNotExistError):
            cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
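        # d41d8cd98f00b204e9800998ecf8427e is the md5 of the empty string.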
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

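    # make_progress_tester builds a reporter callback for ArvPutUploadJob:
    # it is called as reporter(bytes_written, bytes_expected), and the
    # returned list records every call so tests can assert on the sequence.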
    def make_progress_tester(self):
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                                                  reporter=reporter)
                cwriter.bytes_expected = expect_count
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

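    # The resume tests below simulate an interrupted upload by patching
    # ArvadosFileWriter.write: the wrapper delegates to the real write method
    # until it sees the final (short) block, then checkpoints the upload
    # state and raises SystemExit as if arv-put had died mid-transfer.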
    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting. Ensure block commit.
                self.writer._update(final=True)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block; wrapped_commit makes the
            # first block commit fail.  arv-put should exit cleanly instead
            # of raising a further exception by trying to commit the
            # collection while it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_no_resume_when_no_cache(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_dry_run_feature(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            # Retry the upload using dry_run to check if there is a pending upload
            writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            # Confirm there's no pending upload with dry_run=True
            writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del self.writer

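
# ArvPutUploadJob._cached_manifest_valid() decides whether the manifest in a
# resume cache can still be trusted.  As the cases table below spells out:
# expired signatures are fine (the cache is simply reset), but a block whose
# signature has not yet expired must still answer a HEAD request in Keep,
# otherwise the cached manifest is rejected.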
class CachedManifestValidationTest(ArvadosBaseTestCase):
    class MockedPut(arv_put.ArvPutUploadJob):
        def __init__(self, cached_manifest=None):
            self._state = arv_put.ArvPutUploadJob.EMPTY_STATE
            self._state['manifest'] = cached_manifest
            self._api_client = mock.MagicMock()
            self.logger = mock.MagicMock()
            self.num_retries = 1

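    # Keep permission signatures carry their expiry as a hex Unix timestamp;
    # this helper formats datetimes the same way for building test manifests.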
    def datetime_to_hex(self, dt):
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(CachedManifestValidationTest, self).setUp()
        self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
        self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"
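        # i.e. '. <locator>+<size>+A<signature>@<expiry> <pos>:<len>:<name>',
        # with the two %s placeholders taking hex expiry timestamps.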

    def test_empty_cached_manifest_is_valid(self):
        put_mock = self.MockedPut()
        self.assertEqual(None, put_mock._state.get('manifest'))
        self.assertTrue(put_mock._cached_manifest_valid())
        put_mock._state['manifest'] = ''
        self.assertTrue(put_mock._cached_manifest_valid())

    def test_signature_cases(self):
        now = datetime.datetime.utcnow()
        yesterday = now - datetime.timedelta(days=1)
        lastweek = now - datetime.timedelta(days=7)
        tomorrow = now + datetime.timedelta(days=1)
        nextweek = now + datetime.timedelta(days=7)

        def mocked_head(blocks={}, loc=None):
            blk = loc.split('+', 1)[0]
            if blocks.get(blk):
                return True
            raise arvados.errors.KeepRequestError("mocked error - block invalid")

        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
        cases = [
            # All expired, reset cache - OK
            (yesterday, lastweek, False, False, True),
            (lastweek, yesterday, False, False, True),
            # All non-expired valid blocks - OK
            (tomorrow, nextweek, True, True, True),
            (nextweek, tomorrow, True, True, True),
            # All non-expired invalid blocks - Not OK
            (tomorrow, nextweek, False, False, False),
            (nextweek, tomorrow, False, False, False),
            # One non-expired valid block - OK
            (tomorrow, yesterday, True, False, True),
            (yesterday, tomorrow, False, True, True),
            # One non-expired invalid block - Not OK
            (tomorrow, yesterday, False, False, False),
            (yesterday, tomorrow, False, False, False),
        ]
        for case in cases:
            b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
            head_responses = {
                self.block1: b1_valid,
                self.block2: b2_valid,
            }
            cached_manifest = self.template % (
                self.datetime_to_hex(b1_expiration),
                self.datetime_to_hex(b2_expiration),
            )
            arvput = self.MockedPut(cached_manifest)
            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
                head_mock.side_effect = partial(mocked_head, head_responses)
                self.assertEqual(outcome, arvput._cached_manifest_valid(),
                    "Case '%s' should have produced outcome '%s'" % (case, outcome)
                )
                if b1_expiration > now or b2_expiration > now:
                    # A HEAD request should have been done
                    head_mock.assert_called_once()
                else:
                    head_mock.assert_not_called()


class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        writer = arv_put.ArvPutUploadJob([__file__])
        self.assertEqual(self.TEST_SIZE,
                         writer.bytes_expected)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))

        writer = arv_put.ArvPutUploadJob([tree])
        self.assertEqual(self.TEST_SIZE * 2,
                         writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([tree, __file__])
        self.assertEqual(self.TEST_SIZE * 3,
                         writer.bytes_expected)

    def test_expected_bytes_for_device(self):
        writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
        self.assertIsNone(writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
        self.assertIsNone(writer.bytes_expected)


class ArvadosPutReportTest(ArvadosBaseTestCase):
    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            expect = ": {} written {} total\n".format(
                count, -1 if (total is None) else total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(1.0*count/total)
            actual = arv_put.human_progress(count, total)
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))


class ArvPutLogFormatterTest(ArvadosBaseTestCase):
    matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)'
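    # arv-put generates one X-Request-Id ('req-' plus 20 lowercase
    # alphanumeric characters) per invocation; the formatter is expected to
    # append it to the first ERROR/DEBUG message only, and never to INFO.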

    def setUp(self):
        super(ArvPutLogFormatterTest, self).setUp()
        self.stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        self.logger = logging.getLogger()
        self.logger.addHandler(self.loggingHandler)
        self.logger.setLevel(logging.DEBUG)

    def tearDown(self):
        self.logger.removeHandler(self.loggingHandler)
        self.stderr.close()
        self.stderr = None
        super(ArvPutLogFormatterTest, self).tearDown()

    def test_request_id_logged_only_once_on_error(self):
        self.logger.error('Ooops, something bad happened.')
        self.logger.error('Another bad thing just happened.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_logged_only_once_on_debug(self):
        self.logger.debug('This is just a debug message.')
        self.logger.debug('Another message, move along.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_not_logged_on_info(self):
        self.logger.info('This should be a useful message')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(1, len(log_lines))
        self.assertNotRegex(log_lines[0], self.matcher)


class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

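    # Reset the captured stdout/stderr before each invocation so assertions
    # only see output from the most recent arv_put.main() call.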
    def call_main_with_args(self, args):
        self.main_stdout.seek(0, 0)
        self.main_stdout.truncate(0)
        self.main_stderr.seek(0, 0)
        self.main_stderr.truncate(0)
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.main_stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        logging.getLogger().addHandler(self.loggingHandler)

    def tearDown(self):
        logging.getLogger().removeHandler(self.loggingHandler)
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_error_when_multiple_storage_classes_specified(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--storage-classes', 'hot,cold'])

    def test_error_when_excluding_absolute_path(self):
        tmpdir = self.make_tmpdir()
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--exclude', '/some/absolute/path/*',
                           tmpdir])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())

    def test_request_id_logging_on_error(self):
        matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)\n'
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertRegex(
                self.main_stderr.getvalue(), matcher)

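
# The integration tests below run arv-put as a subprocess against the test
# API and Keep servers, exercising the real command line instead of calling
# ArvPutUploadJob directly.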
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    def _getKeepServerConfig():
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                rails_config = yaml.safe_load(f.read())
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def datetime_to_hex(self, dt):
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write(b'stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read().decode())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
                      pipe.stdout.read().decode())

    def test_sigint_logs_request_id(self):
        # Start arv-put, give it a chance to start up, send SIGINT,
        # and check that its output includes the X-Request-Id.
        input_stream = subprocess.Popen(
            ['sleep', '10'],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=input_stream.stdout, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        # Wait for arv-put child process to print something (i.e., a
        # log message) so we know its signal handler is installed.
        select.select([pipe.stdout], [], [], 10)
        pipe.send_signal(signal.SIGINT)
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        input_stream.terminate()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not exit within 5 seconds")
        self.assertRegex(pipe.stdout.read().decode(), r'\(X-Request-Id: req-[a-z0-9]{20}\)')

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
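        # (manifest_uuid is really a portable data hash; the collections API
        # accepts one in place of a UUID here.)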
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__,
                              os.path.join(datadir, 'foo')],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegex(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=[]):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text.encode())
        self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.decode().strip()]]
        ).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

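    # The next three tests tamper with the resume cache on disk to simulate
    # expired or invalid block signatures, then re-run arv-put and check how
    # it recovers.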
    def test_all_expired_signatures_invalidates_cache(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
            f.write('foo')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        cache['manifest'] = re.sub(
            r'\@.*? ',
            "@{} ".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect the expired cache to be discarded
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'INFO: Cache expired, starting from scratch.*')
        self.assertEqual(p.returncode, 0)

    def test_invalid_signature_invalidates_cache(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
            f.write('foo')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an invalid access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        cache['manifest'] = re.sub(
            r'\+A.*\@',
            "+Aabcdef0123456789abcdef0123456789abcdef01@",
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect to get an invalid cache message
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'ERROR: arv-put: Resume cache contains invalid signature.*')
        self.assertEqual(p.returncode, 1)

    def test_single_expired_signature_reuploads_file(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
            f.write('foo')
        # Write a second file in its own subdir to force a new stream
        os.mkdir(os.path.join(tmpdir, 'bar'))
        with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
            f.write('bar')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        # Make one of the signatures appear to have expired
        cache['manifest'] = re.sub(
            r'\@.*? 3:3:barfile.txt',
            "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect the expired file to be re-uploaded
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
        self.assertEqual(p.returncode, 0)
        # Confirm that the resulting cache is different from the last run.
        with open(cache_filepath, 'r') as c2:
            new_cache = json.load(c2)
        self.assertNotEqual(cache['manifest'], new_cache['manifest'])

1146     def test_put_collection_with_later_update(self):
1147         tmpdir = self.make_tmpdir()
1148         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1149             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1150         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
1151         self.assertNotEqual(None, col['uuid'])
1152         # Add a new file to the directory
1153         with open(os.path.join(tmpdir, 'file2'), 'w') as f:
1154             f.write('The quick brown fox jumped over the lazy dog')
1155         updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
1156         self.assertEqual(col['uuid'], updated_col['uuid'])
1157         # Get the manifest and check that the new file is being included
1158         c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
1159         self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')
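        # Manifest file tokens have the form <offset>:<size>:<name>;
        # 44 is the byte length of the sentence written to file2 above.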

    def test_upload_directory_reference_without_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' was written inside a subcollection
        # while 'bar' should have been uploaded directly to the root collection
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')
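        # Each manifest line names a stream: '.' is the collection root,
        # './<name>' a subdirectory stream.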

    def test_upload_directory_reference_with_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory (with trailing slash) and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1 + os.sep,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' and 'bar' were written at the same level
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')
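        # A trailing slash means "upload the directory's contents", so
        # 'foo' lands in the root stream alongside 'bar'.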

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])
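        # A null replication_desired defers to the cluster-wide default.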

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegex(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))
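        # Without --name, arv-put autogenerates one of the form
        # "Saved at <timestamp> by <user>@<hostname>".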

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection(
            "Test named collection",
            ['--portable-data-hash',
             '--name', link_name,
             '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])

    def test_put_collection_with_storage_classes_specified(self):
        collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])

        self.assertEqual(len(collection['storage_classes_desired']), 1)
        self.assertEqual(collection['storage_classes_desired'][0], 'hot')

    def test_put_collection_without_storage_classes_specified(self):
        collection = self.run_and_find_collection("")

        self.assertEqual(len(collection['storage_classes_desired']), 1)
        self.assertEqual(collection['storage_classes_desired'][0], 'default')
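        # With no --storage-classes argument, the collection falls back
        # to the 'default' storage class.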

    def test_exclude_filename_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', '*2.txt',
                                                '--exclude', 'file3.*',
                                                tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Neither file2.txt nor file3.txt should have been uploaded
        self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')
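        # Note that bare patterns like '*2.txt' are matched against file
        # names at every directory level, so both copies were excluded.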

    def test_exclude_filepath_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', 'subdir/*2.txt',
                                                '--exclude', './file1.*',
                                                tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Only tmpdir/file1.txt & tmpdir/subdir/file2.txt should have been excluded
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s.*:file1.txt' % os.path.basename(tmpdir))
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'],
                         r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'], r'^.*:file3.txt')
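        # Patterns containing a path component (or a leading './') only
        # match relative to the upload root, so the same-named files
        # elsewhere in the tree were still uploaded.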

    def test_silent_mode_no_errors(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
            f.write('hello world')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--silent', tmpdir],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate()
        # Normal operation should produce no console output
        self.assertNotRegex(stderr.decode(), r'.+')
        self.assertNotRegex(stdout.decode(), r'.+')

    def test_silent_mode_does_not_suppress_error_messages(self):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--silent',
             '/path/not/existent'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate()
        # Error messages should still be displayed when errors occur
        self.assertRegex(stderr.decode(), r'.*ERROR:.*')
        self.assertNotRegex(stdout.decode(), r'.+')


if __name__ == '__main__':
    unittest.main()