import argparse
import arvados
+import arvados.collection
import base64
import datetime
import errno
args = arg_parser.parse_args(arguments)
if len(args.paths) == 0:
- args.paths += ['/dev/stdin']
+ args.paths = ['-']
+
+ args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)
if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
if args.filename:
args.progress = True
if args.paths == ['-']:
- args.paths = ['/dev/stdin']
+ args.resume = False
if not args.filename:
- args.filename = '-'
+ args.filename = 'stdin'
return args
class ResumeCache(object):
CACHE_DIR = '.cache/arvados/arv-put'
def __init__(self, file_spec, api_client=None, num_retries=0):
    """Open and lock the cache file, then sanity-check any saved state.

    Arguments:
    * file_spec: path of the cache file; it is opened in 'a+' mode.
    * api_client: forwarded to arvados.keep.KeepClient for the check below.
    * num_retries: forwarded to KeepClient.head().

    If load() returns saved state, the first block locator found in it
    (from "_finished_streams", else from "_current_stream_locators") is
    verified with a Keep HEAD request.

    Raises arvados.errors.KeepRequestError if picking or checking that
    locator fails for any reason.  A ValueError from load() is ignored
    (presumably an empty or unparsable cache file -- confirm against
    load()).

    On any failure the cache file is closed before the exception
    propagates.  The caller never receives the half-built object, so
    nothing else could release the file descriptor (and whatever lock
    _lock_file() took on it); leaving it open would get in the way of a
    caller that discards the cache and constructs a new ResumeCache for
    the same path.
    """
    self.cache_file = open(file_spec, 'a+')
    try:
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                # Any problem here (malformed state as well as a failed
                # HEAD) is reported as a KeepRequestError.
                raise arvados.errors.KeepRequestError(
                    "Head request error for {}: {}".format(locator, e))
        except (ValueError):
            pass
    except Exception:
        # Do not leak the open (and possibly locked) file when
        # construction fails.
        self.cache_file.close()
        raise
@classmethod
def make_path(cls, args):
replication=replication)
except (TypeError, ValueError,
arvados.errors.StaleWriterStateError) as error:
- return cls(cache, reporter, bytes_expected, num_retries=num_retries)
+ return cls(cache, reporter, bytes_expected,
+ num_retries=num_retries,
+ replication=replication)
else:
return writer
resume_cache = None
if args.resume:
try:
- resume_cache = ResumeCache(ResumeCache.make_path(args))
+ cachepath = ResumeCache.make_path(args)
+ resume_cache = ResumeCache(cachepath, api_client=api_client, num_retries=args.retries)
except (IOError, OSError, ValueError):
pass # Couldn't open cache directory/file. Continue without it.
+ except arvados.errors.KeepRequestError:
+ # delete the cache and create a new one
+ shutil.rmtree(cachepath)
+ resume_cache = ResumeCache(cachepath)
except ResumeCacheConflict:
print >>stderr, "\n".join([
"arv-put: Another process is already uploading this data.",
writer.report_progress()
writer.do_queued_work() # Do work resumed from cache.
for path in args.paths: # Copy file data to Keep.
- if os.path.isdir(path):
+ if path == '-':
+ writer.start_new_stream()
+ writer.start_new_file(args.filename)
+ r = sys.stdin.read(64*1024)
+ while r:
+ # Need to bypass _queued_file check in ResumableCollectionWriter.write() to get
+ # CollectionWriter.write().
+ super(arvados.collection.ResumableCollectionWriter, writer).write(r)
+ r = sys.stdin.read(64*1024)
+ elif os.path.isdir(path):
writer.write_directory_tree(
path, max_manifest_depth=args.max_manifest_depth)
else:
if args.stream:
output = writer.manifest_text()
if args.normalize:
- output = CollectionReader(output).manifest_text(normalize=True)
+ output = arvados.collection.CollectionReader(output).manifest_text(normalize=True)
elif args.raw:
output = ','.join(writer.data_locators())
else:
try:
manifest_text = writer.manifest_text()
if args.normalize:
- manifest_text = CollectionReader(manifest_text).manifest_text(normalize=True)
+ manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
replication_attr = 'replication_desired'
if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
# API called it 'redundancy' before #3410.