# Commit 17936: Adds --batch implementation, making the new test pass.
# [arvados.git] / sdk / python / tests / test_arv_put.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) The Arvados Authors. All rights reserved.
4 #
5 # SPDX-License-Identifier: Apache-2.0
6
7 from __future__ import absolute_import
8 from __future__ import division
9 from future import standard_library
10 standard_library.install_aliases()
11 from builtins import str
12 from builtins import range
13 from functools import partial
14 import apiclient
15 import ciso8601
16 import datetime
17 import hashlib
18 import json
19 import logging
20 import mock
21 import os
22 import pwd
23 import random
24 import re
25 import select
26 import shutil
27 import signal
28 import subprocess
29 import sys
30 import tempfile
31 import time
32 import unittest
33 import uuid
34 import yaml
35
36 import arvados
37 import arvados.commands.put as arv_put
38 from . import arvados_testutil as tutil
39
40 from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
41 from . import run_test_server
42
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Exercise arv_put.ResumeCache: cache-path derivation from command-line
    arguments, persistence, file locking, and validation of saved state.
    """

    # Representative arv-put command lines; each should map to a stable,
    # mutually unique cache path.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            # The test did not create self.last_cache; nothing to clean up.
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache file path arv-put would derive for this command line."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        # The same argument list must always map to the same cache path.
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        # Different argument lists must not collide on one cache path.
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        # Reordering paths or options must not change the derived cache path.
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        # Any negative depth should normalize to the same cache path.
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        # '-' and /dev/stdin name the same input, so they share a cache path.
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        # Relative path vs. its realpath, and a symlink vs. its target,
        # should all resolve to the same cache path.
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        # The cache path incorporates ARVADOS_API_HOST; changing the
        # configured host must change the path.  Restore config afterwards.
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        # Saved state holding _current_stream_locators should reload cleanly
        # when Keep reports the blocks as present (mocked HEAD -> True).
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        # NamedTemporaryFile is deleted on leaving the with block; the
        # ResumeCache keeps its own handle open, so it stays usable after.
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        # Same as above, but for state holding _finished_streams.
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        # Even if the Keep HEAD check raises, loading and checking the cache
        # should not blow up (check_cache is expected to handle the error).
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        # What is saved is what load() returns.
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        # Loading a cache that was never written should raise ValueError.
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        # Data saved by one ResumeCache instance is visible to a new
        # instance opened on the same path after close().
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        # A second ResumeCache on the same live file must fail with
        # ResumeCacheConflict while the first still holds the lock.
        with tempfile.NamedTemporaryFile() as cachefile:
            _ = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        # The lock persists even after the NamedTemporaryFile wrapper is
        # gone, as long as the first cache object is alive.
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            # After destroy(), the path must be lockable again...
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            # ...and the destroyed cache must no longer yield data.
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        # restart() discards saved data but keeps the lock held.
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

252
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Integration-style tests for arv_put.ArvPutUploadJob: symlink handling,
    resume-from-cache behavior, progress reporting, and dry-run checks.
    Runs against the test API server ('active' user).
    """

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation: files named 1..4 of 1..4 KB, plus a subdir
        # with a 5 KB file, so directory uploads have a known total size.
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test.
        # mkstemp returns an open OS-level fd; close it right away so it
        # doesn't leak (we reopen the path with a context manager below).
        fd, self.large_file_name = tempfile.mkstemp()
        os.close(fd)
        with open(self.large_file_name, 'w') as fileobj:
            # Make sure to write just a little more than one block
            for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
                fileobj.write(data)
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        # Keep a reference to the unpatched write method so mocked wrappers
        # below can delegate to the real implementation.
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
        # Temp dir to hold a symlink to other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def test_symlinks_are_followed_by_default(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()
        # Check for bug #17800: passed symlinks should also be ignored.
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_no_empty_collection_saved(self):
        # Uploading only an ignored symlink should not create a collection.
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=True)
        self.assertIsNone(cwriter.manifest_locator())
        self.assertEqual('', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistant_path_raise_exception(self):
        uuid_str = str(uuid.uuid4())
        with self.assertRaises(arv_put.PathDoesNotExistError):
            arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        # A second upload of the same file should skip the already-uploaded
        # bytes thanks to the resume cache left behind by the first one.
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

    def make_progress_tester(self):
        """Return (progression_list, reporter) where reporter appends each
        (written, expected) callback into progression_list."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                                                  reporter=reporter)
                cwriter.bytes_expected = expect_count
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        # Files 1..4 KB plus the 5 KB subdir file (see setUp).
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        # Simulate a crash mid-upload (on the final, short block), then
        # verify a fresh job resumes and completes the file exactly.
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting. Ensure block commit.
                self.writer._update(final=True)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block, wrapped_commit will make the first block
            # commit to fail.
            # arv-put should not exit with an exception by trying to commit the collection
            # as it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        # After a simulated crash, resume=False must re-upload from scratch.
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_no_resume_when_no_cache(self):
        # After a simulated crash, use_cache=False must also re-upload all.
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_dry_run_feature(self):
        # dry_run=True must raise ArvPutUploadIsPending while an interrupted
        # upload is outstanding, and ArvPutUploadNotPending once completed.
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            # Retry the upload using dry_run to check if there is a pending upload
            writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            # Confirm there's no pending upload with dry_run=True
            writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del self.writer

class CachedManifestValidationTest(ArvadosBaseTestCase):
    """Tests for ArvPutUploadJob._cached_manifest_valid(): whether a cached
    manifest's signed block locators still allow resuming an upload.
    """

    class MockedPut(arv_put.ArvPutUploadJob):
        # Minimal stand-in: skips the real __init__ (no cache file, no API
        # connection) and seeds only the state the validation code reads.
        def __init__(self, cached_manifest=None):
            # NOTE(review): this binds the class-level EMPTY_STATE object and
            # then mutates it; assumes EMPTY_STATE yields a fresh dict per
            # access (e.g. a property) — confirm in arv_put.
            self._state = arv_put.ArvPutUploadJob.EMPTY_STATE
            self._state['manifest'] = cached_manifest
            self._api_client = mock.MagicMock()
            self.logger = mock.MagicMock()
            self.num_retries = 1

    def datetime_to_hex(self, dt):
        # Hex epoch timestamp (no '0x' prefix), as used in Keep signature
        # expiry fields.  NOTE(review): mixes utcnow() upstream with
        # mktime(), which interprets the tuple as local time — the test only
        # relies on relative ordering of the timestamps.
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(CachedManifestValidationTest, self).setUp()
        self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
        self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
        # Manifest with two signed blocks; %s slots take hex expiry times.
        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"

    def test_empty_cached_manifest_is_valid(self):
        # No manifest (None) or an empty one means nothing to invalidate.
        put_mock = self.MockedPut()
        self.assertEqual(None, put_mock._state.get('manifest'))
        self.assertTrue(put_mock._cached_manifest_valid())
        put_mock._state['manifest'] = ''
        self.assertTrue(put_mock._cached_manifest_valid())

    def test_signature_cases(self):
        now = datetime.datetime.utcnow()
        yesterday = now - datetime.timedelta(days=1)
        lastweek = now - datetime.timedelta(days=7)
        tomorrow = now + datetime.timedelta(days=1)
        nextweek = now + datetime.timedelta(days=7)

        # Fake KeepClient.head: True if the block is marked valid in
        # `blocks`, else raise like a missing block would.
        def mocked_head(blocks={}, loc=None):
            blk = loc.split('+', 1)[0]
            if blocks.get(blk):
                return True
            raise arvados.errors.KeepRequestError("mocked error - block invalid")

        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
        cases = [
            # All expired, reset cache - OK
            (yesterday, lastweek, False, False, True),
            (lastweek, yesterday, False, False, True),
            # All non-expired valid blocks - OK
            (tomorrow, nextweek, True, True, True),
            (nextweek, tomorrow, True, True, True),
            # All non-expired invalid blocks - Not OK
            (tomorrow, nextweek, False, False, False),
            (nextweek, tomorrow, False, False, False),
            # One non-expired valid block - OK
            (tomorrow, yesterday, True, False, True),
            (yesterday, tomorrow, False, True, True),
            # One non-expired invalid block - Not OK
            (tomorrow, yesterday, False, False, False),
            (yesterday, tomorrow, False, False, False),
        ]
        for case in cases:
            b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
            head_responses = {
                self.block1: b1_valid,
                self.block2: b2_valid,
            }
            cached_manifest = self.template % (
                self.datetime_to_hex(b1_expiration),
                self.datetime_to_hex(b2_expiration),
            )
            arvput = self.MockedPut(cached_manifest)
            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
                head_mock.side_effect = partial(mocked_head, head_responses)
                self.assertEqual(outcome, arvput._cached_manifest_valid(),
                    "Case '%s' should have produced outcome '%s'" % (case, outcome)
                )
                if b1_expiration > now or b2_expiration > now:
                    # A HEAD request should have been done
                    head_mock.assert_called_once()
                else:
                    head_mock.assert_not_called()

632
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Verify ArvPutUploadJob.bytes_expected for plain files, directory
    trees, and character devices (whose size is unknown)."""

    # Size of this very test file, used as the known reference size.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        job = arv_put.ArvPutUploadJob([__file__])
        self.assertEqual(self.TEST_SIZE, job.bytes_expected)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        for copy_name in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tree, copy_name))

        # A directory counts every file inside it...
        job = arv_put.ArvPutUploadJob([tree])
        self.assertEqual(self.TEST_SIZE * 2, job.bytes_expected)
        # ...and a directory plus an extra file adds both totals.
        job = arv_put.ArvPutUploadJob([tree, __file__])
        self.assertEqual(self.TEST_SIZE * 3, job.bytes_expected)

    def test_expected_bytes_for_device(self):
        # Devices have no meaningful size, so the expected total is None.
        job = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
        self.assertIsNone(job.bytes_expected)
        # One unknown-size input makes the whole total unknown.
        job = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
        self.assertIsNone(job.bytes_expected)

659
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Exercise the progress-report formatting helpers in arv_put."""

    def test_machine_progress(self):
        """Machine-readable progress ends with "<count> written <total> total"."""
        for written, known_total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            total_field = -1 if known_total is None else known_total
            suffix = ": {} written {} total\n".format(written, total_field)
            self.assertTrue(
                arv_put.machine_progress(written, known_total).endswith(suffix))

    def test_known_human_progress(self):
        """With a known total, human progress carries a percentage after a CR."""
        for written, total in [(0, 1), (2, 4), (45, 60)]:
            percentage = '{:.1%}'.format(1.0*written/total)
            report = arv_put.human_progress(written, total)
            self.assertTrue(report.startswith('\r'))
            self.assertIn(percentage, report)

    def test_unknown_human_progress(self):
        """Without a total, human progress still reports the raw byte count."""
        for written in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(written),
                                      arv_put.human_progress(written, None)))
679
680
class ArvPutLogFormatterTest(ArvadosBaseTestCase):
    """Check that ArvPutLogFormatter appends the X-Request-Id exactly once."""
    # Pattern for the request-id suffix the formatter appends to a record.
    matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)'

    def setUp(self):
        """Attach the formatter under test to the root logger, capturing output."""
        super(ArvPutLogFormatterTest, self).setUp()
        self.stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        self.logger = logging.getLogger()
        self.logger.addHandler(self.loggingHandler)
        self.logger.setLevel(logging.DEBUG)

    def tearDown(self):
        """Detach the capture handler and release the buffer."""
        self.logger.removeHandler(self.loggingHandler)
        self.stderr.close()
        self.stderr = None
        super(ArvPutLogFormatterTest, self).tearDown()

    def _captured_lines(self):
        """Return every complete log line captured so far."""
        return self.stderr.getvalue().split('\n')[:-1]

    def test_request_id_logged_only_once_on_error(self):
        """Only the first of two ERROR records carries the request id."""
        self.logger.error('Ooops, something bad happened.')
        self.logger.error('Another bad thing just happened.')
        log_lines = self._captured_lines()
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_logged_only_once_on_debug(self):
        """Only the first of two DEBUG records carries the request id."""
        self.logger.debug('This is just a debug message.')
        self.logger.debug('Another message, move along.')
        log_lines = self._captured_lines()
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_not_logged_on_info(self):
        """INFO records never carry the request id."""
        self.logger.info('This should be a useful message')
        log_lines = self._captured_lines()
        self.assertEqual(1, len(log_lines))
        self.assertNotRegex(log_lines[0], self.matcher)
721
class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    """Functional tests for the arv-put command line tool.

    Runs arv_put.main() in-process against a test API server, capturing
    stdout/stderr in StringIO buffers so each test can inspect them.
    """
    MAIN_SERVER = {}
    # Syntactically valid but nonexistent UUID, used by error-path tests.
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main() with `args`, clearing captured output first."""
        self.main_stdout.seek(0, 0)
        self.main_stdout.truncate(0)
        self.main_stderr.seek(0, 0)
        self.main_stderr.truncate(0)
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
        """Upload a small temp file and assert it landed in the local Keep store.

        NOTE(review): the mutable default `args=[]` is safe here -- it is only
        concatenated, never mutated.
        """
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        """Authorize, reset the cached API client, and capture log output."""
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.main_stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        logging.getLogger().addHandler(self.loggingHandler)

    def tearDown(self):
        """Remove the log handler and close/release the output buffers."""
        logging.getLogger().removeHandler(self.loggingHandler)
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        """--version prints version information and exits."""
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        """A basic single-file upload succeeds."""
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        """Upload still works when the resume cache directory is unwritable."""
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            # Always restore the class attribute and directory permissions,
            # or later tests would inherit the broken cache setup.
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        """Upload still works when the cache subdirectory cannot be created."""
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        """--replication N is forwarded to Keep as copies=N."""
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        """--normalize reorders manifest entries into canonical order."""
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        """--name requires a saved collection; combining with --stream fails."""
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        """Uploading into a nonexistent project UUID exits with an error."""
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        """--project-uuid combined with --stream is rejected."""
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_error_when_multiple_storage_classes_specified(self):
        """Specifying more than one storage class exits with an error."""
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--storage-classes', 'hot,cold'])

    def test_error_when_excluding_absolute_path(self):
        """--exclude only accepts relative patterns; absolute paths fail."""
        tmpdir = self.make_tmpdir()
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--exclude', '/some/absolute/path/*',
                           tmpdir])

    def test_api_error_handling(self):
        """An API error while saving yields a nonzero exit and no stdout."""
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())

    def test_request_id_logging_on_error(self):
        """Error output includes the X-Request-Id for troubleshooting."""
        matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)\n'
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['/dev/null'])
            self.assertRegex(
                self.main_stderr.getvalue(), matcher)
870
871
872 class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
873                             ArvadosBaseTestCase):
874     MAIN_SERVER = {}
875     KEEP_SERVER = {'blob_signing': True}
876     PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']
877
878     @classmethod
879     def setUpClass(cls):
880         super(ArvPutIntegrationTest, cls).setUpClass()
881         cls.ENVIRON = os.environ.copy()
882         cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
883
884     def datetime_to_hex(self, dt):
885         return hex(int(time.mktime(dt.timetuple())))[2:]
886
    def setUp(self):
        """Reset the module-level cached API client before each test."""
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None
890
891     def authorize_with(self, token_name):
892         run_test_server.authorize_with(token_name)
893         for v in ["ARVADOS_API_HOST",
894                   "ARVADOS_API_HOST_INSECURE",
895                   "ARVADOS_API_TOKEN"]:
896             self.ENVIRON[v] = arvados.config.settings()[v]
897         arv_put.api_client = arvados.api('v1')
898
899     def current_user(self):
900         return arv_put.api_client.users().current().execute()
901
    def test_check_real_project_found(self):
        """desired_project_uuid resolves a project that exists in fixtures."""
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")
906
    def test_check_error_finding_nonexistent_uuid(self):
        """A bogus (non-project-typed) UUID either raises or yields falsy."""
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            # Expected path: the error message should name the bad UUID.
            self.assertIn(BAD_UUID, str(error))
        else:
            # Tolerate no-exception only if the lookup reported nothing.
            self.assertFalse(result, "incorrectly found nonexistent project")
917
    def test_check_error_finding_nonexistent_project(self):
        """A well-formed but nonexistent project UUID raises an HttpError."""
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
924
925     def test_short_put_from_stdin(self):
926         # Have to run this as an integration test since arv-put can't
927         # read from the tests' stdin.
928         # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
929         # case, because the /proc entry is already gone by the time it tries.
930         pipe = subprocess.Popen(
931             [sys.executable, arv_put.__file__, '--stream'],
932             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
933             stderr=subprocess.STDOUT, env=self.ENVIRON)
934         pipe.stdin.write(b'stdin test\xa6\n')
935         pipe.stdin.close()
936         deadline = time.time() + 5
937         while (pipe.poll() is None) and (time.time() < deadline):
938             time.sleep(.1)
939         returncode = pipe.poll()
940         if returncode is None:
941             pipe.terminate()
942             self.fail("arv-put did not PUT from stdin within 5 seconds")
943         elif returncode != 0:
944             sys.stdout.write(pipe.stdout.read())
945             self.fail("arv-put returned exit code {}".format(returncode))
946         self.assertIn('1cb671b355a0c23d5d1c61d59cdb1b2b+12',
947                       pipe.stdout.read().decode())
948
    def test_sigint_logs_request_id(self):
        """Interrupting an upload still logs the X-Request-Id."""
        # Start arv-put, give it a chance to start up, send SIGINT,
        # and check that its output includes the X-Request-Id.
        input_stream = subprocess.Popen(
            ['sleep', '10'],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=input_stream.stdout, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        # Wait for arv-put child process to print something (i.e., a
        # log message) so we know its signal handler is installed.
        select.select([pipe.stdout], [], [], 10)
        pipe.send_signal(signal.SIGINT)
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        input_stream.terminate()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not exit within 5 seconds")
        self.assertRegex(pipe.stdout.read().decode(), r'\(X-Request-Id: req-[a-z0-9]{20}\)')
972
    def test_ArvPutSignedManifest(self):
        """Uploaded collections come back from the API with signed locators."""
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__,
                              os.path.join(datadir, 'foo')],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegex(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)
1007
    def run_and_find_collection(self, text, extra_args=[]):
        """Run arv-put with `extra_args`, feeding `text` on stdin, and return
        the single collection record it created (looked up by UUID, or by
        portable data hash when --portable-data-hash was requested).

        NOTE(review): the mutable default `extra_args=[]` is safe here -- it
        is only read, never mutated.
        """
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text.encode())
        self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.decode().strip()]]
        ).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]
1023
    def test_all_expired_signatures_invalidates_cache(self):
        """If every cached signature has expired, arv-put starts from scratch."""
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
            f.write('foo')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        # Rewrite every signature's expiration field to a month in the past.
        cache['manifest'] = re.sub(
            r'\@.*? ',
            "@{} ".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect to get an invalid cache message
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'INFO: Cache expired, starting from scratch.*')
        self.assertEqual(p.returncode, 0)
1062
    def test_invalid_signature_in_cache(self):
        """Invalid cached signatures abort a normal run but only warn in --batch.

        Runs the same scenario twice: without --batch (expects a hard error,
        exit code 1) and with --batch (expects a warning and exit code 0).
        """
        for batch_mode in [False, True]:
            self.authorize_with('active')
            tmpdir = self.make_tmpdir()
            with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
                f.write('foo')
            # Upload a directory and get the cache file name
            arv_put_args = [tmpdir]
            if batch_mode:
                arv_put_args = ['--batch'] + arv_put_args
            p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                env=self.ENVIRON)
            (_, err) = p.communicate()
            self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
            self.assertEqual(p.returncode, 0)
            cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                    err.decode()).groups()[0]
            self.assertTrue(os.path.isfile(cache_filepath))
            # Load the cache file contents and modify the manifest to simulate
            # an invalid access token
            with open(cache_filepath, 'r') as c:
                cache = json.load(c)
            self.assertRegex(cache['manifest'], r'\+A\S+\@')
            # Replace the real signature with a syntactically valid fake one.
            cache['manifest'] = re.sub(
                r'\+A.*\@',
                "+Aabcdef0123456789abcdef0123456789abcdef01@",
                cache['manifest'])
            with open(cache_filepath, 'w') as c:
                c.write(json.dumps(cache))
            # Re-run the upload and expect to get an invalid cache message
            p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                env=self.ENVIRON)
            (_, err) = p.communicate()
            if not batch_mode:
                self.assertRegex(
                    err.decode(),
                    r'ERROR: arv-put: Resume cache contains invalid signature.*')
                self.assertEqual(p.returncode, 1)
            else:
                self.assertRegex(
                    err.decode(),
                    r'Invalid signatures on cache file \'.*\' while being run in \'batch mode\' -- continuing anyways.*')
                self.assertEqual(p.returncode, 0)
1110
    def test_single_expired_signature_reuploads_file(self):
        """If only one cached signature expired, just that file is re-uploaded."""
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
            f.write('foo')
        # Write a second file on its own subdir to force a new stream
        os.mkdir(os.path.join(tmpdir, 'bar'))
        with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
            f.write('bar')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        # Make one of the signatures appear to have expired
        cache['manifest'] = re.sub(
            r'\@.*? 3:3:barfile.txt',
            "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect to get an invalid cache message
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
        self.assertEqual(p.returncode, 0)
        # Confirm that the resulting cache is different from the last run.
        with open(cache_filepath, 'r') as c2:
            new_cache = json.load(c2)
        self.assertNotEqual(cache['manifest'], new_cache['manifest'])
1158
    def test_put_collection_with_later_update(self):
        """--update-collection adds new files to an existing collection."""
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is being included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')
1173
    def test_put_collection_with_utc_expiring_datetime(self):
        """--trash-at with an explicit UTC timestamp is stored verbatim."""
        tmpdir = self.make_tmpdir()
        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%MZ')
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        self.assertEqual(ciso8601.parse_datetime(trash_at),
            ciso8601.parse_datetime(c['trash_at']))
1186
    def test_put_collection_with_timezone_aware_expiring_datetime(self):
        """--trash-at with a UTC offset (-0300) is normalized to UTC."""
        tmpdir = self.make_tmpdir()
        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M-0300')
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # A -0300 offset means the stored UTC time is 3 hours later.
        self.assertEqual(
            ciso8601.parse_datetime(trash_at).replace(tzinfo=None) + datetime.timedelta(hours=3),
            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1200
    def test_put_collection_with_timezone_naive_expiring_datetime(self):
        """--trash-at without timezone info is interpreted as local time."""
        tmpdir = self.make_tmpdir()
        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M')
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Compute this host's UTC offset to compare against the stored value.
        if time.daylight:
            offset = datetime.timedelta(seconds=time.altzone)
        else:
            offset = datetime.timedelta(seconds=time.timezone)
        self.assertEqual(
            ciso8601.parse_datetime(trash_at) + offset,
            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1218
    def test_put_collection_with_expiring_date_only(self):
        """--trash-at with a bare date trashes at the end of that local day."""
        tmpdir = self.make_tmpdir()
        trash_at = '2140-01-01'
        end_of_day = datetime.timedelta(hours=23, minutes=59, seconds=59)
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Compute this host's UTC offset to compare against the stored value.
        if time.daylight:
            offset = datetime.timedelta(seconds=time.altzone)
        else:
            offset = datetime.timedelta(seconds=time.timezone)
        self.assertEqual(
            ciso8601.parse_datetime(trash_at) + end_of_day + offset,
            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1237
    def test_put_collection_with_invalid_absolute_expiring_datetimes(self):
        """Malformed --trash-at values make arv-put fail (no collection saved)."""
        cases = ['2100', '210010','2100-10', '2100-Oct']
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        for test_datetime in cases:
            # run_and_find_collection asserts the "Collection saved" message;
            # an invalid date makes that assertion fail, hence AssertionError.
            with self.assertRaises(AssertionError):
                self.run_and_find_collection(
                    "",
                    ['--no-progress', '--trash-at', test_datetime, tmpdir])
1248
    def test_put_collection_with_relative_expiring_datetime(self):
        """--trash-after N sets trash_at roughly N days in the future."""
        expire_after = 7
        dt_before = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-after', str(expire_after), tmpdir])
        self.assertNotEqual(None, col['uuid'])
        dt_after = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # trash_at must fall between the timestamps taken before/after the run.
        trash_at = ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None)
        self.assertTrue(dt_before < trash_at)
        self.assertTrue(dt_after > trash_at)
1264
    def test_put_collection_with_invalid_relative_expiring_datetime(self):
        """--trash-after rejects values below 1 (no collection saved)."""
        expire_after = 0 # Must be >= 1
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        with self.assertRaises(AssertionError):
            self.run_and_find_collection(
                "",
                ['--no-progress', '--trash-after', str(expire_after), tmpdir])
1274
1275     def test_upload_directory_reference_without_trailing_slash(self):
1276         tmpdir1 = self.make_tmpdir()
1277         tmpdir2 = self.make_tmpdir()
1278         with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
1279             f.write('This is foo')
1280         with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
1281             f.write('This is not foo')
1282         # Upload one directory and one file
1283         col = self.run_and_find_collection("", ['--no-progress',
1284                                                 tmpdir1,
1285                                                 os.path.join(tmpdir2, 'bar')])
1286         self.assertNotEqual(None, col['uuid'])
1287         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1288         # Check that 'foo' was written inside a subcollection
1289         # OTOH, 'bar' should have been directly uploaded on the root collection
1290         self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')
1291
1292     def test_upload_directory_reference_with_trailing_slash(self):
1293         tmpdir1 = self.make_tmpdir()
1294         tmpdir2 = self.make_tmpdir()
1295         with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
1296             f.write('This is foo')
1297         with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
1298             f.write('This is not foo')
1299         # Upload one directory (with trailing slash) and one file
1300         col = self.run_and_find_collection("", ['--no-progress',
1301                                                 tmpdir1 + os.sep,
1302                                                 os.path.join(tmpdir2, 'bar')])
1303         self.assertNotEqual(None, col['uuid'])
1304         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1305         # Check that 'foo' and 'bar' were written at the same level
1306         self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')
1307
1308     def test_put_collection_with_high_redundancy(self):
1309         # Write empty data: we're not testing CollectionWriter, just
1310         # making sure collections.create tells the API server what our
1311         # desired replication level is.
1312         collection = self.run_and_find_collection("", ['--replication', '4'])
1313         self.assertEqual(4, collection['replication_desired'])
1314
1315     def test_put_collection_with_default_redundancy(self):
1316         collection = self.run_and_find_collection("")
1317         self.assertEqual(None, collection['replication_desired'])
1318
1319     def test_put_collection_with_unnamed_project_link(self):
1320         link = self.run_and_find_collection(
1321             "Test unnamed collection",
1322             ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
1323         username = pwd.getpwuid(os.getuid()).pw_name
1324         self.assertRegex(
1325             link['name'],
1326             r'^Saved at .* by {}@'.format(re.escape(username)))
1327
1328     def test_put_collection_with_name_and_no_project(self):
1329         link_name = 'Test Collection Link in home project'
1330         collection = self.run_and_find_collection(
1331             "Test named collection in home project",
1332             ['--portable-data-hash', '--name', link_name])
1333         self.assertEqual(link_name, collection['name'])
1334         my_user_uuid = self.current_user()['uuid']
1335         self.assertEqual(my_user_uuid, collection['owner_uuid'])
1336
1337     def test_put_collection_with_named_project_link(self):
1338         link_name = 'Test auto Collection Link'
1339         collection = self.run_and_find_collection("Test named collection",
1340                                       ['--portable-data-hash',
1341                                        '--name', link_name,
1342                                        '--project-uuid', self.PROJECT_UUID])
1343         self.assertEqual(link_name, collection['name'])
1344
1345     def test_put_collection_with_storage_classes_specified(self):
1346         collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])
1347
1348         self.assertEqual(len(collection['storage_classes_desired']), 1)
1349         self.assertEqual(collection['storage_classes_desired'][0], 'hot')
1350
1351     def test_put_collection_without_storage_classes_specified(self):
1352         collection = self.run_and_find_collection("")
1353
1354         self.assertEqual(len(collection['storage_classes_desired']), 1)
1355         self.assertEqual(collection['storage_classes_desired'][0], 'default')
1356
1357     def test_exclude_filename_pattern(self):
1358         tmpdir = self.make_tmpdir()
1359         tmpsubdir = os.path.join(tmpdir, 'subdir')
1360         os.mkdir(tmpsubdir)
1361         for fname in ['file1', 'file2', 'file3']:
1362             with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
1363                 f.write("This is %s" % fname)
1364             with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
1365                 f.write("This is %s" % fname)
1366         col = self.run_and_find_collection("", ['--no-progress',
1367                                                 '--exclude', '*2.txt',
1368                                                 '--exclude', 'file3.*',
1369                                                  tmpdir])
1370         self.assertNotEqual(None, col['uuid'])
1371         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1372         # None of the file2.txt & file3.txt should have been uploaded
1373         self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
1374         self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
1375         self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')
1376
1377     def test_exclude_filepath_pattern(self):
1378         tmpdir = self.make_tmpdir()
1379         tmpsubdir = os.path.join(tmpdir, 'subdir')
1380         os.mkdir(tmpsubdir)
1381         for fname in ['file1', 'file2', 'file3']:
1382             with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
1383                 f.write("This is %s" % fname)
1384             with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
1385                 f.write("This is %s" % fname)
1386         col = self.run_and_find_collection("", ['--no-progress',
1387                                                 '--exclude', 'subdir/*2.txt',
1388                                                 '--exclude', './file1.*',
1389                                                  tmpdir])
1390         self.assertNotEqual(None, col['uuid'])
1391         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1392         # Only tmpdir/file1.txt & tmpdir/subdir/file2.txt should have been excluded
1393         self.assertNotRegex(c['manifest_text'],
1394                             r'^\./%s.*:file1.txt' % os.path.basename(tmpdir))
1395         self.assertNotRegex(c['manifest_text'],
1396                             r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
1397         self.assertRegex(c['manifest_text'],
1398                          r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
1399         self.assertRegex(c['manifest_text'], r'^.*:file3.txt')
1400
1401     def test_unicode_on_filename(self):
1402         tmpdir = self.make_tmpdir()
1403         fname = u"iā¤arvados.txt"
1404         with open(os.path.join(tmpdir, fname), 'w') as f:
1405             f.write("This is a unicode named file")
1406         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
1407         self.assertNotEqual(None, col['uuid'])
1408         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1409         self.assertTrue(fname in c['manifest_text'], u"{} does not include {}".format(c['manifest_text'], fname))
1410
1411     def test_silent_mode_no_errors(self):
1412         self.authorize_with('active')
1413         tmpdir = self.make_tmpdir()
1414         with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
1415             f.write('hello world')
1416         pipe = subprocess.Popen(
1417             [sys.executable, arv_put.__file__] + ['--silent', tmpdir],
1418             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1419             stderr=subprocess.PIPE, env=self.ENVIRON)
1420         stdout, stderr = pipe.communicate()
1421         # No console output should occur on normal operations
1422         self.assertNotRegex(stderr.decode(), r'.+')
1423         self.assertNotRegex(stdout.decode(), r'.+')
1424
1425     def test_silent_mode_does_not_avoid_error_messages(self):
1426         self.authorize_with('active')
1427         pipe = subprocess.Popen(
1428             [sys.executable, arv_put.__file__] + ['--silent',
1429                                                   '/path/not/existant'],
1430             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1431             stderr=subprocess.PIPE, env=self.ENVIRON)
1432         stdout, stderr = pipe.communicate()
1433         # Error message should be displayed when errors happen
1434         self.assertRegex(stderr.decode(), r'.*ERROR:.*')
1435         self.assertNotRegex(stdout.decode(), r'.+')
1436
1437
# Allow running this test module directly (e.g. `python test_arv_put.py`).
if __name__ == '__main__':
    unittest.main()