# -*- coding: utf-8 -*-

# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import apiclient
import ciso8601
import copy
import datetime
import json
import logging
import multiprocessing
import os
import pwd
import random
import re
import select
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import unittest
import uuid

from functools import partial
from unittest import mock

import arvados
import arvados.commands.put as arv_put
from . import arvados_testutil as tutil

from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
from . import run_test_server

class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]
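    # Each argument set above should map to a stable, unique, filesystem-
    # safe cache path via ResumeCache.make_path(); the tests below verify
    # those properties one at a time.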

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
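        # (Keep locators have the form "<md5 hexdigest>+<size in bytes>".)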
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
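        # check_cache() must not propagate the mocked HEAD failure;
        # presumably it invalidates the stale cache entries instead.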
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            _ = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)


class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation
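        # Fixture layout: self.tempdir gets files "1".."4" (1-4 KiB each)
        # plus subdir/otherfile; a large file slightly over one Keep block;
        # a directory of 1 MiB files for repacking tests; and a directory
        # holding symlinks back into self.tempdir.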
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KiB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        fileobj = open(self.large_file_name, 'w')
        # Make sure to write just a little more than one block
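        # (KEEP_BLOCK_SIZE >> 20 is the block size expressed in MiB, so the
        # loop below writes one MiB more than a single block holds.)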
        for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
            fileobj.write(data)
        fileobj.close()
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MiB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
        # Temp dir to hold symlinks to the other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def test_non_regular_files_are_ignored_except_symlinks_to_dirs(self):
        def pfunc(x):
            with open(x, 'w') as f:
                f.write('test')
        fifo_filename = 'fifo-file'
        fifo_path = os.path.join(self.tempdir_with_symlink, fifo_filename)
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        os.mkfifo(fifo_path)
        producer = multiprocessing.Process(target=pfunc, args=(fifo_path,))
        producer.start()
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        if producer.exitcode is None:
            # If the producer is still running, kill it. This should always
            # happen before any assertion that may fail.
            producer.terminate()
            producer.join(1)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn(fifo_filename, cwriter.manifest_text())

    def test_symlinks_are_followed_by_default(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()
        # Check for bug #17800: passed symlinks should also be ignored.
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_no_empty_collection_saved(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=True)
        self.assertIsNone(cwriter.manifest_locator())
        self.assertEqual('', cwriter.manifest_text())
        cwriter.destroy_cache()
    def test_passing_nonexistent_path_raises_exception(self):
        uuid_str = str(uuid.uuid4())
        with self.assertRaises(arv_put.PathDoesNotExistError):
            arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache; the next upload should resume from it
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

    def make_progress_tester(self):
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                                                  reporter=reporter)
                cwriter.bytes_expected = expect_count
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting, making sure the
                # block gets committed.
                self.writer._update(final=True)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

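        # The wrapper above lets the upload proceed normally until the last
        # (short) write, then checkpoints its state and aborts, leaving a
        # resumable cache behind for the retry below.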
        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing it from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block; wrapped_commit will make
            # the first block commit fail.
            # arv-put should not exit with an exception from trying to
            # commit the collection while it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing it from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_no_resume_when_no_cache(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing it from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_dry_run_feature(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing it from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            # Retry the upload using dry_run to check if there is a pending upload
            writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            # Confirm there's no pending upload with dry_run=True
            writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del self.writer

class CachedManifestValidationTest(ArvadosBaseTestCase):
    class MockedPut(arv_put.ArvPutUploadJob):
        def __init__(self, cached_manifest=None):
            self._state = copy.deepcopy(arv_put.ArvPutUploadJob.EMPTY_STATE)
            self._state['manifest'] = cached_manifest
            self._api_client = mock.MagicMock()
            self.logger = mock.MagicMock()
            self.num_retries = 1

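    # Signed Keep locators look like "<hash>+<size>+A<signature>@<expiry>",
    # where the expiry is a hex-encoded Unix timestamp; this helper builds
    # that hex suffix from a datetime.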
    def datetime_to_hex(self, dt):
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(CachedManifestValidationTest, self).setUp()
        self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
        self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"

    def test_empty_cached_manifest_is_valid(self):
        put_mock = self.MockedPut()
        self.assertEqual(None, put_mock._state.get('manifest'))
        self.assertTrue(put_mock._cached_manifest_valid())
        put_mock._state['manifest'] = ''
        self.assertTrue(put_mock._cached_manifest_valid())

    def test_signature_cases(self):
        now = datetime.datetime.utcnow()
        yesterday = now - datetime.timedelta(days=1)
        lastweek = now - datetime.timedelta(days=7)
        tomorrow = now + datetime.timedelta(days=1)
        nextweek = now + datetime.timedelta(days=7)

        def mocked_head(blocks={}, loc=None):
            blk = loc.split('+', 1)[0]
            if blocks.get(blk):
                return True
            raise arvados.errors.KeepRequestError("mocked error - block invalid")

        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
        cases = [
            # All expired, reset cache - OK
            (yesterday, lastweek, False, False, True),
            (lastweek, yesterday, False, False, True),
            # All non-expired valid blocks - OK
            (tomorrow, nextweek, True, True, True),
            (nextweek, tomorrow, True, True, True),
            # All non-expired invalid blocks - Not OK
            (tomorrow, nextweek, False, False, False),
            (nextweek, tomorrow, False, False, False),
            # One non-expired valid block - OK
            (tomorrow, yesterday, True, False, True),
            (yesterday, tomorrow, False, True, True),
            # One non-expired invalid block - Not OK
            (tomorrow, yesterday, False, False, False),
            (yesterday, tomorrow, False, False, False),
        ]
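        # In short: the cached manifest is considered valid when every block
        # whose signature has not yet expired still answers a HEAD request;
        # a fully expired manifest is also acceptable, since the upload can
        # simply start over.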
        for case in cases:
            b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
            head_responses = {
                self.block1: b1_valid,
                self.block2: b2_valid,
            }
            cached_manifest = self.template % (
                self.datetime_to_hex(b1_expiration),
                self.datetime_to_hex(b2_expiration),
            )
            arvput = self.MockedPut(cached_manifest)
            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
                head_mock.side_effect = partial(mocked_head, head_responses)
                self.assertEqual(outcome, arvput._cached_manifest_valid(),
                    "Case '%s' should have produced outcome '%s'" % (case, outcome)
                )
                if b1_expiration > now or b2_expiration > now:
                    # A HEAD request should have been done
                    head_mock.assert_called_once()
                else:
                    head_mock.assert_not_called()


class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        writer = arv_put.ArvPutUploadJob([__file__])
        self.assertEqual(self.TEST_SIZE,
                         writer.bytes_expected)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))

        writer = arv_put.ArvPutUploadJob([tree])
        self.assertEqual(self.TEST_SIZE * 2,
                         writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([tree, __file__])
        self.assertEqual(self.TEST_SIZE * 3,
                         writer.bytes_expected)

    def test_expected_bytes_for_device(self):
        writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
        self.assertIsNone(writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
        self.assertIsNone(writer.bytes_expected)


class ArvadosPutReportTest(ArvadosBaseTestCase):
    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            expect = ": {} written {} total\n".format(
                count, -1 if (total is None) else total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(1.0*count/total)
            actual = arv_put.human_progress(count, total)
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))


class ArvPutLogFormatterTest(ArvadosBaseTestCase):
    matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)'
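    # Request IDs emitted by arv-put are "req-" followed by 20 lowercase
    # alphanumeric characters, hence the pattern above.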

    def setUp(self):
        super(ArvPutLogFormatterTest, self).setUp()
        self.stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        self.logger = logging.getLogger()
        self.logger.addHandler(self.loggingHandler)
        self.logger.setLevel(logging.DEBUG)

    def tearDown(self):
        self.logger.removeHandler(self.loggingHandler)
        self.stderr.close()
        self.stderr = None
        super(ArvPutLogFormatterTest, self).tearDown()

    def test_request_id_logged_only_once_on_error(self):
        self.logger.error('Ooops, something bad happened.')
        self.logger.error('Another bad thing just happened.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_logged_only_once_on_debug(self):
        self.logger.debug('This is just a debug message.')
        self.logger.debug('Another message, move along.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_not_logged_on_info(self):
        self.logger.info('This should be a useful message')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(1, len(log_lines))
        self.assertNotRegex(log_lines[0], self.matcher)

class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        self.main_stdout.seek(0, 0)
        self.main_stdout.truncate(0)
        self.main_stderr.seek(0, 0)
        self.main_stderr.truncate(0)
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.main_stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        logging.getLogger().addHandler(self.loggingHandler)

    def tearDown(self):
        logging.getLogger().removeHandler(self.loggingHandler)
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        arv_put.api_client = None
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_error_when_excluding_absolute_path(self):
        tmpdir = self.make_tmpdir()
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--exclude', '/some/absolute/path/*',
                           tmpdir])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())

    def test_request_id_logging_on_error(self):
        matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)\n'
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['/dev/null'])
            self.assertRegex(
                self.main_stderr.getvalue(), matcher)


class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}
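    # blob_signing makes the test Keep server sign block locators, which
    # the signed-manifest and signature-expiry tests below rely on.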
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def datetime_to_hex(self, dt):
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                         0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write(b'stdin test\xa6\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('1cb671b355a0c23d5d1c61d59cdb1b2b+12',
                      pipe.stdout.read().decode())

    def test_sigint_logs_request_id(self):
        # Start arv-put, give it a chance to start up, send SIGINT,
        # and check that its output includes the X-Request-Id.
        input_stream = subprocess.Popen(
            ['sleep', '10'],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=input_stream.stdout, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        # Wait for arv-put child process to print something (i.e., a
        # log message) so we know its signal handler is installed.
        select.select([pipe.stdout], [], [], 10)
        pipe.send_signal(signal.SIGINT)
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        input_stream.terminate()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not exit within 5 seconds")
        self.assertRegex(pipe.stdout.read().decode(), r'\(X-Request-Id: req-[a-z0-9]{20}\)')

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__,
                              os.path.join(datadir, 'foo')],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegex(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=[]):
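        # Helper: run arv-put as a subprocess, feeding `text` on stdin, then
        # look up the resulting collection by the UUID (or portable data
        # hash, when --portable-data-hash is used) printed on stdout.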
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text.encode())
        self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.decode().strip()]]
        ).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_all_expired_signatures_invalidates_cache(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
            f.write('foo')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        cache['manifest'] = re.sub(
            r'\@.*? ',
            "@{} ".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect the cache to be discarded as expired
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'INFO: Cache expired, starting from scratch.*')
        self.assertEqual(p.returncode, 0)

    def test_invalid_signature_in_cache(self):
        for batch_mode in [False, True]:
            self.authorize_with('active')
            tmpdir = self.make_tmpdir()
            with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
                f.write('foo')
            # Upload a directory and get the cache file name
            arv_put_args = [tmpdir]
            if batch_mode:
                arv_put_args = ['--batch'] + arv_put_args
            p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 env=self.ENVIRON)
            (_, err) = p.communicate()
            self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
            self.assertEqual(p.returncode, 0)
            cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                       err.decode()).groups()[0]
            self.assertTrue(os.path.isfile(cache_filepath))
            # Load the cache file contents and modify the manifest to simulate
            # an invalid access token
            with open(cache_filepath, 'r') as c:
                cache = json.load(c)
            self.assertRegex(cache['manifest'], r'\+A\S+\@')
            cache['manifest'] = re.sub(
                r'\+A.*\@',
                "+Aabcdef0123456789abcdef0123456789abcdef01@",
                cache['manifest'])
            with open(cache_filepath, 'w') as c:
                c.write(json.dumps(cache))
            # Re-run the upload and expect to get an invalid cache message
            p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 env=self.ENVIRON)
            (_, err) = p.communicate()
            if not batch_mode:
                self.assertRegex(
                    err.decode(),
                    r'ERROR: arv-put: Resume cache contains invalid signature.*')
                self.assertEqual(p.returncode, 1)
            else:
                self.assertRegex(
                    err.decode(),
                    r'Invalid signatures on cache file \'.*\' while being run in \'batch mode\' -- continuing anyways.*')
                self.assertEqual(p.returncode, 0)

    def test_single_expired_signature_reuploads_file(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
            f.write('foo')
        # Write a second file in its own subdir to force a new stream
        os.mkdir(os.path.join(tmpdir, 'bar'))
        with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
            f.write('bar')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        # Make one of the signatures appear to have expired
        cache['manifest'] = re.sub(
            r'\@.*? 3:3:barfile.txt',
            "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect the expired token to trigger a re-upload
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
        self.assertEqual(p.returncode, 0)
        # Confirm that the resulting cache is different from the last run.
        with open(cache_filepath, 'r') as c2:
            new_cache = json.load(c2)
        self.assertNotEqual(cache['manifest'], new_cache['manifest'])

    def test_put_collection_with_later_update(self):
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')

    def test_put_collection_with_utc_expiring_datetime(self):
        tmpdir = self.make_tmpdir()
        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%MZ')
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        self.assertEqual(ciso8601.parse_datetime(trash_at),
                         ciso8601.parse_datetime(c['trash_at']))

    def test_put_collection_with_timezone_aware_expiring_datetime(self):
        tmpdir = self.make_tmpdir()
        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M-0300')
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
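        # The API server stores trash_at in UTC: a timestamp with a -0300
        # offset corresponds to a UTC time 3 hours later, so add those hours
        # back before comparing naive datetimes.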
        self.assertEqual(
            ciso8601.parse_datetime(trash_at).replace(tzinfo=None) + datetime.timedelta(hours=3),
            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))

    def test_put_collection_with_timezone_naive_expiring_datetime(self):
        tmpdir = self.make_tmpdir()
        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M')
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
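        # A timezone-naive --trash-at is interpreted in the client's local
        # timezone, so the expected UTC value is the parsed timestamp plus
        # the local UTC offset computed below.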
        if time.daylight:
            offset = datetime.timedelta(seconds=time.altzone)
        else:
            offset = datetime.timedelta(seconds=time.timezone)
        self.assertEqual(
            ciso8601.parse_datetime(trash_at) + offset,
            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))

    def test_put_collection_with_expiring_date_only(self):
        tmpdir = self.make_tmpdir()
        trash_at = '2140-01-01'
        end_of_day = datetime.timedelta(hours=23, minutes=59, seconds=59)
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
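        # A date with no time of day is interpreted as the end of that day
        # (23:59:59) in the client's local timezone, hence the end_of_day
        # delta plus the local UTC offset in the comparison below.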
        if time.daylight:
            offset = datetime.timedelta(seconds=time.altzone)
        else:
            offset = datetime.timedelta(seconds=time.timezone)
        self.assertEqual(
            ciso8601.parse_datetime(trash_at) + end_of_day + offset,
            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))

    def test_put_collection_with_invalid_absolute_expiring_datetimes(self):
        cases = ['2100', '210010', '2100-10', '2100-Oct']
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        for test_datetime in cases:
            with self.assertRaises(AssertionError):
                self.run_and_find_collection(
                    "",
                    ['--no-progress', '--trash-at', test_datetime, tmpdir])

    def test_put_collection_with_relative_expiring_datetime(self):
        expire_after = 7
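        # Bracket the upload between two timestamps: the server-computed
        # trash_at (now + expire_after days) must fall between dt_before
        # and dt_after.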
        dt_before = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-after', str(expire_after), tmpdir])
        self.assertNotEqual(None, col['uuid'])
        dt_after = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        trash_at = ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None)
        self.assertTrue(dt_before < trash_at)
        self.assertTrue(dt_after > trash_at)

    def test_put_collection_with_invalid_relative_expiring_datetime(self):
        expire_after = 0  # Must be >= 1
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        with self.assertRaises(AssertionError):
            self.run_and_find_collection(
                "",
                ['--no-progress', '--trash-after', str(expire_after), tmpdir])

    def test_upload_directory_reference_without_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' was written inside a subcollection, while 'bar'
        # should have been uploaded directly to the root collection
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')

    def test_upload_directory_reference_with_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory (with trailing slash) and one file
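        # (rsync-style semantics: the trailing slash means "the directory's
        # contents", so 'foo' is expected at the collection root rather than
        # inside a subcollection)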
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1 + os.sep,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' and 'bar' were written at the same level
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
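        # With no --name, the collection gets an automatic name of the form
        # "Saved at <timestamp> by <user>@<hostname>"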
        self.assertRegex(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                                  ['--portable-data-hash',
                                                   '--name', link_name,
                                                   '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])

    def test_put_collection_with_storage_classes_specified(self):
        collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])
        self.assertEqual(len(collection['storage_classes_desired']), 1)
        self.assertEqual(collection['storage_classes_desired'][0], 'hot')

    def test_put_collection_with_multiple_storage_classes_specified(self):
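        # Whitespace around each class name in the --storage-classes list
        # should be stripped during argument parsing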
        collection = self.run_and_find_collection("", ['--storage-classes', ' foo, bar  ,baz'])
        self.assertEqual(len(collection['storage_classes_desired']), 3)
        self.assertEqual(collection['storage_classes_desired'], ['foo', 'bar', 'baz'])

    def test_put_collection_without_storage_classes_specified(self):
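        # When --storage-classes is omitted, the collection should end up
        # with the single 'default' class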
        collection = self.run_and_find_collection("")
        self.assertEqual(len(collection['storage_classes_desired']), 1)
        self.assertEqual(collection['storage_classes_desired'][0], 'default')

    def test_exclude_filename_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', '*2.txt',
                                                '--exclude', 'file3.*',
                                                tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Neither file2.txt nor file3.txt should have been uploaded
        self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')

    def test_exclude_filepath_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', 'subdir/*2.txt',
                                                '--exclude', './file1.*',
                                                tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Only tmpdir/file1.txt & tmpdir/subdir/file2.txt should have been excluded
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s.*:file1.txt' % os.path.basename(tmpdir))
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'],
                         r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'], r'^.*:file3.txt')

    def test_unicode_on_filename(self):
        tmpdir = self.make_tmpdir()
        fname = u"i❤arvados.txt"
        with open(os.path.join(tmpdir, fname), 'w') as f:
            f.write("This is a unicode named file")
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        self.assertTrue(fname in c['manifest_text'], u"{} does not include {}".format(c['manifest_text'], fname))

    def test_silent_mode_no_errors(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
            f.write('hello world')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + ['--silent', tmpdir],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate()
        # No console output should occur on normal operations
        self.assertNotRegex(stderr.decode(), r'.+')
        self.assertNotRegex(stdout.decode(), r'.+')

    def test_silent_mode_does_not_suppress_error_messages(self):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + ['--silent',
                                                  '/path/not/existent'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate()
        # Error messages should still be displayed when errors happen
        self.assertRegex(stderr.decode(), r'.*ERROR:.*')
        self.assertNotRegex(stdout.decode(), r'.+')


if __name__ == '__main__':
    unittest.main()