# -*- coding: utf-8 -*-

# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import range
from functools import partial
import apiclient
import ciso8601
import datetime
import json
import logging
import mock
import multiprocessing
import os
import pwd
import random
import re
import select
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import unittest
import uuid

import arvados
import arvados.commands.put as arv_put
from . import arvados_testutil as tutil

from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
from . import run_test_server

class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            _ = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

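
# Taken together, the tests above pin down the ResumeCache lifecycle. The
# helper below is a condensed, illustrative sketch of that lifecycle; it is
# never invoked by the test runner, and it assumes only the
# arv_put.ResumeCache behavior demonstrated above.
def _resume_cache_lifecycle_sketch():
    path = os.path.join(tempfile.mkdtemp(), 'cache')
    cache = arv_put.ResumeCache(path)   # creates and locks the cache file
    cache.save(['some', 'state'])       # truncates and rewrites the contents
    assert cache.load() == ['some', 'state']
    cache.close()                       # releases the lock but keeps the data
    cache = arv_put.ResumeCache(path)   # reopening after close() succeeds...
    assert cache.load() == ['some', 'state']   # ...and the saved state survives
    cache.destroy()                     # removes the file and releases the lock
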

class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Create temp files used by the tests
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KiB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        with open(self.large_file_name, 'w') as fileobj:
            # Make sure to write just a little more than one block
            for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
                fileobj.write(data)
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MiB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
        # Temp dir to hold a symlink to other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def test_non_regular_files_are_ignored(self):
        def pfunc(x):
            with open(x, 'w') as f:
                f.write('test')
        fifo_filename = 'fifo-file'
        fifo_path = os.path.join(self.tempdir, fifo_filename)
        os.mkfifo(fifo_path)
        producer = multiprocessing.Process(target=pfunc, args=(fifo_path,))
        producer.start()
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        self.assertNotIn(fifo_filename, cwriter.manifest_text())
        if producer.exitcode is None:
            # If the producer is still running, kill it.
            producer.terminate()
            producer.join(1)

    def test_symlinks_are_followed_by_default(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()
        # Check for bug #17800: passed symlinks should also be ignored.
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_no_empty_collection_saved(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=True)
        self.assertIsNone(cwriter.manifest_locator())
        self.assertEqual('', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistent_path_raises_exception(self):
        uuid_str = str(uuid.uuid4())
        with self.assertRaises(arv_put.PathDoesNotExistError):
            arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

    def make_progress_tester(self):
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                                                  reporter=reporter)
                cwriter.bytes_expected = expect_count
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting. Ensure block commit.
                self.writer._update(final=True)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

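    # The "wrapped write" trick above is reused by several tests below. It
    # depends on mock.patch(..., autospec=True): with autospec, the
    # side_effect receives the ArvadosFileWriter instance as args[0] and the
    # data as args[1], so the wrapper can inspect the block being written and
    # still delegate to the real method saved in setUp() as
    # self.arvfile_write. A condensed, illustrative sketch of the pattern
    # (the tests inline it rather than calling this helper):
    def _interrupting_write_factory(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]  # args[0] is the ArvadosFileWriter instance
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Only the final block of the test file is a short one.
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)
        return wrapped_write
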
    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block: wrapped_commit makes the
            # first block commit fail.
            # arv-put should not raise an extra exception by trying to commit
            # the collection, as it would be in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_no_resume_when_no_cache(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_dry_run_feature(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            # Retry the upload using dry_run to check if there is a pending upload
            writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            # Confirm there's no pending upload with dry_run=True
            writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del self.writer

class CachedManifestValidationTest(ArvadosBaseTestCase):
    class MockedPut(arv_put.ArvPutUploadJob):
        def __init__(self, cached_manifest=None):
            self._state = arv_put.ArvPutUploadJob.EMPTY_STATE
            self._state['manifest'] = cached_manifest
            self._api_client = mock.MagicMock()
            self.logger = mock.MagicMock()
            self.num_retries = 1

    def datetime_to_hex(self, dt):
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(CachedManifestValidationTest, self).setUp()
        self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
        self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"

    def test_empty_cached_manifest_is_valid(self):
        put_mock = self.MockedPut()
        self.assertEqual(None, put_mock._state.get('manifest'))
        self.assertTrue(put_mock._cached_manifest_valid())
        put_mock._state['manifest'] = ''
        self.assertTrue(put_mock._cached_manifest_valid())

    def test_signature_cases(self):
        now = datetime.datetime.utcnow()
        yesterday = now - datetime.timedelta(days=1)
        lastweek = now - datetime.timedelta(days=7)
        tomorrow = now + datetime.timedelta(days=1)
        nextweek = now + datetime.timedelta(days=7)

        def mocked_head(blocks={}, loc=None):
            blk = loc.split('+', 1)[0]
            if blocks.get(blk):
                return True
            raise arvados.errors.KeepRequestError("mocked error - block invalid")

        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
        cases = [
            # All expired, reset cache - OK
            (yesterday, lastweek, False, False, True),
            (lastweek, yesterday, False, False, True),
            # All non-expired valid blocks - OK
            (tomorrow, nextweek, True, True, True),
            (nextweek, tomorrow, True, True, True),
            # All non-expired invalid blocks - Not OK
            (tomorrow, nextweek, False, False, False),
            (nextweek, tomorrow, False, False, False),
            # One non-expired valid block - OK
            (tomorrow, yesterday, True, False, True),
            (yesterday, tomorrow, False, True, True),
            # One non-expired invalid block - Not OK
            (tomorrow, yesterday, False, False, False),
            (yesterday, tomorrow, False, False, False),
        ]
        for case in cases:
            b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
            head_responses = {
                self.block1: b1_valid,
                self.block2: b2_valid,
            }
            cached_manifest = self.template % (
                self.datetime_to_hex(b1_expiration),
                self.datetime_to_hex(b2_expiration),
            )
            arvput = self.MockedPut(cached_manifest)
            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
                head_mock.side_effect = partial(mocked_head, head_responses)
                self.assertEqual(outcome, arvput._cached_manifest_valid(),
                    "Case '%s' should have produced outcome '%s'" % (case, outcome)
                )
                if b1_expiration > now or b2_expiration > now:
                    # A HEAD request should have been done
                    head_mock.assert_called_once()
                else:
                    head_mock.assert_not_called()

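
# The cached manifests above embed Keep locators of the form
# "<md5>+<size>+A<signature>@<expiry>", where <expiry> is a Unix timestamp in
# hex, as produced by datetime_to_hex(). A small worked example of building
# one such locator (illustrative only, not used by the tests):
def _signed_locator_sketch():
    expiry = datetime.datetime(2021, 1, 1)
    # Same expression as datetime_to_hex() above: a hex-encoded Unix timestamp.
    stamp = hex(int(time.mktime(expiry.timetuple())))[2:]
    # Same shape as the template used by CachedManifestValidationTest:
    return "fdba98970961edb29f88241b9d99d890+3+Asignature@" + stamp
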

class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        writer = arv_put.ArvPutUploadJob([__file__])
        self.assertEqual(self.TEST_SIZE,
                         writer.bytes_expected)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))

        writer = arv_put.ArvPutUploadJob([tree])
        self.assertEqual(self.TEST_SIZE * 2,
                         writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([tree, __file__])
        self.assertEqual(self.TEST_SIZE * 3,
                         writer.bytes_expected)

    def test_expected_bytes_for_device(self):
        writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
        self.assertIsNone(writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
        self.assertIsNone(writer.bytes_expected)


class ArvadosPutReportTest(ArvadosBaseTestCase):
    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            expect = ": {} written {} total\n".format(
                count, -1 if (total is None) else total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(1.0*count/total)
            actual = arv_put.human_progress(count, total)
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))

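
# For reference, the formats asserted above: machine_progress(count, total)
# ends with ": <count> written <total> total\n" (with -1 standing in for an
# unknown total), and human_progress(count, total) starts with '\r' and
# includes a percentage whenever the total is known. An illustrative usage
# sketch (not invoked by the tests):
def _progress_format_sketch():
    machine = arv_put.machine_progress(128, 1024)  # ends with ": 128 written 1024 total\n"
    human = arv_put.human_progress(128, 1024)      # starts with '\r', contains "12.5%"
    return machine, human
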

class ArvPutLogFormatterTest(ArvadosBaseTestCase):
    matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)'

    def setUp(self):
        super(ArvPutLogFormatterTest, self).setUp()
        self.stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        self.logger = logging.getLogger()
        self.logger.addHandler(self.loggingHandler)
        self.logger.setLevel(logging.DEBUG)

    def tearDown(self):
        self.logger.removeHandler(self.loggingHandler)
        self.stderr.close()
        self.stderr = None
        super(ArvPutLogFormatterTest, self).tearDown()

    def test_request_id_logged_only_once_on_error(self):
        self.logger.error('Ooops, something bad happened.')
        self.logger.error('Another bad thing just happened.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_logged_only_once_on_debug(self):
        self.logger.debug('This is just a debug message.')
        self.logger.debug('Another message, move along.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_not_logged_on_info(self):
        self.logger.info('This should be a useful message')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(1, len(log_lines))
        self.assertNotRegex(log_lines[0], self.matcher)

class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        self.main_stdout.seek(0, 0)
        self.main_stdout.truncate(0)
        self.main_stderr.seek(0, 0)
        self.main_stderr.truncate(0)
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.main_stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        logging.getLogger().addHandler(self.loggingHandler)

    def tearDown(self):
        logging.getLogger().removeHandler(self.loggingHandler)
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()
    def test_put_with_unwritable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_error_when_excluding_absolute_path(self):
        tmpdir = self.make_tmpdir()
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--exclude', '/some/absolute/path/*',
                           tmpdir])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())

    def test_request_id_logging_on_error(self):
        matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)\n'
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['/dev/null'])
            self.assertRegex(
                self.main_stderr.getvalue(), matcher)


class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def datetime_to_hex(self, dt):
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write(b'stdin test\xa6\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read().decode())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('1cb671b355a0c23d5d1c61d59cdb1b2b+12',
                      pipe.stdout.read().decode())

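    # The stdin test above, and the SIGINT test below, both poll the child
    # process with a deadline instead of blocking on wait(), so a wedged
    # arv-put cannot hang the suite. A condensed sketch of that pattern
    # (illustrative; the tests inline it rather than calling a helper):
    def _poll_with_deadline(self, proc, timeout=5):
        deadline = time.time() + timeout
        while (proc.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        return proc.poll()  # None means the deadline expired
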
    def test_sigint_logs_request_id(self):
        # Start arv-put, give it a chance to start up, send SIGINT,
        # and check that its output includes the X-Request-Id.
        input_stream = subprocess.Popen(
            ['sleep', '10'],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=input_stream.stdout, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        # Wait for arv-put child process to print something (i.e., a
        # log message) so we know its signal handler is installed.
        select.select([pipe.stdout], [], [], 10)
        pipe.send_signal(signal.SIGINT)
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        input_stream.terminate()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not exit within 5 seconds")
        self.assertRegex(pipe.stdout.read().decode(), r'\(X-Request-Id: req-[a-z0-9]{20}\)')

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__,
                              os.path.join(datadir, 'foo')],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegex(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=[]):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text.encode())
        self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.decode().strip()]]
        ).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_all_expired_signatures_invalidates_cache(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
            f.write('foo')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        cache['manifest'] = re.sub(
            r'\@.*? ',
            "@{} ".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect to get an invalid cache message
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'INFO: Cache expired, starting from scratch.*')
        self.assertEqual(p.returncode, 0)

    def test_invalid_signature_in_cache(self):
        for batch_mode in [False, True]:
            self.authorize_with('active')
            tmpdir = self.make_tmpdir()
            with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
                f.write('foo')
            # Upload a directory and get the cache file name
            arv_put_args = [tmpdir]
            if batch_mode:
                arv_put_args = ['--batch'] + arv_put_args
            p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 env=self.ENVIRON)
            (_, err) = p.communicate()
            self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
            self.assertEqual(p.returncode, 0)
            cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                       err.decode()).groups()[0]
            self.assertTrue(os.path.isfile(cache_filepath))
            # Load the cache file contents and modify the manifest to simulate
            # an invalid access token
            with open(cache_filepath, 'r') as c:
                cache = json.load(c)
            self.assertRegex(cache['manifest'], r'\+A\S+\@')
            cache['manifest'] = re.sub(
                r'\+A.*\@',
                "+Aabcdef0123456789abcdef0123456789abcdef01@",
                cache['manifest'])
            with open(cache_filepath, 'w') as c:
                c.write(json.dumps(cache))
            # Re-run the upload and expect to get an invalid cache message
            p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 env=self.ENVIRON)
            (_, err) = p.communicate()
            if not batch_mode:
                self.assertRegex(
                    err.decode(),
                    r'ERROR: arv-put: Resume cache contains invalid signature.*')
                self.assertEqual(p.returncode, 1)
            else:
                self.assertRegex(
                    err.decode(),
                    r'Invalid signatures on cache file \'.*\' while being run in \'batch mode\' -- continuing anyways.*')
                self.assertEqual(p.returncode, 0)

    def test_single_expired_signature_reuploads_file(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
            f.write('foo')
        # Write a second file on its own subdir to force a new stream
        os.mkdir(os.path.join(tmpdir, 'bar'))
        with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
            f.write('bar')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        # Make one of the signatures appear to have expired
        cache['manifest'] = re.sub(
            r'\@.*? 3:3:barfile.txt',
            "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect the expired file to be re-uploaded
1156         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1157                              stdout=subprocess.PIPE,
1158                              stderr=subprocess.PIPE,
1159                              env=self.ENVIRON)
1160         (_, err) = p.communicate()
1161         self.assertRegex(
1162             err.decode(),
1163             r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
1164         self.assertEqual(p.returncode, 0)
1165         # Confirm that the resulting cache is different from the last run.
1166         with open(cache_filepath, 'r') as c2:
1167             new_cache = json.load(c2)
1168         self.assertNotEqual(cache['manifest'], new_cache['manifest'])
1169
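    # For reference: the expiry stamp substituted above is a hex-encoded
    # Unix timestamp.  A minimal sketch of what the inherited
    # datetime_to_hex() helper amounts to (its canonical definition lives
    # in the shared test utilities, not here):
    @staticmethod
    def _datetime_to_hex_sketch(dt):
        # Local-time datetime -> seconds since the epoch -> bare hex
        # digits, without the '0x' prefix.
        return hex(int(time.mktime(dt.timetuple())))[2:]
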
1170     def test_put_collection_with_later_update(self):
1171         tmpdir = self.make_tmpdir()
1172         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1173             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1174         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
1175         self.assertNotEqual(None, col['uuid'])
1176         # Add a new file to the directory
1177         with open(os.path.join(tmpdir, 'file2'), 'w') as f:
1178             f.write('The quick brown fox jumped over the lazy dog')
1179         updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
1180         self.assertEqual(col['uuid'], updated_col['uuid'])
1181         # Get the manifest and check that the new file is included
1182         c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
1183         self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')
1184
1185     def test_put_collection_with_utc_expiring_datetime(self):
1186         tmpdir = self.make_tmpdir()
1187         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%MZ')
1188         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1189             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1190         col = self.run_and_find_collection(
1191             "",
1192             ['--no-progress', '--trash-at', trash_at, tmpdir])
1193         self.assertNotEqual(None, col['uuid'])
1194         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1195         self.assertEqual(ciso8601.parse_datetime(trash_at),
1196             ciso8601.parse_datetime(c['trash_at']))
1197
1198     def test_put_collection_with_timezone_aware_expiring_datetime(self):
1199         tmpdir = self.make_tmpdir()
1200         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M-0300')
1201         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1202             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1203         col = self.run_and_find_collection(
1204             "",
1205             ['--no-progress', '--trash-at', trash_at, tmpdir])
1206         self.assertNotEqual(None, col['uuid'])
1207         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1208         self.assertEqual(
1209             ciso8601.parse_datetime(trash_at).replace(tzinfo=None) + datetime.timedelta(hours=3),
1210             ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1211
1212     def test_put_collection_with_timezone_naive_expiring_datetime(self):
1213         tmpdir = self.make_tmpdir()
1214         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M')
1215         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1216             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1217         col = self.run_and_find_collection(
1218             "",
1219             ['--no-progress', '--trash-at', trash_at, tmpdir])
1220         self.assertNotEqual(None, col['uuid'])
1221         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1222         if time.daylight:
1223             offset = datetime.timedelta(seconds=time.altzone)
1224         else:
1225             offset = datetime.timedelta(seconds=time.timezone)
1226         self.assertEqual(
1227             ciso8601.parse_datetime(trash_at) + offset,
1228             ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1229
1230     def test_put_collection_with_expiring_date_only(self):
1231         tmpdir = self.make_tmpdir()
1232         trash_at = '2140-01-01'
1233         end_of_day = datetime.timedelta(hours=23, minutes=59, seconds=59)
1234         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1235             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1236         col = self.run_and_find_collection(
1237             "",
1238             ['--no-progress', '--trash-at', trash_at, tmpdir])
1239         self.assertNotEqual(None, col['uuid'])
1240         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1241         if time.daylight:
1242             offset = datetime.timedelta(seconds=time.altzone)
1243         else:
1244             offset = datetime.timedelta(seconds=time.timezone)
1245         self.assertEqual(
1246             ciso8601.parse_datetime(trash_at) + end_of_day + offset,
1247             ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1248
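    # The two timezone-sensitive tests above derive the same local-to-UTC
    # offset inline; a hedged sketch of that shared logic, mirroring the
    # inline code rather than defining any new behavior:
    @staticmethod
    def _local_utc_offset_sketch():
        # Both values are seconds west of UTC.  time.altzone is used
        # whenever the local zone defines DST, matching the tests above.
        if time.daylight:
            return datetime.timedelta(seconds=time.altzone)
        return datetime.timedelta(seconds=time.timezone)
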
1249     def test_put_collection_with_invalid_absolute_expiring_datetimes(self):
1250         cases = ['2100', '210010', '2100-10', '2100-Oct']
1251         tmpdir = self.make_tmpdir()
1252         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1253             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1254         for test_datetime in cases:
1255             with self.assertRaises(AssertionError):
1256                 self.run_and_find_collection(
1257                     "",
1258                     ['--no-progress', '--trash-at', test_datetime, tmpdir])
1259
1260     def test_put_collection_with_relative_expiring_datetime(self):
1261         expire_after = 7
1262         dt_before = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
1263         tmpdir = self.make_tmpdir()
1264         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1265             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1266         col = self.run_and_find_collection(
1267             "",
1268             ['--no-progress', '--trash-after', str(expire_after), tmpdir])
1269         self.assertNotEqual(None, col['uuid'])
1270         dt_after = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
1271         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1272         trash_at = ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None)
1273         self.assertTrue(dt_before < trash_at)
1274         self.assertTrue(dt_after > trash_at)
1275
1276     def test_put_collection_with_invalid_relative_expiring_datetime(self):
1277         expire_after = 0  # Must be >= 1
1278         tmpdir = self.make_tmpdir()
1279         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1280             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1281         with self.assertRaises(AssertionError):
1282             self.run_and_find_collection(
1283                 "",
1284                 ['--no-progress', '--trash-after', str(expire_after), tmpdir])
1285
1286     def test_upload_directory_reference_without_trailing_slash(self):
1287         tmpdir1 = self.make_tmpdir()
1288         tmpdir2 = self.make_tmpdir()
1289         with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
1290             f.write('This is foo')
1291         with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
1292             f.write('This is not foo')
1293         # Upload one directory and one file
1294         col = self.run_and_find_collection("", ['--no-progress',
1295                                                 tmpdir1,
1296                                                 os.path.join(tmpdir2, 'bar')])
1297         self.assertNotEqual(None, col['uuid'])
1298         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1299         # Check that 'foo' was written inside a subcollection
1300         # 'bar', on the other hand, should have been uploaded directly into the root collection
1301         self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')
1302
1303     def test_upload_directory_reference_with_trailing_slash(self):
1304         tmpdir1 = self.make_tmpdir()
1305         tmpdir2 = self.make_tmpdir()
1306         with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
1307             f.write('This is foo')
1308         with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
1309             f.write('This is not foo')
1310         # Upload one directory (with trailing slash) and one file
1311         col = self.run_and_find_collection("", ['--no-progress',
1312                                                 tmpdir1 + os.sep,
1313                                                 os.path.join(tmpdir2, 'bar')])
1314         self.assertNotEqual(None, col['uuid'])
1315         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1316         # Check that 'foo' and 'bar' were written at the same level
1317         self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')
1318
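    # Hypothetical illustration of the naming rule the two tests above
    # exercise (inferred from their assertions, not from arv-put's
    # implementation): without a trailing slash a directory gets its own
    # "./<basename>" stream; with one, its contents merge into the root
    # stream ".".
    @staticmethod
    def _expected_stream_name_sketch(path_arg):
        # A trailing os.sep means "upload the contents", so files join
        # the root stream; otherwise they land in a named subcollection.
        if path_arg.endswith(os.sep):
            return '.'
        return './' + os.path.basename(path_arg)
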
1319     def test_put_collection_with_high_redundancy(self):
1320         # Write empty data: we're not testing CollectionWriter, just
1321         # making sure collections.create tells the API server what our
1322         # desired replication level is.
1323         collection = self.run_and_find_collection("", ['--replication', '4'])
1324         self.assertEqual(4, collection['replication_desired'])
1325
1326     def test_put_collection_with_default_redundancy(self):
1327         collection = self.run_and_find_collection("")
1328         self.assertEqual(None, collection['replication_desired'])
1329
1330     def test_put_collection_with_unnamed_project_link(self):
1331         link = self.run_and_find_collection(
1332             "Test unnamed collection",
1333             ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
1334         username = pwd.getpwuid(os.getuid()).pw_name
1335         self.assertRegex(
1336             link['name'],
1337             r'^Saved at .* by {}@'.format(re.escape(username)))
1338
1339     def test_put_collection_with_name_and_no_project(self):
1340         link_name = 'Test Collection Link in home project'
1341         collection = self.run_and_find_collection(
1342             "Test named collection in home project",
1343             ['--portable-data-hash', '--name', link_name])
1344         self.assertEqual(link_name, collection['name'])
1345         my_user_uuid = self.current_user()['uuid']
1346         self.assertEqual(my_user_uuid, collection['owner_uuid'])
1347
1348     def test_put_collection_with_named_project_link(self):
1349         link_name = 'Test auto Collection Link'
1350         collection = self.run_and_find_collection("Test named collection",
1351                                       ['--portable-data-hash',
1352                                        '--name', link_name,
1353                                        '--project-uuid', self.PROJECT_UUID])
1354         self.assertEqual(link_name, collection['name'])
1355
1356     def test_put_collection_with_storage_classes_specified(self):
1357         collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])
1358         self.assertEqual(len(collection['storage_classes_desired']), 1)
1359         self.assertEqual(collection['storage_classes_desired'][0], 'hot')
1360
1361     def test_put_collection_with_multiple_storage_classes_specified(self):
1362         collection = self.run_and_find_collection("", ['--storage-classes', ' foo, bar  ,baz'])
1363         self.assertEqual(len(collection['storage_classes_desired']), 3)
1364         self.assertEqual(collection['storage_classes_desired'], ['foo', 'bar', 'baz'])
1365
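    # Hedged sketch of the normalization asserted above: --storage-classes
    # appears to be split on commas with surrounding whitespace trimmed
    # (the real parsing lives in arv_put, not here):
    @staticmethod
    def _parse_storage_classes_sketch(value):
        # ' foo, bar  ,baz' -> ['foo', 'bar', 'baz']
        return [sc.strip() for sc in value.split(',') if sc.strip()]
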
1366     def test_put_collection_without_storage_classes_specified(self):
1367         collection = self.run_and_find_collection("")
1368         self.assertEqual(len(collection['storage_classes_desired']), 1)
1369         self.assertEqual(collection['storage_classes_desired'][0], 'default')
1370
1371     def test_exclude_filename_pattern(self):
1372         tmpdir = self.make_tmpdir()
1373         tmpsubdir = os.path.join(tmpdir, 'subdir')
1374         os.mkdir(tmpsubdir)
1375         for fname in ['file1', 'file2', 'file3']:
1376             with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
1377                 f.write("This is %s" % fname)
1378             with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
1379                 f.write("This is %s" % fname)
1380         col = self.run_and_find_collection("", ['--no-progress',
1381                                                 '--exclude', '*2.txt',
1382                                                 '--exclude', 'file3.*',
1383                                                 tmpdir])
1384         self.assertNotEqual(None, col['uuid'])
1385         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1386         # Neither file2.txt nor file3.txt should have been uploaded
1387         self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
1388         self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
1389         self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')
1390
1391     def test_exclude_filepath_pattern(self):
1392         tmpdir = self.make_tmpdir()
1393         tmpsubdir = os.path.join(tmpdir, 'subdir')
1394         os.mkdir(tmpsubdir)
1395         for fname in ['file1', 'file2', 'file3']:
1396             with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
1397                 f.write("This is %s" % fname)
1398             with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
1399                 f.write("This is %s" % fname)
1400         col = self.run_and_find_collection("", ['--no-progress',
1401                                                 '--exclude', 'subdir/*2.txt',
1402                                                 '--exclude', './file1.*',
1403                                                 tmpdir])
1404         self.assertNotEqual(None, col['uuid'])
1405         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1406         # Only tmpdir/file1.txt & tmpdir/subdir/file2.txt should have been excluded
1407         self.assertNotRegex(c['manifest_text'],
1408                             r'^\./%s.*:file1.txt' % os.path.basename(tmpdir))
1409         self.assertNotRegex(c['manifest_text'],
1410                             r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
1411         self.assertRegex(c['manifest_text'],
1412                          r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
1413         self.assertRegex(c['manifest_text'], r'^.*:file3.txt')
1414
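    # Hedged sketch of the exclusion rule the two tests above depend on
    # (inferred from their assertions, not from arv-put's implementation):
    # patterns containing a path separator match against the file's
    # relative path; bare patterns match the file name anywhere.
    @staticmethod
    def _is_excluded_sketch(relpath, patterns):
        import fnmatch  # local import keeps this illustration self-contained
        for pat in patterns:
            if os.sep in pat:
                # Path patterns like 'subdir/*2.txt' or './file1.*'
                if fnmatch.fnmatch(relpath, pat[2:] if pat.startswith('./') else pat):
                    return True
            elif fnmatch.fnmatch(os.path.basename(relpath), pat):
                return True
        return False
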
1415     def test_unicode_on_filename(self):
1416         tmpdir = self.make_tmpdir()
1417         fname = u"i❤arvados.txt"
1418         with open(os.path.join(tmpdir, fname), 'w') as f:
1419             f.write("This is a unicode named file")
1420         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
1421         self.assertNotEqual(None, col['uuid'])
1422         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1423         self.assertIn(fname, c['manifest_text'], u"{} does not include {}".format(c['manifest_text'], fname))
1424
1425     def test_silent_mode_no_errors(self):
1426         self.authorize_with('active')
1427         tmpdir = self.make_tmpdir()
1428         with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
1429             f.write('hello world')
1430         pipe = subprocess.Popen(
1431             [sys.executable, arv_put.__file__] + ['--silent', tmpdir],
1432             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1433             stderr=subprocess.PIPE, env=self.ENVIRON)
1434         stdout, stderr = pipe.communicate()
1435         # No console output should occur on normal operations
1436         self.assertNotRegex(stderr.decode(), r'.+')
1437         self.assertNotRegex(stdout.decode(), r'.+')
1438
1439     def test_silent_mode_does_not_avoid_error_messages(self):
1440         self.authorize_with('active')
1441         pipe = subprocess.Popen(
1442             [sys.executable, arv_put.__file__] + ['--silent',
1443                                                   '/path/not/existent'],
1444             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1445             stderr=subprocess.PIPE, env=self.ENVIRON)
1446         stdout, stderr = pipe.communicate()
1447         # Error messages should be displayed even in silent mode
1448         self.assertRegex(stderr.decode(), r'.*ERROR:.*')
1449         self.assertNotRegex(stdout.decode(), r'.+')
1450
1451
1452 if __name__ == '__main__':
1453     unittest.main()