# -*- coding: utf-8 -*-

# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import range
from functools import partial
import apiclient
import ciso8601
import datetime
import hashlib
import json
import logging
import mock
import os
import pwd
import random
import re
import select
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import unittest
import uuid
import yaml

import arvados
import arvados.commands.put as arv_put
from . import arvados_testutil as tutil

from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
from . import run_test_server

class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

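    # Helper: the resume cache path is a deterministic function of the
    # parsed argument list (via ResumeCache.make_path), as the tests
    # below verify.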
    def cache_path_from_arglist(self, arglist):
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            _ = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)


class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KiB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        fileobj = open(self.large_file_name, 'w')
        # Make sure to write just a little more than one block
        for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
            fileobj.write(data)
        fileobj.close()
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MiB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
        # Temp dir to hold a symlink to other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def test_symlinks_are_followed_by_default(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistent_path_raises_exception(self):
        uuid_str = str(uuid.uuid4())
        with self.assertRaises(arv_put.PathDoesNotExistError):
            arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

    def make_progress_tester(self):
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                                                  reporter=reporter)
                cwriter.bytes_expected = expect_count
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
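            # With autospec=True, the mock passes the ArvadosFileWriter
            # instance as args[0], so args[1] is the data being written.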
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting. Ensure block commit.
                self.writer._update(final=True)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing the writer from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block; wrapped_commit will make the
            # first block commit fail.
            # arv-put should not exit with an exception when trying to commit
            # the collection, as it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing the writer from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_no_resume_when_no_cache(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing the writer from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_dry_run_feature(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing the writer from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            # Retry the upload using dry_run to check if there is a pending upload
            writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            # Confirm there's no pending upload with dry_run=True
            writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del self.writer

class CachedManifestValidationTest(ArvadosBaseTestCase):
    class MockedPut(arv_put.ArvPutUploadJob):
        def __init__(self, cached_manifest=None):
            self._state = arv_put.ArvPutUploadJob.EMPTY_STATE
            self._state['manifest'] = cached_manifest
            self._api_client = mock.MagicMock()
            self.logger = mock.MagicMock()
            self.num_retries = 1

    def datetime_to_hex(self, dt):
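        # Keep signed locators look like "<hash>+<size>+A<signature>@<hex>";
        # the "@<hex>" suffix is the expiration as a hex Unix timestamp.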
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(CachedManifestValidationTest, self).setUp()
        self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
        self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
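        # Manifest template with two signed blocks; each %s slot takes a
        # hex expiration timestamp produced by datetime_to_hex().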
        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"

    def test_empty_cached_manifest_is_valid(self):
        put_mock = self.MockedPut()
        self.assertEqual(None, put_mock._state.get('manifest'))
        self.assertTrue(put_mock._cached_manifest_valid())
        put_mock._state['manifest'] = ''
        self.assertTrue(put_mock._cached_manifest_valid())

    def test_signature_cases(self):
        now = datetime.datetime.utcnow()
        yesterday = now - datetime.timedelta(days=1)
        lastweek = now - datetime.timedelta(days=7)
        tomorrow = now + datetime.timedelta(days=1)
        nextweek = now + datetime.timedelta(days=7)

        def mocked_head(blocks={}, loc=None):
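            # 'blocks' is pre-bound to head_responses via functools.partial
            # below, so 'loc' receives the locator that KeepClient.head()
            # is called with.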
            blk = loc.split('+', 1)[0]
            if blocks.get(blk):
                return True
            raise arvados.errors.KeepRequestError("mocked error - block invalid")

        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
        cases = [
            # All expired, reset cache - OK
            (yesterday, lastweek, False, False, True),
            (lastweek, yesterday, False, False, True),
            # All non-expired valid blocks - OK
            (tomorrow, nextweek, True, True, True),
            (nextweek, tomorrow, True, True, True),
            # All non-expired invalid blocks - Not OK
            (tomorrow, nextweek, False, False, False),
            (nextweek, tomorrow, False, False, False),
            # One non-expired valid block - OK
            (tomorrow, yesterday, True, False, True),
            (yesterday, tomorrow, False, True, True),
            # One non-expired invalid block - Not OK
            (tomorrow, yesterday, False, False, False),
            (yesterday, tomorrow, False, False, False),
        ]
        for case in cases:
            b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
            head_responses = {
                self.block1: b1_valid,
                self.block2: b2_valid,
            }
            cached_manifest = self.template % (
                self.datetime_to_hex(b1_expiration),
                self.datetime_to_hex(b2_expiration),
            )
            arvput = self.MockedPut(cached_manifest)
            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
                head_mock.side_effect = partial(mocked_head, head_responses)
                self.assertEqual(outcome, arvput._cached_manifest_valid(),
                    "Case '%s' should have produced outcome '%s'" % (case, outcome)
                )
                if b1_expiration > now or b2_expiration > now:
                    # A HEAD request should have been done
                    head_mock.assert_called_once()
                else:
                    head_mock.assert_not_called()


class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        writer = arv_put.ArvPutUploadJob([__file__])
        self.assertEqual(self.TEST_SIZE,
                         writer.bytes_expected)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))

        writer = arv_put.ArvPutUploadJob([tree])
        self.assertEqual(self.TEST_SIZE * 2,
                         writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([tree, __file__])
        self.assertEqual(self.TEST_SIZE * 3,
                         writer.bytes_expected)

    def test_expected_bytes_for_device(self):
        writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
        self.assertIsNone(writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
        self.assertIsNone(writer.bytes_expected)


class ArvadosPutReportTest(ArvadosBaseTestCase):
    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            expect = ": {} written {} total\n".format(
                count, -1 if (total is None) else total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(1.0*count/total)
            actual = arv_put.human_progress(count, total)
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))


class ArvPutLogFormatterTest(ArvadosBaseTestCase):
    matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)'

    def setUp(self):
        super(ArvPutLogFormatterTest, self).setUp()
        self.stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        self.logger = logging.getLogger()
        self.logger.addHandler(self.loggingHandler)
        self.logger.setLevel(logging.DEBUG)

    def tearDown(self):
        self.logger.removeHandler(self.loggingHandler)
        self.stderr.close()
        self.stderr = None
        super(ArvPutLogFormatterTest, self).tearDown()

    def test_request_id_logged_only_once_on_error(self):
        self.logger.error('Ooops, something bad happened.')
        self.logger.error('Another bad thing just happened.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_logged_only_once_on_debug(self):
        self.logger.debug('This is just a debug message.')
        self.logger.debug('Another message, move along.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_not_logged_on_info(self):
        self.logger.info('This should be a useful message')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(1, len(log_lines))
        self.assertNotRegex(log_lines[0], self.matcher)

class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        self.main_stdout.seek(0, 0)
        self.main_stdout.truncate(0)
        self.main_stderr.seek(0, 0)
        self.main_stderr.truncate(0)
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.main_stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        logging.getLogger().addHandler(self.loggingHandler)

    def tearDown(self):
        logging.getLogger().removeHandler(self.loggingHandler)
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwritable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_error_when_excluding_absolute_path(self):
        tmpdir = self.make_tmpdir()
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--exclude', '/some/absolute/path/*',
                           tmpdir])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())

    def test_request_id_logging_on_error(self):
        matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)\n'
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['/dev/null'])
            self.assertRegex(
                self.main_stderr.getvalue(), matcher)


class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def datetime_to_hex(self, dt):
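        # Convert to the hex Unix-timestamp format used for signed locator
        # expirations (the "@<hex>" suffix rewritten by the tests below).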
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                         0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write(b'stdin test\xa6\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('1cb671b355a0c23d5d1c61d59cdb1b2b+12',
                      pipe.stdout.read().decode())

    def test_sigint_logs_request_id(self):
        # Start arv-put, give it a chance to start up, send SIGINT,
        # and check that its output includes the X-Request-Id.
        input_stream = subprocess.Popen(
            ['sleep', '10'],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=input_stream.stdout, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        # Wait for arv-put child process to print something (i.e., a
        # log message) so we know its signal handler is installed.
        select.select([pipe.stdout], [], [], 10)
        pipe.send_signal(signal.SIGINT)
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        input_stream.terminate()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not exit within 5 seconds")
        self.assertRegex(pipe.stdout.read().decode(), r'\(X-Request-Id: req-[a-z0-9]{20}\)')

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__,
                              os.path.join(datadir, 'foo')],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegex(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=[]):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text.encode())
        self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.decode().strip()]]
        ).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_all_expired_signatures_invalidates_cache(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
            f.write('foo')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        cache['manifest'] = re.sub(
            r'\@.*? ',
            "@{} ".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect to get an invalid cache message
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'INFO: Cache expired, starting from scratch.*')
        self.assertEqual(p.returncode, 0)

    def test_invalid_signature_invalidates_cache(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
            f.write('foo')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an invalid access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        cache['manifest'] = re.sub(
            r'\+A.*\@',
            "+Aabcdef0123456789abcdef0123456789abcdef01@",
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect to get an invalid cache message
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'ERROR: arv-put: Resume cache contains invalid signature.*')
        self.assertEqual(p.returncode, 1)

    def test_single_expired_signature_reuploads_file(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
            f.write('foo')
        # Write a second file in its own subdir to force a new stream
        os.mkdir(os.path.join(tmpdir, 'bar'))
        with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
            f.write('bar')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        # Make one of the signatures appear to have expired
        cache['manifest'] = re.sub(
            r'\@.*? 3:3:barfile.txt',
            "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect a warning that the expired file
        # will be re-uploaded
1111         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1112                              stdout=subprocess.PIPE,
1113                              stderr=subprocess.PIPE,
1114                              env=self.ENVIRON)
1115         (_, err) = p.communicate()
1116         self.assertRegex(
1117             err.decode(),
1118             r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
1119         self.assertEqual(p.returncode, 0)
1120         # Confirm that the resulting cache is different from the last run.
1121         with open(cache_filepath, 'r') as c2:
1122             new_cache = json.load(c2)
1123         self.assertNotEqual(cache['manifest'], new_cache['manifest'])
1124
1125     def test_put_collection_with_later_update(self):
1126         tmpdir = self.make_tmpdir()
1127         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1128             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1129         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
1130         self.assertNotEqual(None, col['uuid'])
1131         # Add a new file to the directory
1132         with open(os.path.join(tmpdir, 'file2'), 'w') as f:
1133             f.write('The quick brown fox jumped over the lazy dog')
1134         updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
1135         self.assertEqual(col['uuid'], updated_col['uuid'])
1136         # Get the manifest and check that the new file is being included
1137         c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
1138         self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')
1139
1140     def test_put_collection_with_utc_expiring_datetime(self):
1141         tmpdir = self.make_tmpdir()
1142         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%MZ')
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        self.assertEqual(ciso8601.parse_datetime(trash_at),
                         ciso8601.parse_datetime(c['trash_at']))

    def test_put_collection_with_timezone_aware_expiring_datetime(self):
        tmpdir = self.make_tmpdir()
        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M-0300')
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
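        # A '-0300' offset means local time is three hours behind UTC, so
        # adding three hours to the naive local timestamp should yield the
        # UTC value stored by the API server.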
        self.assertEqual(
            ciso8601.parse_datetime(trash_at).replace(tzinfo=None) + datetime.timedelta(hours=3),
            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))

    def test_put_collection_with_timezone_naive_expiring_datetime(self):
        tmpdir = self.make_tmpdir()
        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M')
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
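        # A timezone-naive --trash-at should be interpreted in the local
        # timezone. time.altzone/time.timezone give the local offset in
        # seconds west of UTC (with and without DST respectively), so adding
        # the offset converts the local timestamp back to UTC.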
        if time.daylight:
            offset = datetime.timedelta(seconds=time.altzone)
        else:
            offset = datetime.timedelta(seconds=time.timezone)
        self.assertEqual(
            ciso8601.parse_datetime(trash_at) + offset,
            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))

    def test_put_collection_with_expiring_date_only(self):
        tmpdir = self.make_tmpdir()
        trash_at = '2140-01-01'
        end_of_day = datetime.timedelta(hours=23, minutes=59, seconds=59)
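        # A date-only --trash-at should expand to the end of that day
        # (23:59:59) in the local timezone.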
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        if time.daylight:
            offset = datetime.timedelta(seconds=time.altzone)
        else:
            offset = datetime.timedelta(seconds=time.timezone)
        self.assertEqual(
            ciso8601.parse_datetime(trash_at) + end_of_day + offset,
            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))

    def test_put_collection_with_invalid_absolute_expiring_datetimes(self):
        cases = ['2100', '210010', '2100-10', '2100-Oct']
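        # None of these strings is a complete, parseable datetime, so each
        # run should fail (run_and_find_collection raises AssertionError).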
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        for test_datetime in cases:
            with self.assertRaises(AssertionError):
                self.run_and_find_collection(
                    "",
                    ['--no-progress', '--trash-at', test_datetime, tmpdir])

    def test_put_collection_with_relative_expiring_datetime(self):
        expire_after = 7
        dt_before = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-after', str(expire_after), tmpdir])
        self.assertNotEqual(None, col['uuid'])
        dt_after = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        trash_at = ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None)
        self.assertLess(dt_before, trash_at)
        self.assertGreater(dt_after, trash_at)

    def test_put_collection_with_invalid_relative_expiring_datetime(self):
        expire_after = 0  # Must be >= 1
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        with self.assertRaises(AssertionError):
            self.run_and_find_collection(
                "",
                ['--no-progress', '--trash-after', str(expire_after), tmpdir])

    def test_upload_directory_reference_without_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' was written inside a subcollection;
        # 'bar', on the other hand, should have been uploaded directly into the root collection
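        # In a manifest, "." names the root stream and "./<subdir>" names a
        # subdirectory stream; the regex below relies on that layout.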
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')

    def test_upload_directory_reference_with_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory (with trailing slash) and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1 + os.sep,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' and 'bar' were written at the same level
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegex(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                                  ['--portable-data-hash',
                                                   '--name', link_name,
                                                   '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])

    def test_put_collection_with_storage_classes_specified(self):
        collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])
        self.assertEqual(len(collection['storage_classes_desired']), 1)
        self.assertEqual(collection['storage_classes_desired'][0], 'hot')

    def test_put_collection_with_multiple_storage_classes_specified(self):
        collection = self.run_and_find_collection("", ['--storage-classes', ' foo, bar  ,baz'])
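        # Whitespace around each class name should be stripped during
        # argument parsing.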
        self.assertEqual(len(collection['storage_classes_desired']), 3)
        self.assertEqual(collection['storage_classes_desired'], ['foo', 'bar', 'baz'])

    def test_put_collection_without_storage_classes_specified(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(len(collection['storage_classes_desired']), 1)
        self.assertEqual(collection['storage_classes_desired'][0], 'default')

    def test_exclude_filename_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', '*2.txt',
                                                '--exclude', 'file3.*',
                                                tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Neither file2.txt nor file3.txt should have been uploaded,
        # at any directory level
        self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')

    def test_exclude_filepath_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
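        # Exclude patterns containing a path separator ('subdir/*2.txt',
        # './file1.*') should match against each file's relative path,
        # unlike bare filename patterns, which apply at every level.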
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', 'subdir/*2.txt',
                                                '--exclude', './file1.*',
                                                tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Only tmpdir/file1.txt and tmpdir/subdir/file2.txt should have been excluded
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s.*:file1.txt' % os.path.basename(tmpdir))
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'],
                         r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'], r'^.*:file3.txt')

    def test_unicode_on_filename(self):
        tmpdir = self.make_tmpdir()
        fname = u"i❤arvados.txt"
        with open(os.path.join(tmpdir, fname), 'w') as f:
            f.write("This is a unicode named file")
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        self.assertIn(fname, c['manifest_text'],
                      u"{} does not include {}".format(c['manifest_text'], fname))

    def test_silent_mode_no_errors(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
            f.write('hello world')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--silent', tmpdir],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate()
        # No console output should occur on normal operation
        self.assertNotRegex(stderr.decode(), r'.+')
        self.assertNotRegex(stdout.decode(), r'.+')

    def test_silent_mode_does_not_avoid_error_messages(self):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--silent',
             '/path/not/existent'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate()
        # An error message should be displayed when errors occur
        self.assertRegex(stderr.decode(), r'.*ERROR:.*')
        self.assertNotRegex(stdout.decode(), r'.+')


if __name__ == '__main__':
    unittest.main()