# -*- coding: utf-8 -*-

# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import range
from functools import partial
import apiclient
import ciso8601
import datetime
import hashlib
import json
import logging
import mock
import os
import pwd
import random
import re
import select
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import unittest
import uuid
import yaml

import arvados
import arvados.commands.put as arv_put
from . import arvados_testutil as tutil

from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
from . import run_test_server

class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

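    # The tests below pin down ResumeCache.make_path()'s naming scheme.
    # Hedged summary, inferred from the assertions rather than the
    # implementation: the name is a hash derived from the API host and the
    # canonicalized (realpath'd) input paths, with --filename mixed in only
    # when it matters, i.e. for single-file or stdin uploads.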
    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

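    # The next three tests load a cache whose saved state references Keep
    # blocks. Hedged note, inferred from the fixtures here rather than the
    # implementation: the state is a dict whose '_current_stream_locators'
    # and '_finished_streams' entries carry block locators, and reopening
    # the cache may HEAD them to confirm the blocks are still readable.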
    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
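        # Hedged expectation, based on what this test exercises: when a
        # cached locator's HEAD fails, check_cache() should recover (e.g.
        # by resetting the cache) instead of propagating the error.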
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # First save an object longer than the one we test, to make sure
        # the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            _ = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)


class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):

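    # setUp builds four fixtures: a tempdir with files of 1-4 KiB plus a
    # subdir, one file slightly larger than a single Keep block, a
    # directory of ~70 small (1 MiB plus a few bytes) files to exercise
    # small-block repacking, and a tempdir holding symlinks back into the
    # first one.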
    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KiB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        fileobj = open(self.large_file_name, 'w')
        # Make sure to write just a little more than one block
        for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
            fileobj.write(data)
        fileobj.close()
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MiB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
        # Temp dir to hold a symlink to other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def test_symlinks_are_followed_by_default(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()
        # Check for bug #17800: symlinks passed explicitly as arguments
        # should also be ignored.
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistant_path_raise_exception(self):
        uuid_str = str(uuid.uuid4())
        with self.assertRaises(arv_put.PathDoesNotExistError):
            arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

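    # ArvPutUploadJob reports progress through a caller-supplied callback
    # invoked as reporter(bytes_written, bytes_expected); this helper
    # records every call so tests can assert on the progression.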
    def make_progress_tester(self):
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                                                  reporter=reporter)
                cwriter.bytes_expected = expect_count
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting; final=True makes
                # sure the block in progress gets committed.
                self.writer._update(final=True)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing it from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
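        # Resume accounting: the bytes the second run skips were already
        # written by the first, so first-run bytes plus second-run bytes,
        # minus the skipped overlap, should add up to the full file size.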
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del(self.writer)

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block; wrapped_commit makes the
            # first block commit fail. arv-put should not crash with an
            # exception while trying to save the collection, which is left
            # in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing it from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del(self.writer)

    def test_no_resume_when_no_cache(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing it from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del(self.writer)

    def test_dry_run_feature(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing it from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            # Retry the upload using dry_run to check if there is a pending upload
            writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            # Confirm there's no pending upload with dry_run=True
            writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del(self.writer)


class CachedManifestValidationTest(ArvadosBaseTestCase):
    class MockedPut(arv_put.ArvPutUploadJob):
        def __init__(self, cached_manifest=None):
            self._state = arv_put.ArvPutUploadJob.EMPTY_STATE
            self._state['manifest'] = cached_manifest
            self._api_client = mock.MagicMock()
            self.logger = mock.MagicMock()
            self.num_retries = 1

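    # Keep signed locators look like <md5>+<size>+A<signature>@<expiry>,
    # where <expiry> is a Unix timestamp in lowercase hex; this helper
    # renders a datetime in that format (via local time, matching the
    # manifests built from self.template below).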
    def datetime_to_hex(self, dt):
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(CachedManifestValidationTest, self).setUp()
        self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
        self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"

    def test_empty_cached_manifest_is_valid(self):
        put_mock = self.MockedPut()
        self.assertEqual(None, put_mock._state.get('manifest'))
        self.assertTrue(put_mock._cached_manifest_valid())
        put_mock._state['manifest'] = ''
        self.assertTrue(put_mock._cached_manifest_valid())

    def test_signature_cases(self):
        now = datetime.datetime.utcnow()
        yesterday = now - datetime.timedelta(days=1)
        lastweek = now - datetime.timedelta(days=7)
        tomorrow = now + datetime.timedelta(days=1)
        nextweek = now + datetime.timedelta(days=7)

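        # mocked_head stands in for KeepClient.head: it succeeds only for
        # blocks flagged True in the `blocks` dict (bound with
        # functools.partial below) and raises KeepRequestError otherwise.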
        def mocked_head(blocks={}, loc=None):
            blk = loc.split('+', 1)[0]
            if blocks.get(blk):
                return True
            raise arvados.errors.KeepRequestError("mocked error - block invalid")

        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
        cases = [
            # All expired, reset cache - OK
            (yesterday, lastweek, False, False, True),
            (lastweek, yesterday, False, False, True),
            # All non-expired valid blocks - OK
            (tomorrow, nextweek, True, True, True),
            (nextweek, tomorrow, True, True, True),
            # All non-expired invalid blocks - Not OK
            (tomorrow, nextweek, False, False, False),
            (nextweek, tomorrow, False, False, False),
            # One non-expired valid block - OK
            (tomorrow, yesterday, True, False, True),
            (yesterday, tomorrow, False, True, True),
            # One non-expired invalid block - Not OK
            (tomorrow, yesterday, False, False, False),
            (yesterday, tomorrow, False, False, False),
        ]
        for case in cases:
            b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
            head_responses = {
                self.block1: b1_valid,
                self.block2: b2_valid,
            }
            cached_manifest = self.template % (
                self.datetime_to_hex(b1_expiration),
                self.datetime_to_hex(b2_expiration),
            )
            arvput = self.MockedPut(cached_manifest)
            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
                head_mock.side_effect = partial(mocked_head, head_responses)
                self.assertEqual(outcome, arvput._cached_manifest_valid(),
                    "Case '%s' should have produced outcome '%s'" % (case, outcome)
                )
                if b1_expiration > now or b2_expiration > now:
                    # A HEAD request should have been done
                    head_mock.assert_called_once()
                else:
                    head_mock.assert_not_called()


class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        writer = arv_put.ArvPutUploadJob([__file__])
        self.assertEqual(self.TEST_SIZE,
                         writer.bytes_expected)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))

        writer = arv_put.ArvPutUploadJob([tree])
        self.assertEqual(self.TEST_SIZE * 2,
                         writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([tree, __file__])
        self.assertEqual(self.TEST_SIZE * 3,
                         writer.bytes_expected)

    def test_expected_bytes_for_device(self):
        writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
        self.assertIsNone(writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
        self.assertIsNone(writer.bytes_expected)


class ArvadosPutReportTest(ArvadosBaseTestCase):
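    # arv_put ships two progress renderers: machine_progress() ends with
    # "<count> written <total> total" (total rendered as -1 when unknown),
    # while human_progress() emits a carriage-return-prefixed line that
    # includes a percentage when the total is known.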
    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            expect = ": {} written {} total\n".format(
                count, -1 if (total is None) else total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(1.0*count/total)
            actual = arv_put.human_progress(count, total)
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))


class ArvPutLogFormatterTest(ArvadosBaseTestCase):
    matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)'

    def setUp(self):
        super(ArvPutLogFormatterTest, self).setUp()
        self.stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        self.logger = logging.getLogger()
        self.logger.addHandler(self.loggingHandler)
        self.logger.setLevel(logging.DEBUG)

    def tearDown(self):
        self.logger.removeHandler(self.loggingHandler)
        self.stderr.close()
        self.stderr = None
        super(ArvPutLogFormatterTest, self).tearDown()

    def test_request_id_logged_only_once_on_error(self):
        self.logger.error('Ooops, something bad happened.')
        self.logger.error('Another bad thing just happened.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_logged_only_once_on_debug(self):
        self.logger.debug('This is just a debug message.')
        self.logger.debug('Another message, move along.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_not_logged_on_info(self):
        self.logger.info('This should be a useful message')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(1, len(log_lines))
        self.assertNotRegex(log_lines[0], self.matcher)


class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        self.main_stdout.seek(0, 0)
        self.main_stdout.truncate(0)
        self.main_stderr.seek(0, 0)
        self.main_stderr.truncate(0)
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.main_stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        logging.getLogger().addHandler(self.loggingHandler)

    def tearDown(self):
        logging.getLogger().removeHandler(self.loggingHandler)
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

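    # --replication N should reach Keep as the `copies` keyword of each
    # block PUT; the mock below records those calls so we can check it.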
    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_error_when_excluding_absolute_path(self):
        tmpdir = self.make_tmpdir()
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--exclude', '/some/absolute/path/*',
                           tmpdir])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())

    def test_request_id_logging_on_error(self):
        matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)\n'
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['/dev/null'])
            self.assertRegex(
                self.main_stderr.getvalue(), matcher)


class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

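    # Same hex Unix-timestamp helper as in CachedManifestValidationTest;
    # used below to forge expired permission hints in cached manifests.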
    def datetime_to_hex(self, dt):
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID, 0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write(b'stdin test\xa6\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
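        # A Keep locator is <md5>+<size>; the +12 matches the 12-byte
        # test input written above.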
        self.assertIn('1cb671b355a0c23d5d1c61d59cdb1b2b+12',
                      pipe.stdout.read().decode())

    def test_sigint_logs_request_id(self):
        # Start arv-put, give it a chance to start up, send SIGINT,
        # and check that its output includes the X-Request-Id.
        input_stream = subprocess.Popen(
            ['sleep', '10'],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=input_stream.stdout, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        # Wait for arv-put child process to print something (i.e., a
        # log message) so we know its signal handler is installed.
        select.select([pipe.stdout], [], [], 10)
        pipe.send_signal(signal.SIGINT)
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        input_stream.terminate()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not exit within 5 seconds")
        self.assertRegex(pipe.stdout.read().decode(), r'\(X-Request-Id: req-[a-z0-9]{20}\)')

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__,
                              os.path.join(datadir, 'foo')],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegex(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

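    # Runs arv-put with extra_args, feeding `text` on stdin, then looks up
    # the resulting collection by UUID (or by portable data hash when
    # --portable-data-hash is in use) and returns its API record.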
    def run_and_find_collection(self, text, extra_args=[]):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text.encode())
        self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.decode().strip()]]
        ).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_all_expired_signatures_invalidates_cache(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
            f.write('foo')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        cache['manifest'] = re.sub(
            r'\@.*? ',
            "@{} ".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect the expired cache to be discarded
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'INFO: Cache expired, starting from scratch.*')
        self.assertEqual(p.returncode, 0)

    def test_invalid_signature_invalidates_cache(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
            f.write('foo')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an invalid access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        cache['manifest'] = re.sub(
            r'\+A.*\@',
            "+Aabcdef0123456789abcdef0123456789abcdef01@",
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect to get an invalid cache message
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'ERROR: arv-put: Resume cache contains invalid signature.*')
        self.assertEqual(p.returncode, 1)

    def test_single_expired_signature_reuploads_file(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
            f.write('foo')
        # Write a second file in its own subdir to force a new stream
        os.mkdir(os.path.join(tmpdir, 'bar'))
        with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
            f.write('bar')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        # Make one of the signatures appear to have expired
        cache['manifest'] = re.sub(
            r'\@.*? 3:3:barfile.txt',
            "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload; only the expired file should be re-uploaded
1118         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1119                              stdout=subprocess.PIPE,
1120                              stderr=subprocess.PIPE,
1121                              env=self.ENVIRON)
1122         (_, err) = p.communicate()
1123         self.assertRegex(
1124             err.decode(),
1125             r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
1126         self.assertEqual(p.returncode, 0)
1127         # Confirm that the resulting cache is different from the last run.
1128         with open(cache_filepath, 'r') as c2:
1129             new_cache = json.load(c2)
1130         self.assertNotEqual(cache['manifest'], new_cache['manifest'])
1131
1132     def test_put_collection_with_later_update(self):
1133         tmpdir = self.make_tmpdir()
1134         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1135             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1136         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
1137         self.assertNotEqual(None, col['uuid'])
1138         # Add a new file to the directory
1139         with open(os.path.join(tmpdir, 'file2'), 'w') as f:
1140             f.write('The quick brown fox jumped over the lazy dog')
1141         updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
1142         self.assertEqual(col['uuid'], updated_col['uuid'])
1143         # Get the manifest and check that the new file is being included
1144         c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
1145         self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')
1146
    def test_put_collection_with_utc_expiring_datetime(self):
        tmpdir = self.make_tmpdir()
        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%MZ')
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        self.assertEqual(ciso8601.parse_datetime(trash_at),
                         ciso8601.parse_datetime(c['trash_at']))

    def test_put_collection_with_timezone_aware_expiring_datetime(self):
        tmpdir = self.make_tmpdir()
        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M-0300')
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        self.assertEqual(
            ciso8601.parse_datetime(trash_at).replace(tzinfo=None) + datetime.timedelta(hours=3),
            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))

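    # A timezone-naive --trash-at value is interpreted in the client's
    # local timezone; time.altzone/time.timezone give the local offset
    # from UTC (in seconds, DST-dependent) needed to compare it against
    # the UTC value stored by the API server.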
    def test_put_collection_with_timezone_naive_expiring_datetime(self):
        tmpdir = self.make_tmpdir()
        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M')
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        if time.daylight:
            offset = datetime.timedelta(seconds=time.altzone)
        else:
            offset = datetime.timedelta(seconds=time.timezone)
        self.assertEqual(
            ciso8601.parse_datetime(trash_at) + offset,
            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))

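    # A date-only --trash-at should expand to the very end of that day
    # (23:59:59) in the client's local timezone, hence the end_of_day
    # and local-offset corrections below.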
    def test_put_collection_with_expiring_date_only(self):
        tmpdir = self.make_tmpdir()
        trash_at = '2140-01-01'
        end_of_day = datetime.timedelta(hours=23, minutes=59, seconds=59)
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        if time.daylight:
            offset = datetime.timedelta(seconds=time.altzone)
        else:
            offset = datetime.timedelta(seconds=time.timezone)
        self.assertEqual(
            ciso8601.parse_datetime(trash_at) + end_of_day + offset,
            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))

    def test_put_collection_with_invalid_absolute_expiring_datetimes(self):
        cases = ['2100', '210010', '2100-10', '2100-Oct']
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        for test_datetime in cases:
            with self.assertRaises(AssertionError):
                self.run_and_find_collection(
                    "",
                    ['--no-progress', '--trash-at', test_datetime, tmpdir])

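    # --trash-after takes a number of days; since the resulting trash_at
    # depends on exactly when the upload runs, bracket it between
    # timestamps taken just before and just after the run instead of
    # asserting equality.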
    def test_put_collection_with_relative_expiring_datetime(self):
        expire_after = 7
        dt_before = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-after', str(expire_after), tmpdir])
        self.assertNotEqual(None, col['uuid'])
        dt_after = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        trash_at = ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None)
        self.assertLess(dt_before, trash_at)
        self.assertGreater(dt_after, trash_at)

    def test_put_collection_with_invalid_relative_expiring_datetime(self):
        expire_after = 0  # Must be >= 1
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        with self.assertRaises(AssertionError):
            self.run_and_find_collection(
                "",
                ['--no-progress', '--trash-after', str(expire_after), tmpdir])

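    # The next two tests pin down rsync-like trailing-slash semantics:
    # 'dir' uploads the directory itself (its files land in a
    # subcollection named after the directory), while 'dir/' uploads
    # its contents directly into the root of the collection.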
    def test_upload_directory_reference_without_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' was written inside a subcollection while 'bar'
        # was uploaded directly to the root of the collection
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')

    def test_upload_directory_reference_with_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory (with trailing slash) and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1 + os.sep,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' and 'bar' were written at the same level
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

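    # Without --name, arv-put generates a default collection name of
    # the form "Saved at <timestamp> by <user>@<host>", so the test
    # below only pattern-matches the prefix instead of reproducing it.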
    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegex(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection(
            "Test named collection",
            ['--portable-data-hash',
             '--name', link_name,
             '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])

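    # --storage-classes accepts a comma-separated list; surrounding
    # whitespace is stripped, and omitting the flag falls back to the
    # 'default' class.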
    def test_put_collection_with_storage_classes_specified(self):
        collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])
        self.assertEqual(len(collection['storage_classes_desired']), 1)
        self.assertEqual(collection['storage_classes_desired'][0], 'hot')

    def test_put_collection_with_multiple_storage_classes_specified(self):
        collection = self.run_and_find_collection("", ['--storage-classes', ' foo, bar  ,baz'])
        self.assertEqual(len(collection['storage_classes_desired']), 3)
        self.assertEqual(collection['storage_classes_desired'], ['foo', 'bar', 'baz'])

    def test_put_collection_without_storage_classes_specified(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(len(collection['storage_classes_desired']), 1)
        self.assertEqual(collection['storage_classes_desired'][0], 'default')

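    # --exclude accepts glob-style patterns. A bare filename pattern
    # (e.g. '*2.txt') matches anywhere in the tree, while a pattern
    # containing a path component (e.g. 'subdir/*2.txt' or './file1.*')
    # only matches at that relative location.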
    def test_exclude_filename_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', '*2.txt',
                                                '--exclude', 'file3.*',
                                                tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Neither file2.txt nor file3.txt should have been uploaded
        self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')

    def test_exclude_filepath_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', 'subdir/*2.txt',
                                                '--exclude', './file1.*',
                                                tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Only tmpdir/file1.txt and tmpdir/subdir/file2.txt should have been excluded
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s.*:file1.txt' % os.path.basename(tmpdir))
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'],
                         r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'], r'^.*:file3.txt')

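    # Non-ASCII filenames should round-trip verbatim into the manifest.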
    def test_unicode_on_filename(self):
        tmpdir = self.make_tmpdir()
        fname = u"i❤arvados.txt"
        with open(os.path.join(tmpdir, fname), 'w') as f:
            f.write("This is a unicode named file")
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        self.assertIn(fname, c['manifest_text'],
                      u"{} does not include {}".format(c['manifest_text'], fname))

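    # --silent should suppress all console output on success (both
    # stdout and stderr), but error messages must still get through.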
    def test_silent_mode_no_errors(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
            f.write('hello world')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--silent', tmpdir],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate()
        # No console output should occur on normal operations
        self.assertNotRegex(stderr.decode(), r'.+')
        self.assertNotRegex(stdout.decode(), r'.+')

    def test_silent_mode_does_not_avoid_error_messages(self):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--silent',
             '/path/not/existent'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate()
        # An error message should be displayed when errors happen
        self.assertRegex(stderr.decode(), r'.*ERROR:.*')
        self.assertNotRegex(stdout.decode(), r'.+')


if __name__ == '__main__':
    unittest.main()