.collection_files_inline {
clear: both;
width: 80%;
- height: auto;
- max-height: 6em;
- margin: 0 1em;
+ margin: 0 3em;
+}
+
+.collection_files_inline img {
+ max-height: 15em;
}
.collection_files_name {
self.response_body = file_enumerator opts
end
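+  # Scopes granting read-only access to this collection (both URL forms)
+  # plus the list of accessible Keep services; a token limited to exactly
+  # these scopes is what backs a public sharing link.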
+ def sharing_scopes
+ ["GET /arvados/v1/collections/#{@object.uuid}", "GET /arvados/v1/collections/#{@object.uuid}/", "GET /arvados/v1/keep_services/accessible"]
+ end
+
+ def search_scopes
+ begin
+ ApiClientAuthorization.filter([['scopes', '=', sharing_scopes]]).results
+ rescue ArvadosApiClient::AccessForbiddenException
+ nil
+ end
+ end
+
def show
return super if !@object
if current_user
.where(head_uuid: @object.uuid, tail_uuid: current_user.uuid,
link_class: 'resources', name: 'wants')
.results.any?
+ @search_sharing = search_scopes
end
@prov_svg = ProvenanceHelper::create_provenance_graph(@object.provenance, "provenance_svg",
{:request => request,
:pdata_only => true}) rescue nil
end
+ def sharing_popup
+ @search_sharing = search_scopes
+ respond_to do |format|
+ format.html
+ format.js
+ end
+ end
+
+ helper_method :download_link
+
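+  # Builds the public download URL for this collection, embedding the
+  # api_token of the first sharing link as the reader token (matches the
+  # /collections/download/:uuid/:reader_token/*file route below).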
+ def download_link
+ collections_url + "/download/#{@object.uuid}/#{@search_sharing.first.api_token}"
+ end
+
+ def share
+    ApiClientAuthorization.create(scopes: sharing_scopes)
+ @search_sharing = search_scopes
+ render 'sharing_popup'
+ end
+
+ def unshare
+ @search_sharing = search_scopes
+ @search_sharing.each do |s|
+ s.destroy
+ end
+ @search_sharing = search_scopes
+ render 'sharing_popup'
+ end
+
protected
def find_usable_token(token_list)
end
class InvalidApiResponseException < StandardError
end
+ class AccessForbiddenException < StandardError
+ end
@@profiling_enabled = Rails.configuration.profiling_enabled
@@discovery = nil
if msg.status_code != 200
errors = resp[:errors]
errors = errors.join("\n\n") if errors.is_a? Array
- raise "#{errors} [API: #{msg.status_code}]"
+ if msg.status_code == 403
+ raise AccessForbiddenException.new "#{errors} [API: #{msg.status_code}]"
+ else
+ raise "#{errors} [API: #{msg.status_code}]"
+ end
end
if resp[:_profile]
Rails.logger.info "API client: " \
--- /dev/null
+<%# a nil @search_sharing means we got an AccessForbiddenException and should
+disable this feature entirely. %>
+<% if @search_sharing != nil %>
+ <% if @search_sharing.any? %>
+ <div>Shared at:
+ <span class="pull-right">
+ <%= link_to "Unshare", unshare_collection_url, {
+ class: 'btn-xs btn-info',
+ remote: true,
+ method: 'post'
+ } %></span>
+ <div class="smaller-text" style="word-break: break-all"><%= link_to download_link, download_link %></div>
+ </div>
+ <% else %>
+ <%= link_to "Create sharing link", share_collection_url, {
+ class: 'btn-xs btn-info',
+ remote: true,
+ method: 'post'
+ } %>
+ <% end %>
+<% end %>
<div class="col-md-6"></div>
<div class="col-md-6">
<div class="pull-right">
- Collection storage status:
+ <span style="padding-left: 1em">Collection storage status:</span>
<%= render partial: 'toggle_persist', locals: { uuid: @object.uuid, current_state: (@is_persistent ? 'persistent' : 'cache') } %>
+
</div>
</div>
</div>
<% end # file_tree.each %>
<%= raw(dirstack.map { |_| "</ul>" }.join("</li>")) %>
<% end # if file_tree %>
+
+<% content_for :footer_html do %>
+<div id="collection-sharing-modal-window" class="modal fade" role="dialog" aria-labelledby="myModalLabel" aria-hidden="true"></div>
+<% end %>
--- /dev/null
+$("#sharing-button").html("<%= escape_javascript(render partial: 'sharing_button') %>");
<%= render_arvados_object_list_start(@logs, 'Show all activity',
logs_path(filters: [['object_uuid','=',@object.uuid]].to_json)) do |log| %>
<p>
- <%= time_ago_in_words(log.event_at) %> ago: <%= log.summary %>
+ <%= time_ago_in_words(log.event_at) rescue 'unknown time' %> ago: <%= log.summary %>
<% if log.object_uuid %>
<%= link_to_if_arvados_object log.object_uuid, link_text: raw('<i class="fa fa-hand-o-right"></i>') %>
<% end %>
<!--
<input type="text" class="form-control" placeholder="Search"/>
-->
+
+ <div id="sharing-button">
+ <%= render partial: 'sharing_button' %>
+ </div>
+
<div style="height:0.5em;"></div>
<% if @folders.andand.any? %>
<p>Included in folders:<br />
<% end %>
</p>
<% end %>
+
</div>
</div>
</div>
<% content_for :tab_line_buttons do %>
<%= form_tag '/pipeline_instances' do |f| %>
<%= hidden_field :pipeline_instance, :pipeline_template_uuid, :value => @object.uuid %>
- <%= button_tag "Create pipeline", {class: 'btn btn-primary pull-right', id: "run-pipeline-button"} %>
+ <%= button_tag "Run this pipeline", {class: 'btn btn-primary pull-right', id: "run-pipeline-button"} %>
<% end %>
<% end %>
get '/collections/graph' => 'collections#graph'
resources :collections do
post 'set_persistent', on: :member
+    get 'sharing_popup', on: :member
+    post 'share', on: :member
+    post 'unshare', on: :member
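+    # share/unshare create and revoke the restricted-scope tokens behind
+    # public download links; sharing_popup re-renders the sharing modal.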
end
get('/collections/download/:uuid/:reader_token/*file' => 'collections#show_file',
format: false)
func pythonDir() string {
gopath := os.Getenv("GOPATH")
- return fmt.Sprintf("%s/../python", strings.Split(gopath, ":")[0])
+ return fmt.Sprintf("%s/../python/tests", strings.Split(gopath, ":")[0])
}
func (s *ServerRequiredSuite) SetUpSuite(c *C) {
}
}
+ // 'defer' is a stack, so it will drain the Body before closing it.
+ defer resp.Body.Close()
+ defer io.Copy(ioutil.Discard, resp.Body)
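+	// Draining the Body lets the underlying HTTP connection be reused
+	// (keep-alive); closing it with unread data would drop the connection.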
+
type svcList struct {
Items []keepDisk `json:"items"`
}
fmt.Sscanf(xr, "%d", &rep)
}
+ defer resp.Body.Close()
+ defer io.Copy(ioutil.Discard, resp.Body)
+
	respbody, err2 := ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: 4096})
if err2 != nil && err2 != io.EOF {
upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, string(respbody)}
import time
import threading
+from collections import deque
+from stat import *
+
from keep import *
from stream import *
import config
stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
if len(stream[f]) == 0:
- stream_tokens.append("0:0:{0}".format(fout))
+ stream_tokens.append("0:0:{0}".format(fout))
- return stream_tokens
+ return stream_tokens
def normalize(collection):
streams = {}
self._current_file_name = None
self._current_file_pos = 0
self._finished_streams = []
+ self._close_file = None
+ self._queued_file = None
+ self._queued_dirents = deque()
+ self._queued_trees = deque()
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()
- def write_directory_tree(self,
- path, stream_name='.', max_manifest_depth=-1):
- self.start_new_stream(stream_name)
- todo = []
- if max_manifest_depth == 0:
- dirents = sorted(util.listdir_recursive(path))
- else:
- dirents = sorted(os.listdir(path))
- for dirent in dirents:
+ def do_queued_work(self):
+ # The work queue consists of three pieces:
+ # * _queued_file: The file object we're currently writing to the
+ # Collection.
+ # * _queued_dirents: Entries under the current directory
+ # (_queued_trees[0]) that we want to write or recurse through.
+ # This may contain files from subdirectories if
+ # max_manifest_depth == 0 for this directory.
+ # * _queued_trees: Directories that should be written as separate
+ # streams to the Collection.
+ # This function handles the smallest piece of work currently queued
+ # (current file, then current directory, then next directory) until
+ # no work remains. The _work_THING methods each do a unit of work on
+ # THING. _queue_THING methods add a THING to the work queue.
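+        # Example (hypothetical layout): write_directory_tree('/data') queues
+        # ('/data', '.', -1) on _queued_trees; _work_trees expands it into
+        # _queued_dirents; _work_dirents queues files via _queue_file and
+        # pushes subdirectories back onto _queued_trees; _work_file streams
+        # each queued file in KEEP_BLOCK_SIZE chunks.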
+ while True:
+ if self._queued_file:
+ self._work_file()
+ elif self._queued_dirents:
+ self._work_dirents()
+ elif self._queued_trees:
+ self._work_trees()
+ else:
+ break
+
+ def _work_file(self):
+ while True:
+ buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
+ if not buf:
+ break
+ self.write(buf)
+ self.finish_current_file()
+ if self._close_file:
+ self._queued_file.close()
+ self._close_file = None
+ self._queued_file = None
+
+ def _work_dirents(self):
+ path, stream_name, max_manifest_depth = self._queued_trees[0]
+ if stream_name != self.current_stream_name():
+ self.start_new_stream(stream_name)
+ while self._queued_dirents:
+ dirent = self._queued_dirents.popleft()
target = os.path.join(path, dirent)
if os.path.isdir(target):
- todo += [[target,
- os.path.join(stream_name, dirent),
- max_manifest_depth-1]]
+ self._queue_tree(target,
+ os.path.join(stream_name, dirent),
+ max_manifest_depth - 1)
else:
- self.start_new_file(dirent)
- with open(target, 'rb') as f:
- while True:
- buf = f.read(2**26)
- if len(buf) == 0:
- break
- self.write(buf)
- self.finish_current_stream()
- map(lambda x: self.write_directory_tree(*x), todo)
+ self._queue_file(target, dirent)
+ break
+ if not self._queued_dirents:
+ self._queued_trees.popleft()
+
+    def _work_trees(self):
+        path, stream_name, max_manifest_depth = self._queued_trees[0]
+        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
+                        else os.listdir)
+        dirents = make_dirents(path)
+        if dirents:
+            self._queue_dirents(stream_name, dirents)
+        else:
+            self._queued_trees.popleft()  # don't loop forever on empty dirs
+
+ def _queue_file(self, source, filename=None):
+ assert (self._queued_file is None), "tried to queue more than one file"
+ if not hasattr(source, 'read'):
+ source = open(source, 'rb')
+ self._close_file = True
+ else:
+ self._close_file = False
+ if filename is None:
+ filename = os.path.basename(source.name)
+ self.start_new_file(filename)
+ self._queued_file = source
+
+ def _queue_dirents(self, stream_name, dirents):
+ assert (not self._queued_dirents), "tried to queue more than one tree"
+ self._queued_dirents = deque(sorted(dirents))
+
+ def _queue_tree(self, path, stream_name, max_manifest_depth):
+ self._queued_trees.append((path, stream_name, max_manifest_depth))
+
+ def write_file(self, source, filename=None):
+ self._queue_file(source, filename)
+ self.do_queued_work()
+
+ def write_directory_tree(self,
+ path, stream_name='.', max_manifest_depth=-1):
+ self._queue_tree(path, stream_name, max_manifest_depth)
+ self.do_queued_work()
def write(self, newdata):
if hasattr(newdata, '__iter__'):
manifest += ' ' + ' '.join(stream[1])
manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
manifest += "\n"
-
+
#print 'writer',manifest
#print 'after reader',CollectionReader(manifest).manifest_text()
for name, locators, files in self._finished_streams:
ret += locators
return ret
+
+
+class ResumableCollectionWriter(CollectionWriter):
+ STATE_PROPS = ['_current_stream_files', '_current_stream_length',
+ '_current_stream_locators', '_current_stream_name',
+ '_current_file_name', '_current_file_pos', '_close_file',
+ '_data_buffer', '_dependencies', '_finished_streams',
+ '_queued_dirents', '_queued_trees']
+
+ def __init__(self):
+ self._dependencies = {}
+ super(ResumableCollectionWriter, self).__init__()
+
+ @classmethod
+ def from_state(cls, state, *init_args, **init_kwargs):
+ # Try to build a new writer from scratch with the given state.
+ # If the state is not suitable to resume (because files have changed,
+ # been deleted, aren't predictable, etc.), raise a
+ # StaleWriterStateError. Otherwise, return the initialized writer.
+ # The caller is responsible for calling writer.do_queued_work()
+ # appropriately after it's returned.
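+        # Typical use (sketch):
+        #   writer = ResumableCollectionWriter.from_state(json.load(cache))
+        #   writer.do_queued_work()  # pick up where the dump left off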
+ writer = cls(*init_args, **init_kwargs)
+ for attr_name in cls.STATE_PROPS:
+ attr_value = state[attr_name]
+ attr_class = getattr(writer, attr_name).__class__
+ # Coerce the value into the same type as the initial value, if
+ # needed.
+ if attr_class not in (type(None), attr_value.__class__):
+ attr_value = attr_class(attr_value)
+ setattr(writer, attr_name, attr_value)
+ # Check dependencies before we try to resume anything.
+ if any(KeepLocator(ls).permission_expired()
+ for ls in writer._current_stream_locators):
+ raise errors.StaleWriterStateError(
+ "locators include expired permission hint")
+ writer.check_dependencies()
+ if state['_current_file'] is not None:
+ path, pos = state['_current_file']
+ try:
+ writer._queued_file = open(path, 'rb')
+ writer._queued_file.seek(pos)
+ except IOError as error:
+ raise errors.StaleWriterStateError(
+ "failed to reopen active file {}: {}".format(path, error))
+ return writer
+
+ def check_dependencies(self):
+ for path, orig_stat in self._dependencies.items():
+ if not S_ISREG(orig_stat[ST_MODE]):
+ raise errors.StaleWriterStateError("{} not file".format(path))
+ try:
+ now_stat = tuple(os.stat(path))
+ except OSError as error:
+ raise errors.StaleWriterStateError(
+ "failed to stat {}: {}".format(path, error))
+ if ((not S_ISREG(now_stat[ST_MODE])) or
+ (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
+ (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
+ raise errors.StaleWriterStateError("{} changed".format(path))
+
+ def dump_state(self, copy_func=lambda x: x):
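+        # copy_func lets callers snapshot mutable values (e.g. copy.deepcopy)
+        # so the dump stays stable even if writing continues afterwards.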
+ state = {attr: copy_func(getattr(self, attr))
+ for attr in self.STATE_PROPS}
+ if self._queued_file is None:
+ state['_current_file'] = None
+ else:
+ state['_current_file'] = (os.path.realpath(self._queued_file.name),
+ self._queued_file.tell())
+ return state
+
+ def _queue_file(self, source, filename=None):
+ try:
+ src_path = os.path.realpath(source)
+ except Exception:
+ raise errors.AssertionError("{} not a file path".format(source))
+ try:
+ path_stat = os.stat(src_path)
+ except OSError as stat_error:
+ path_stat = None
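+        # Compare the stat of the path with an fstat of the opened file: a
+        # mismatched inode means the file changed between the two calls and
+        # can't be trusted as a resume dependency.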
+ super(ResumableCollectionWriter, self)._queue_file(source, filename)
+ fd_stat = os.fstat(self._queued_file.fileno())
+ if not S_ISREG(fd_stat.st_mode):
+ # We won't be able to resume from this cache anyway, so don't
+ # worry about further checks.
+ self._dependencies[source] = tuple(fd_stat)
+ elif path_stat is None:
+ raise errors.AssertionError(
+ "could not stat {}: {}".format(source, stat_error))
+ elif path_stat.st_ino != fd_stat.st_ino:
+ raise errors.AssertionError(
+ "{} changed between open and stat calls".format(source))
+ else:
+ self._dependencies[src_path] = tuple(fd_stat)
+
+ def write(self, data):
+ if self._queued_file is None:
+ raise errors.AssertionError(
+ "resumable writer can't accept unsourced data")
+ return super(ResumableCollectionWriter, self).write(data)
--- /dev/null
+#!/usr/bin/env python
+
+# TODO:
+# --md5sum - display md5 of each file as read from disk
+
+import argparse
+import arvados
+import base64
+import errno
+import fcntl
+import hashlib
+import json
+import os
+import signal
+import sys
+import tempfile
+
+CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
+
+def parse_arguments(arguments):
+ parser = argparse.ArgumentParser(
+ description='Copy data from the local filesystem to Keep.')
+
+ parser.add_argument('paths', metavar='path', type=str, nargs='*',
+ help="""
+ Local file or directory. Default: read from standard input.
+ """)
+
+ parser.add_argument('--max-manifest-depth', type=int, metavar='N',
+ default=-1, help="""
+ Maximum depth of directory tree to represent in the manifest
+ structure. A directory structure deeper than this will be represented
+ as a single stream in the manifest. If N=0, the manifest will contain
+ a single stream. Default: -1 (unlimited), i.e., exactly one manifest
+ stream per filesystem directory that contains files.
+ """)
+
+ group = parser.add_mutually_exclusive_group()
+
+ group.add_argument('--as-stream', action='store_true', dest='stream',
+ help="""
+ Synonym for --stream.
+ """)
+
+ group.add_argument('--stream', action='store_true',
+ help="""
+ Store the file content and display the resulting manifest on
+ stdout. Do not write the manifest to Keep or save a Collection object
+ in Arvados.
+ """)
+
+ group.add_argument('--as-manifest', action='store_true', dest='manifest',
+ help="""
+ Synonym for --manifest.
+ """)
+
+ group.add_argument('--in-manifest', action='store_true', dest='manifest',
+ help="""
+ Synonym for --manifest.
+ """)
+
+ group.add_argument('--manifest', action='store_true',
+ help="""
+ Store the file data and resulting manifest in Keep, save a Collection
+ object in Arvados, and display the manifest locator (Collection uuid)
+ on stdout. This is the default behavior.
+ """)
+
+ group.add_argument('--as-raw', action='store_true', dest='raw',
+ help="""
+ Synonym for --raw.
+ """)
+
+ group.add_argument('--raw', action='store_true',
+ help="""
+ Store the file content and display the data block locators on stdout,
+ separated by commas, with a trailing newline. Do not store a
+ manifest.
+ """)
+
+ parser.add_argument('--use-filename', type=str, default=None,
+ dest='filename', help="""
+ Synonym for --filename.
+ """)
+
+ parser.add_argument('--filename', type=str, default=None,
+ help="""
+ Use the given filename in the manifest, instead of the name of the
+ local file. This is useful when "-" or "/dev/stdin" is given as an
+ input file. It can be used only if there is exactly one path given and
+ it is not a directory. Implies --manifest.
+ """)
+
+ group = parser.add_mutually_exclusive_group()
+ group.add_argument('--progress', action='store_true',
+ help="""
+ Display human-readable progress on stderr (bytes and, if possible,
+ percentage of total data size). This is the default behavior when
+ stderr is a tty.
+ """)
+
+ group.add_argument('--no-progress', action='store_true',
+ help="""
+ Do not display human-readable progress on stderr, even if stderr is a
+ tty.
+ """)
+
+ group.add_argument('--batch-progress', action='store_true',
+ help="""
+ Display machine-readable progress on stderr (bytes and, if known,
+ total data size).
+ """)
+
+ group = parser.add_mutually_exclusive_group()
+ group.add_argument('--resume', action='store_true', default=True,
+ help="""
+ Continue interrupted uploads from cached state (default).
+ """)
+ group.add_argument('--no-resume', action='store_false', dest='resume',
+ help="""
+ Do not continue interrupted uploads from cached state.
+ """)
+
+ args = parser.parse_args(arguments)
+
+ if len(args.paths) == 0:
+ args.paths += ['/dev/stdin']
+
+ if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
+ if args.filename:
+ parser.error("""
+ --filename argument cannot be used when storing a directory or
+ multiple files.
+ """)
+
+ # Turn on --progress by default if stderr is a tty.
+ if (not (args.batch_progress or args.no_progress)
+ and os.isatty(sys.stderr.fileno())):
+ args.progress = True
+
+ if args.paths == ['-']:
+ args.paths = ['/dev/stdin']
+ if not args.filename:
+ args.filename = '-'
+
+ return args
+
+class ResumeCacheConflict(Exception):
+ pass
+
+
+class ResumeCache(object):
+ CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')
+
+ @classmethod
+ def setup_user_cache(cls):
+ try:
+ os.makedirs(cls.CACHE_DIR)
+ except OSError as error:
+ if error.errno != errno.EEXIST:
+ raise
+ else:
+ os.chmod(cls.CACHE_DIR, 0o700)
+
+ def __init__(self, file_spec):
+ self.cache_file = open(file_spec, 'a+')
+ self._lock_file(self.cache_file)
+ self.filename = self.cache_file.name
+
+ @classmethod
+ def make_path(cls, args):
+ md5 = hashlib.md5()
+ md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+ realpaths = sorted(os.path.realpath(path) for path in args.paths)
+ md5.update('\0'.join(realpaths))
+ if any(os.path.isdir(path) for path in realpaths):
+ md5.update(str(max(args.max_manifest_depth, -1)))
+ elif args.filename:
+ md5.update(args.filename)
+ return os.path.join(cls.CACHE_DIR, md5.hexdigest())
+
+ def _lock_file(self, fileobj):
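+        # LOCK_NB makes flock fail immediately rather than block, so a second
+        # arv-put run on the same cache raises ResumeCacheConflict right away.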
+ try:
+ fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except IOError:
+ raise ResumeCacheConflict("{} locked".format(fileobj.name))
+
+ def load(self):
+ self.cache_file.seek(0)
+ return json.load(self.cache_file)
+
+ def save(self, data):
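+        # Write to a locked temporary file, then rename it over the old
+        # cache: the update is atomic, so readers see either the old complete
+        # state or the new one, never a partial write.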
+ try:
+ new_cache_fd, new_cache_name = tempfile.mkstemp(
+ dir=os.path.dirname(self.filename))
+ self._lock_file(new_cache_fd)
+ new_cache = os.fdopen(new_cache_fd, 'r+')
+ json.dump(data, new_cache)
+ os.rename(new_cache_name, self.filename)
+ except (IOError, OSError, ResumeCacheConflict) as error:
+ try:
+ os.unlink(new_cache_name)
+ except NameError: # mkstemp failed.
+ pass
+ else:
+ self.cache_file.close()
+ self.cache_file = new_cache
+
+ def close(self):
+ self.cache_file.close()
+
+ def destroy(self):
+ try:
+ os.unlink(self.filename)
+ except OSError as error:
+ if error.errno != errno.ENOENT: # That's what we wanted anyway.
+ raise
+ self.close()
+
+ def restart(self):
+ self.destroy()
+ self.__init__(self.filename)
+
+
+class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
+ STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
+ ['bytes_written', '_seen_inputs'])
+
+ def __init__(self, cache=None, reporter=None, bytes_expected=None):
+ self.bytes_written = 0
+ self._seen_inputs = []
+ self.cache = cache
+ self.reporter = reporter
+ self.bytes_expected = bytes_expected
+ super(ArvPutCollectionWriter, self).__init__()
+
+ @classmethod
+ def from_cache(cls, cache, reporter=None, bytes_expected=None):
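+        # Resume from the cached state if possible; fall back to a fresh
+        # writer when the cache is empty, malformed, or stale.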
+ try:
+ state = cache.load()
+ state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
+ writer = cls.from_state(state, cache, reporter, bytes_expected)
+ except (TypeError, ValueError,
+ arvados.errors.StaleWriterStateError) as error:
+ return cls(cache, reporter, bytes_expected)
+ else:
+ return writer
+
+ def cache_state(self):
+ if self.cache is None:
+ return
+ state = self.dump_state()
+ # Transform attributes for serialization.
+ for attr, value in state.items():
+ if attr == '_data_buffer':
+ state[attr] = base64.encodestring(''.join(value))
+ elif hasattr(value, 'popleft'):
+ state[attr] = list(value)
+ self.cache.save(state)
+
+ def report_progress(self):
+ if self.reporter is not None:
+ self.reporter(self.bytes_written, self.bytes_expected)
+
+ def flush_data(self):
+ start_buffer_len = self._data_buffer_len
+ start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
+ super(ArvPutCollectionWriter, self).flush_data()
+ if self._data_buffer_len < start_buffer_len: # We actually PUT data.
+ self.bytes_written += (start_buffer_len - self._data_buffer_len)
+ self.report_progress()
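+        # Checkpoint once per completed Keep block, so an interrupted upload
+        # loses at most one block of progress.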
+ if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
+ self.cache_state()
+
+ def _record_new_input(self, input_type, source_name, dest_name):
+ # The key needs to be a list because that's what we'll get back
+ # from JSON deserialization.
+ key = [input_type, source_name, dest_name]
+ if key in self._seen_inputs:
+ return False
+ self._seen_inputs.append(key)
+ return True
+
+ def write_file(self, source, filename=None):
+ if self._record_new_input('file', source, filename):
+ super(ArvPutCollectionWriter, self).write_file(source, filename)
+
+ def write_directory_tree(self,
+ path, stream_name='.', max_manifest_depth=-1):
+ if self._record_new_input('directory', path, stream_name):
+ super(ArvPutCollectionWriter, self).write_directory_tree(
+ path, stream_name, max_manifest_depth)
+
+
+def expected_bytes_for(pathlist):
+ # Walk the given directory trees and stat files, adding up file sizes,
+ # so we can display progress as percent
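+    # Returns None when any path is neither a file nor a directory (e.g. a
+    # device or pipe), since total size can't be known in advance.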
+ bytesum = 0
+ for path in pathlist:
+ if os.path.isdir(path):
+ for filename in arvados.util.listdir_recursive(path):
+ bytesum += os.path.getsize(os.path.join(path, filename))
+ elif not os.path.isfile(path):
+ return None
+ else:
+ bytesum += os.path.getsize(path)
+ return bytesum
+
+_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
+ os.getpid())
+def machine_progress(bytes_written, bytes_expected):
+ return _machine_format.format(
+ bytes_written, -1 if (bytes_expected is None) else bytes_expected)
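+# Example output (program name and pid vary):
+#   "arv-put 12345: 65536 written 1048576 total"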
+
+def human_progress(bytes_written, bytes_expected):
+ if bytes_expected:
+ return "\r{}M / {}M {:.1%} ".format(
+ bytes_written >> 20, bytes_expected >> 20,
+ float(bytes_written) / bytes_expected)
+ else:
+ return "\r{} ".format(bytes_written)
+
+def progress_writer(progress_func, outfile=sys.stderr):
+ def write_progress(bytes_written, bytes_expected):
+ outfile.write(progress_func(bytes_written, bytes_expected))
+ return write_progress
+
+def exit_signal_handler(sigcode, frame):
+ sys.exit(-sigcode)
+
+def main(arguments=None):
+ ResumeCache.setup_user_cache()
+ args = parse_arguments(arguments)
+
+ if args.progress:
+ reporter = progress_writer(human_progress)
+ elif args.batch_progress:
+ reporter = progress_writer(machine_progress)
+ else:
+ reporter = None
+
+ try:
+ resume_cache = ResumeCache(ResumeCache.make_path(args))
+ if not args.resume:
+ resume_cache.restart()
+ except ResumeCacheConflict:
+ print "arv-put: Another process is already uploading this data."
+ sys.exit(1)
+
+ writer = ArvPutCollectionWriter.from_cache(
+ resume_cache, reporter, expected_bytes_for(args.paths))
+
+ # Install our signal handler for each code in CAUGHT_SIGNALS, and save
+ # the originals.
+ orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
+ for sigcode in CAUGHT_SIGNALS}
+
+ if writer.bytes_written > 0: # We're resuming a previous upload.
+ print >>sys.stderr, "\n".join([
+ "arv-put: Resuming previous upload from last checkpoint.",
+ " Use the --no-resume option to start over."])
+ writer.report_progress()
+
+ writer.do_queued_work() # Do work resumed from cache.
+ for path in args.paths: # Copy file data to Keep.
+ if os.path.isdir(path):
+ writer.write_directory_tree(
+ path, max_manifest_depth=args.max_manifest_depth)
+ else:
+ writer.start_new_stream()
+ writer.write_file(path, args.filename or os.path.basename(path))
+ writer.finish_current_stream()
+
+ if args.progress: # Print newline to split stderr from stdout for humans.
+ print >>sys.stderr
+
+ if args.stream:
+ print writer.manifest_text(),
+ elif args.raw:
+ print ','.join(writer.data_locators())
+ else:
+ # Register the resulting collection in Arvados.
+ collection = arvados.api().collections().create(
+ body={
+ 'uuid': writer.finish(),
+ 'manifest_text': writer.manifest_text(),
+ },
+ ).execute()
+
+ # Print the locator (uuid) of the new collection.
+ print collection['uuid']
+
+ for sigcode, orig_handler in orig_signal_handlers.items():
+ signal.signal(sigcode, orig_handler)
+
+ resume_cache.destroy()
+
+if __name__ == '__main__':
+ main()
pass
class NoKeepServersError(Exception):
pass
+class StaleWriterStateError(Exception):
+ pass
import time
import threading
import timer
+import datetime
global_client_object = None
import config
import arvados.errors
+class KeepLocator(object):
+ EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
+ HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
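+    # A full locator (hypothetical values) looks like
+    # "acbd18db4cc2f85cedef654fccc4a4d8+3+A<40-hex-sig>@53bed294":
+    # an md5 hash, then optional size and permission hints.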
+
+ def __init__(self, locator_str):
+ self.size = None
+ self.loc_hint = None
+ self._perm_sig = None
+ self._perm_expiry = None
+ pieces = iter(locator_str.split('+'))
+ self.md5sum = next(pieces)
+ for hint in pieces:
+ if hint.startswith('A'):
+ self.parse_permission_hint(hint)
+ elif hint.startswith('K'):
+ self.loc_hint = hint # FIXME
+ elif hint.isdigit():
+ self.size = int(hint)
+ else:
+ raise ValueError("unrecognized hint data {}".format(hint))
+
+ def __str__(self):
+ return '+'.join(
+ str(s) for s in [self.md5sum, self.size, self.loc_hint,
+ self.permission_hint()]
+ if s is not None)
+
+ def _is_hex_length(self, s, *size_spec):
+ if len(size_spec) == 1:
+ good_len = (len(s) == size_spec[0])
+ else:
+ good_len = (size_spec[0] <= len(s) <= size_spec[1])
+ return good_len and self.HEX_RE.match(s)
+
+ def _make_hex_prop(name, length):
+ # Build and return a new property with the given name that
+ # must be a hex string of the given length.
+ data_name = '_{}'.format(name)
+ def getter(self):
+ return getattr(self, data_name)
+ def setter(self, hex_str):
+ if not self._is_hex_length(hex_str, length):
+ raise ValueError("{} must be a {}-digit hex string: {}".
+ format(name, length, hex_str))
+ setattr(self, data_name, hex_str)
+ return property(getter, setter)
+
+ md5sum = _make_hex_prop('md5sum', 32)
+ perm_sig = _make_hex_prop('perm_sig', 40)
+
+ @property
+ def perm_expiry(self):
+ return self._perm_expiry
+
+ @perm_expiry.setter
+ def perm_expiry(self, value):
+ if not self._is_hex_length(value, 1, 8):
+ raise ValueError(
+ "permission timestamp must be a hex Unix timestamp: {}".
+ format(value))
+ self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
+
+ def permission_hint(self):
+ data = [self.perm_sig, self.perm_expiry]
+ if None in data:
+ return None
+ data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
+ return "A{}@{:08x}".format(*data)
+
+ def parse_permission_hint(self, s):
+ try:
+ self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
+        except ValueError:
+ raise ValueError("bad permission hint {}".format(s))
+
+ def permission_expired(self, as_of_dt=None):
+ if self.perm_expiry is None:
+ return False
+ elif as_of_dt is None:
+ as_of_dt = datetime.datetime.now()
+ return self.perm_expiry <= as_of_dt
+
+
class Keep:
@staticmethod
def global_client_object():
#!/usr/bin/env python
-# TODO:
-# --md5sum - display md5 of each file as read from disk
-
-import argparse
-import os
-import sys
-
-parser = argparse.ArgumentParser(
- description='Copy data from the local filesystem to Keep.')
-
-parser.add_argument('paths', metavar='path', type=str, nargs='*',
- help="""
-Local file or directory. Default: read from standard input.
-""")
-
-parser.add_argument('--max-manifest-depth', type=int, metavar='N', default=-1,
- help="""
-Maximum depth of directory tree to represent in the manifest
-structure. A directory structure deeper than this will be represented
-as a single stream in the manifest. If N=0, the manifest will contain
-a single stream. Default: -1 (unlimited), i.e., exactly one manifest
-stream per filesystem directory that contains files.
-""")
-
-group = parser.add_mutually_exclusive_group()
-
-group.add_argument('--as-stream', action='store_true', dest='stream',
- help="""
-Synonym for --stream.
-""")
-
-group.add_argument('--stream', action='store_true',
- help="""
-Store the file content and display the resulting manifest on
-stdout. Do not write the manifest to Keep or save a Collection object
-in Arvados.
-""")
-
-group.add_argument('--as-manifest', action='store_true', dest='manifest',
- help="""
-Synonym for --manifest.
-""")
-
-group.add_argument('--in-manifest', action='store_true', dest='manifest',
- help="""
-Synonym for --manifest.
-""")
-
-group.add_argument('--manifest', action='store_true',
- help="""
-Store the file data and resulting manifest in Keep, save a Collection
-object in Arvados, and display the manifest locator (Collection uuid)
-on stdout. This is the default behavior.
-""")
-
-group.add_argument('--as-raw', action='store_true', dest='raw',
- help="""
-Synonym for --raw.
-""")
-
-group.add_argument('--raw', action='store_true',
- help="""
-Store the file content and display the data block locators on stdout,
-separated by commas, with a trailing newline. Do not store a
-manifest.
-""")
-
-parser.add_argument('--use-filename', type=str, default=None, dest='filename',
- help="""
-Synonym for --filename.
-""")
-
-parser.add_argument('--filename', type=str, default=None,
- help="""
-Use the given filename in the manifest, instead of the name of the
-local file. This is useful when "-" or "/dev/stdin" is given as an
-input file. It can be used only if there is exactly one path given and
-it is not a directory. Implies --manifest.
-""")
-
-group = parser.add_mutually_exclusive_group()
-group.add_argument('--progress', action='store_true',
- help="""
-Display human-readable progress on stderr (bytes and, if possible,
-percentage of total data size). This is the default behavior when
-stderr is a tty.
-""")
-
-group.add_argument('--no-progress', action='store_true',
- help="""
-Do not display human-readable progress on stderr, even if stderr is a
-tty.
-""")
-
-group.add_argument('--batch-progress', action='store_true',
- help="""
-Display machine-readable progress on stderr (bytes and, if known,
-total data size).
-""")
-
-args = parser.parse_args()
-
-if len(args.paths) == 0:
- args.paths += ['/dev/stdin']
-
-if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
- if args.filename:
- parser.error("""
---filename argument cannot be used when storing a directory or
-multiple files.
-""")
-
-# Turn on --progress by default if stderr is a tty.
-if (not (args.batch_progress or args.no_progress)
- and os.isatty(sys.stderr.fileno())):
- args.progress = True
-
-
-import arvados
-import re
-import string
-
-class CollectionWriterWithProgress(arvados.CollectionWriter):
- def flush_data(self, *args, **kwargs):
- if not getattr(self, 'display_type', None):
- return
- if not hasattr(self, 'bytes_flushed'):
- self.bytes_flushed = 0
- self.bytes_flushed += self._data_buffer_len
- super(CollectionWriterWithProgress, self).flush_data(*args, **kwargs)
- self.bytes_flushed -= self._data_buffer_len
- if self.display_type == 'machine':
- sys.stderr.write('%s %d: %d written %d total\n' %
- (sys.argv[0],
- os.getpid(),
- self.bytes_flushed,
- getattr(self, 'bytes_expected', -1)))
- elif getattr(self, 'bytes_expected', 0) > 0:
- pct = 100.0 * self.bytes_flushed / self.bytes_expected
- sys.stderr.write('\r%dM / %dM %.1f%% ' %
- (self.bytes_flushed >> 20,
- self.bytes_expected >> 20, pct))
- else:
- sys.stderr.write('\r%d ' % self.bytes_flushed)
-
- def manifest_text(self, *args, **kwargs):
- manifest_text = (super(CollectionWriterWithProgress, self)
- .manifest_text(*args, **kwargs))
- if getattr(self, 'display_type', None):
- if self.display_type == 'human':
- sys.stderr.write('\n')
- self.display_type = None
- return manifest_text
-
-if args.progress:
- writer = CollectionWriterWithProgress()
- writer.display_type = 'human'
-elif args.batch_progress:
- writer = CollectionWriterWithProgress()
- writer.display_type = 'machine'
-else:
- writer = arvados.CollectionWriter()
-
-if args.paths == ['-']:
- args.paths = ['/dev/stdin']
- if not args.filename:
- args.filename = '-'
-
-# Walk the given directory trees and stat files, adding up file sizes,
-# so we can display progress as percent
-writer.bytes_expected = 0
-for path in args.paths:
- if os.path.isdir(path):
- for filename in arvados.util.listdir_recursive(path):
- writer.bytes_expected += os.path.getsize(
- os.path.join(path, filename))
- elif not os.path.isfile(path):
- del writer.bytes_expected
- break
- else:
- writer.bytes_expected += os.path.getsize(path)
-
-# Copy file data to Keep.
-for path in args.paths:
- if os.path.isdir(path):
- writer.write_directory_tree(path,
- max_manifest_depth=args.max_manifest_depth)
- else:
- writer.start_new_stream()
- writer.start_new_file(args.filename or os.path.split(path)[1])
- with open(path, 'rb') as f:
- while True:
- buf = f.read(2**26)
- if len(buf) == 0:
- break
- writer.write(buf)
-
-if args.stream:
- print writer.manifest_text(),
-elif args.raw:
- writer.finish_current_stream()
- print string.join(writer.data_locators(), ',')
-else:
- # Register the resulting collection in Arvados.
- collection = arvados.api().collections().create(
- body={
- 'uuid': writer.finish(),
- 'manifest_text': writer.manifest_text(),
- },
- ).execute()
-
- # Print the locator (uuid) of the new collection.
- print collection['uuid']
+from arvados.commands.put import main
+main()
--- /dev/null
+#!/usr/bin/env python
+
+import errno
+import os
+import shutil
+import tempfile
+import unittest
+
+class ArvadosBaseTestCase(unittest.TestCase):
+ # This class provides common utility functions for our tests.
+
+ def setUp(self):
+ self._tempdirs = []
+
+ def tearDown(self):
+ for workdir in self._tempdirs:
+ shutil.rmtree(workdir, ignore_errors=True)
+
+ def make_tmpdir(self):
+ self._tempdirs.append(tempfile.mkdtemp())
+ return self._tempdirs[-1]
+
+ def data_file(self, filename):
+ try:
+ basedir = os.path.dirname(__file__)
+ except NameError:
+ basedir = '.'
+ return open(os.path.join(basedir, 'data', filename))
+
+
+class ArvadosKeepLocalStoreTestCase(ArvadosBaseTestCase):
+ def setUp(self):
+ super(ArvadosKeepLocalStoreTestCase, self).setUp()
+ self._orig_keep_local_store = os.environ.get('KEEP_LOCAL_STORE')
+ os.environ['KEEP_LOCAL_STORE'] = self.make_tmpdir()
+
+ def tearDown(self):
+ if self._orig_keep_local_store is None:
+ del os.environ['KEEP_LOCAL_STORE']
+ else:
+ os.environ['KEEP_LOCAL_STORE'] = self._orig_keep_local_store
+ super(ArvadosKeepLocalStoreTestCase, self).tearDown()
+
+ def build_directory_tree(self, tree):
+ tree_root = self.make_tmpdir()
+ for leaf in tree:
+ path = os.path.join(tree_root, leaf)
+ try:
+ os.makedirs(os.path.dirname(path))
+ except OSError as error:
+ if error.errno != errno.EEXIST:
+ raise
+ with open(path, 'w') as tmpfile:
+ tmpfile.write(leaf)
+ return tree_root
+
+ def make_test_file(self, text="test"):
+ testfile = tempfile.NamedTemporaryFile()
+ testfile.write(text)
+ testfile.flush()
+ return testfile
-import subprocess
-import time
+#!/usr/bin/env python
+
+import argparse
import os
+import shutil
import signal
-import yaml
+import subprocess
import sys
-import argparse
-import arvados.config
-import arvados.api
-import shutil
import tempfile
+import time
+import yaml
+
+MY_DIRNAME = os.path.dirname(os.path.realpath(__file__))
+if __name__ == '__main__' and os.path.exists(
+ os.path.join(MY_DIRNAME, '..', 'arvados', '__init__.py')):
+ # We're being launched to support another test suite.
+ # Add the Python SDK source to the library path.
+ sys.path.insert(1, os.path.dirname(MY_DIRNAME))
+
+import arvados.api
+import arvados.config
-ARV_API_SERVER_DIR = '../../services/api'
-KEEP_SERVER_DIR = '../../services/keep'
+ARV_API_SERVER_DIR = '../../../services/api'
+KEEP_SERVER_DIR = '../../../services/keep'
SERVER_PID_PATH = 'tmp/pids/webrick-test.pid'
WEBSOCKETS_SERVER_PID_PATH = 'tmp/pids/passenger-test.pid'
def run(websockets=False, reuse_server=False):
cwd = os.getcwd()
- os.chdir(os.path.join(os.path.dirname(__file__), ARV_API_SERVER_DIR))
+ os.chdir(os.path.join(MY_DIRNAME, ARV_API_SERVER_DIR))
if websockets:
pid_file = WEBSOCKETS_SERVER_PID_PATH
def stop():
cwd = os.getcwd()
- os.chdir(os.path.join(os.path.dirname(__file__), ARV_API_SERVER_DIR))
+ os.chdir(os.path.join(MY_DIRNAME, ARV_API_SERVER_DIR))
kill_server_pid(WEBSOCKETS_SERVER_PID_PATH, 0)
kill_server_pid(SERVER_PID_PATH, 0)
stop_keep()
cwd = os.getcwd()
- os.chdir(os.path.join(os.path.dirname(__file__), KEEP_SERVER_DIR))
+ os.chdir(os.path.join(MY_DIRNAME, KEEP_SERVER_DIR))
    if os.environ.get('GOPATH') is None:
os.environ["GOPATH"] = os.getcwd()
else:
def stop_keep():
cwd = os.getcwd()
- os.chdir(os.path.join(os.path.dirname(__file__), KEEP_SERVER_DIR))
+ os.chdir(os.path.join(MY_DIRNAME, KEEP_SERVER_DIR))
_stop_keep(0)
_stop_keep(1)
stop_keep_proxy()
cwd = os.getcwd()
- os.chdir(os.path.join(os.path.dirname(__file__), KEEP_SERVER_DIR))
+ os.chdir(os.path.join(MY_DIRNAME, KEEP_SERVER_DIR))
    if os.environ.get('GOPATH') is None:
os.environ["GOPATH"] = os.getcwd()
else:
def stop_keep_proxy():
cwd = os.getcwd()
- os.chdir(os.path.join(os.path.dirname(__file__), KEEP_SERVER_DIR))
+ os.chdir(os.path.join(MY_DIRNAME, KEEP_SERVER_DIR))
kill_server_pid("tmp/keepproxy.pid", 0)
os.chdir(cwd)
def fixture(fix):
'''load a fixture yaml file'''
- with open(os.path.join(os.path.dirname(__file__), ARV_API_SERVER_DIR, "test", "fixtures",
+ with open(os.path.join(MY_DIRNAME, ARV_API_SERVER_DIR, "test", "fixtures",
fix + ".yml")) as f:
return yaml.load(f.read())
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+import re
+import shutil
+import subprocess
+import sys
+import tempfile
+import time
+import unittest
+
+import arvados
+import arvados.commands.put as arv_put
+from arvados_testutil import ArvadosBaseTestCase, ArvadosKeepLocalStoreTestCase
+
+class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
+ CACHE_ARGSET = [
+ [],
+ ['/dev/null'],
+ ['/dev/null', '--filename', 'empty'],
+ ['/tmp'],
+ ['/tmp', '--max-manifest-depth', '0'],
+ ['/tmp', '--max-manifest-depth', '1']
+ ]
+
+ def tearDown(self):
+ super(ArvadosPutResumeCacheTest, self).tearDown()
+ try:
+ self.last_cache.destroy()
+ except AttributeError:
+ pass
+
+ def cache_path_from_arglist(self, arglist):
+ return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))
+
+ def test_cache_names_stable(self):
+ for argset in self.CACHE_ARGSET:
+ self.assertEquals(self.cache_path_from_arglist(argset),
+ self.cache_path_from_arglist(argset),
+ "cache name changed for {}".format(argset))
+
+ def test_cache_names_unique(self):
+ results = []
+ for argset in self.CACHE_ARGSET:
+ path = self.cache_path_from_arglist(argset)
+ self.assertNotIn(path, results)
+ results.append(path)
+
+ def test_cache_names_simple(self):
+ # The goal here is to make sure the filename doesn't use characters
+ # reserved by the filesystem. Feel free to adjust this regexp as
+ # long as it still does that.
+ bad_chars = re.compile(r'[^-\.\w]')
+ for argset in self.CACHE_ARGSET:
+ path = self.cache_path_from_arglist(argset)
+ self.assertFalse(bad_chars.search(os.path.basename(path)),
+ "path too exotic: {}".format(path))
+
+ def test_cache_names_ignore_argument_order(self):
+ self.assertEquals(
+ self.cache_path_from_arglist(['a', 'b', 'c']),
+ self.cache_path_from_arglist(['c', 'a', 'b']))
+ self.assertEquals(
+ self.cache_path_from_arglist(['-', '--filename', 'stdin']),
+ self.cache_path_from_arglist(['--filename', 'stdin', '-']))
+
+ def test_cache_names_differ_for_similar_paths(self):
+ # This test needs names at / that don't exist on the real filesystem.
+ self.assertNotEqual(
+ self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
+ self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))
+
+ def test_cache_names_ignore_irrelevant_arguments(self):
+ # Workaround: parse_arguments bails on --filename with a directory.
+ path1 = self.cache_path_from_arglist(['/tmp'])
+ args = arv_put.parse_arguments(['/tmp'])
+ args.filename = 'tmp'
+ path2 = arv_put.ResumeCache.make_path(args)
+ self.assertEquals(path1, path2,
+ "cache path considered --filename for directory")
+ self.assertEquals(
+ self.cache_path_from_arglist(['-']),
+ self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
+ "cache path considered --max-manifest-depth for file")
+
+ def test_cache_names_treat_negative_manifest_depths_identically(self):
+ base_args = ['/tmp', '--max-manifest-depth']
+ self.assertEquals(
+ self.cache_path_from_arglist(base_args + ['-1']),
+ self.cache_path_from_arglist(base_args + ['-2']))
+
+ def test_cache_names_treat_stdin_consistently(self):
+ self.assertEquals(
+ self.cache_path_from_arglist(['-', '--filename', 'test']),
+ self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))
+
+ def test_cache_names_identical_for_synonymous_names(self):
+ self.assertEquals(
+ self.cache_path_from_arglist(['.']),
+ self.cache_path_from_arglist([os.path.realpath('.')]))
+ testdir = self.make_tmpdir()
+ looplink = os.path.join(testdir, 'loop')
+ os.symlink(testdir, looplink)
+ self.assertEquals(
+ self.cache_path_from_arglist([testdir]),
+ self.cache_path_from_arglist([looplink]))
+
+ def test_cache_names_different_by_api_host(self):
+ config = arvados.config.settings()
+ orig_host = config.get('ARVADOS_API_HOST')
+ try:
+ name1 = self.cache_path_from_arglist(['.'])
+ config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
+ self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
+ finally:
+ if orig_host is None:
+ del config['ARVADOS_API_HOST']
+ else:
+ config['ARVADOS_API_HOST'] = orig_host
+
+ def test_basic_cache_storage(self):
+ thing = ['test', 'list']
+ with tempfile.NamedTemporaryFile() as cachefile:
+ self.last_cache = arv_put.ResumeCache(cachefile.name)
+ self.last_cache.save(thing)
+ self.assertEquals(thing, self.last_cache.load())
+
+ def test_empty_cache(self):
+ with tempfile.NamedTemporaryFile() as cachefile:
+ cache = arv_put.ResumeCache(cachefile.name)
+ self.assertRaises(ValueError, cache.load)
+
+ def test_cache_persistent(self):
+ thing = ['test', 'list']
+ path = os.path.join(self.make_tmpdir(), 'cache')
+ cache = arv_put.ResumeCache(path)
+ cache.save(thing)
+ cache.close()
+ self.last_cache = arv_put.ResumeCache(path)
+ self.assertEquals(thing, self.last_cache.load())
+
+ def test_multiple_cache_writes(self):
+ thing = ['short', 'list']
+ with tempfile.NamedTemporaryFile() as cachefile:
+ self.last_cache = arv_put.ResumeCache(cachefile.name)
+ # Start writing an object longer than the one we test, to make
+ # sure the cache file gets truncated.
+ self.last_cache.save(['long', 'long', 'list'])
+ self.last_cache.save(thing)
+ self.assertEquals(thing, self.last_cache.load())
+
+ def test_cache_is_locked(self):
+ with tempfile.NamedTemporaryFile() as cachefile:
+ cache = arv_put.ResumeCache(cachefile.name)
+ self.assertRaises(arv_put.ResumeCacheConflict,
+ arv_put.ResumeCache, cachefile.name)
+
+ def test_cache_stays_locked(self):
+ with tempfile.NamedTemporaryFile() as cachefile:
+ self.last_cache = arv_put.ResumeCache(cachefile.name)
+ path = cachefile.name
+ self.last_cache.save('test')
+ self.assertRaises(arv_put.ResumeCacheConflict,
+ arv_put.ResumeCache, path)
+
+ def test_destroy_cache(self):
+ cachefile = tempfile.NamedTemporaryFile(delete=False)
+ try:
+ cache = arv_put.ResumeCache(cachefile.name)
+ cache.save('test')
+ cache.destroy()
+ try:
+ arv_put.ResumeCache(cachefile.name)
+ except arv_put.ResumeCacheConflict:
+ self.fail("could not load cache after destroying it")
+ self.assertRaises(ValueError, cache.load)
+ finally:
+ if os.path.exists(cachefile.name):
+ os.unlink(cachefile.name)
+
+ def test_restart_cache(self):
+ path = os.path.join(self.make_tmpdir(), 'cache')
+ cache = arv_put.ResumeCache(path)
+ cache.save('test')
+ cache.restart()
+ self.assertRaises(ValueError, cache.load)
+ self.assertRaises(arv_put.ResumeCacheConflict,
+ arv_put.ResumeCache, path)
+
+
+class ArvadosPutCollectionWriterTest(ArvadosKeepLocalStoreTestCase):
+ def setUp(self):
+ super(ArvadosPutCollectionWriterTest, self).setUp()
+ with tempfile.NamedTemporaryFile(delete=False) as cachefile:
+ self.cache = arv_put.ResumeCache(cachefile.name)
+ self.cache_filename = cachefile.name
+
+ def tearDown(self):
+ super(ArvadosPutCollectionWriterTest, self).tearDown()
+ if os.path.exists(self.cache_filename):
+ self.cache.destroy()
+ self.cache.close()
+
+ def test_writer_caches(self):
+ cwriter = arv_put.ArvPutCollectionWriter(self.cache)
+ cwriter.write_file('/dev/null')
+ cwriter.cache_state()
+ self.assertTrue(self.cache.load())
+ self.assertEquals(". 0:0:null\n", cwriter.manifest_text())
+
+ def test_writer_works_without_cache(self):
+ cwriter = arv_put.ArvPutCollectionWriter()
+ cwriter.write_file('/dev/null')
+ self.assertEquals(". 0:0:null\n", cwriter.manifest_text())
+
+ def test_writer_resumes_from_cache(self):
+ cwriter = arv_put.ArvPutCollectionWriter(self.cache)
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ cwriter.cache_state()
+ new_writer = arv_put.ArvPutCollectionWriter.from_cache(
+ self.cache)
+ self.assertEquals(
+ ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
+ new_writer.manifest_text())
+
+ def test_new_writer_from_stale_cache(self):
+ cwriter = arv_put.ArvPutCollectionWriter(self.cache)
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
+ new_writer.write_file('/dev/null')
+ self.assertEquals(". 0:0:null\n", new_writer.manifest_text())
+
+ def test_new_writer_from_empty_cache(self):
+ cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
+ cwriter.write_file('/dev/null')
+ self.assertEquals(". 0:0:null\n", cwriter.manifest_text())
+
+ def test_writer_resumable_after_arbitrary_bytes(self):
+ cwriter = arv_put.ArvPutCollectionWriter(self.cache)
+ # These bytes are intentionally not valid UTF-8.
+ with self.make_test_file('\x00\x07\xe2') as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ cwriter.cache_state()
+ new_writer = arv_put.ArvPutCollectionWriter.from_cache(
+ self.cache)
+ self.assertEquals(cwriter.manifest_text(), new_writer.manifest_text())
+
+ def make_progress_tester(self):
+ progression = []
+ def record_func(written, expected):
+ progression.append((written, expected))
+ return progression, record_func
+
+ def test_progress_reporting(self):
+ for expect_count in (None, 8):
+ progression, reporter = self.make_progress_tester()
+ cwriter = arv_put.ArvPutCollectionWriter(
+ reporter=reporter, bytes_expected=expect_count)
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ cwriter.finish_current_stream()
+ self.assertIn((4, expect_count), progression)
+
+ def test_resume_progress(self):
+ cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
+ with self.make_test_file() as testfile:
+ # Set up a writer with some flushed bytes.
+ cwriter.write_file(testfile.name, 'test')
+ cwriter.finish_current_stream()
+ cwriter.cache_state()
+ new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
+ self.assertEqual(new_writer.bytes_written, 4)
+
+
+class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
+ TEST_SIZE = os.path.getsize(__file__)
+
+ def test_expected_bytes_for_file(self):
+ self.assertEquals(self.TEST_SIZE,
+ arv_put.expected_bytes_for([__file__]))
+
+ def test_expected_bytes_for_tree(self):
+ tree = self.make_tmpdir()
+ shutil.copyfile(__file__, os.path.join(tree, 'one'))
+ shutil.copyfile(__file__, os.path.join(tree, 'two'))
+ self.assertEquals(self.TEST_SIZE * 2,
+ arv_put.expected_bytes_for([tree]))
+ self.assertEquals(self.TEST_SIZE * 3,
+ arv_put.expected_bytes_for([tree, __file__]))
+
+ def test_expected_bytes_for_device(self):
+ self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
+ self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))
+
+
+class ArvadosPutReportTest(ArvadosBaseTestCase):
+ def test_machine_progress(self):
+ for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
+ expect = ": {} written {} total\n".format(
+ count, -1 if (total is None) else total)
+ self.assertTrue(
+ arv_put.machine_progress(count, total).endswith(expect))
+
+ def test_known_human_progress(self):
+ for count, total in [(0, 1), (2, 4), (45, 60)]:
+ expect = '{:.1%}'.format(float(count) / total)
+ actual = arv_put.human_progress(count, total)
+ self.assertTrue(actual.startswith('\r'))
+ self.assertIn(expect, actual)
+
+ def test_unknown_human_progress(self):
+ for count in [1, 20, 300, 4000, 50000]:
+ self.assertTrue(re.search(r'\b{}\b'.format(count),
+ arv_put.human_progress(count, None)))
+
+
+class ArvadosPutTest(ArvadosKeepLocalStoreTestCase):
+ def test_simple_file_put(self):
+ with self.make_test_file() as testfile:
+ path = testfile.name
+ arv_put.main(['--stream', '--no-progress', path])
+ self.assertTrue(
+ os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
+ '098f6bcd4621d373cade4e832627b4f6')),
+ "did not find file stream in Keep store")
+
+ def test_short_put_from_stdin(self):
+ # Have to run this separately since arv-put can't read from the
+ # tests' stdin.
+ # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
+ # case, because the /proc entry is already gone by the time it tries.
+ pipe = subprocess.Popen(
+ [sys.executable, arv_put.__file__, '--stream'],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=open('/dev/null', 'w'))
+ pipe.stdin.write('stdin test\n')
+ pipe.stdin.close()
+ deadline = time.time() + 5
+ while (pipe.poll() is None) and (time.time() < deadline):
+ time.sleep(.1)
+ if pipe.returncode is None:
+ pipe.terminate()
+ self.fail("arv-put did not PUT from stdin within 5 seconds")
+ self.assertEquals(pipe.returncode, 0)
+ self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())
+
+
+if __name__ == '__main__':
+ unittest.main()
#
# ARVADOS_API_TOKEN=abc ARVADOS_API_HOST=arvados.local python -m unittest discover
-import unittest
import arvados
-import os
import bz2
-import sys
+import copy
+import os
+import pprint
import subprocess
+import tempfile
+import unittest
-class KeepLocalStoreTest(unittest.TestCase):
- def setUp(self):
- os.environ['KEEP_LOCAL_STORE'] = '/tmp'
- def runTest(self):
- self.assertEqual(arvados.Keep.put('foo'), 'acbd18db4cc2f85cedef654fccc4a4d8+3', 'wrong md5 hash from Keep.put')
- self.assertEqual(arvados.Keep.get('acbd18db4cc2f85cedef654fccc4a4d8+3'), 'foo', 'wrong data from Keep.get')
+from arvados_testutil import ArvadosKeepLocalStoreTestCase
+
+class TestResumableWriter(arvados.ResumableCollectionWriter):
+ KEEP_BLOCK_SIZE = 1024 # PUT to Keep every 1K.
-class LocalCollectionWriterTest(unittest.TestCase):
- def setUp(self):
- os.environ['KEEP_LOCAL_STORE'] = '/tmp'
- def runTest(self):
+ def current_state(self):
+ return self.dump_state(copy.deepcopy)
+
+
+class ArvadosCollectionsTest(ArvadosKeepLocalStoreTestCase):
+ def write_foo_bar_baz(self):
cw = arvados.CollectionWriter()
self.assertEqual(cw.current_stream_name(), '.',
'current_stream_name() should be "." now')
cw.start_new_stream('baz')
cw.write('baz')
cw.set_current_file_name('baz.txt')
- hash = cw.finish()
- self.assertEqual(hash,
+ return cw.finish()
+
+ def test_keep_local_store(self):
+ self.assertEqual(arvados.Keep.put('foo'), 'acbd18db4cc2f85cedef654fccc4a4d8+3', 'wrong md5 hash from Keep.put')
+ self.assertEqual(arvados.Keep.get('acbd18db4cc2f85cedef654fccc4a4d8+3'), 'foo', 'wrong data from Keep.get')
+
+ def test_local_collection_writer(self):
+ self.assertEqual(self.write_foo_bar_baz(),
'd6c3b8e571f1b81ebb150a45ed06c884+114',
- "resulting manifest hash was {0}, expecting d6c3b8e571f1b81ebb150a45ed06c884+114".format(hash))
+ "wrong locator hash for files foo, bar, baz")
-class LocalCollectionReaderTest(unittest.TestCase):
- def setUp(self):
- os.environ['KEEP_LOCAL_STORE'] = '/tmp'
- LocalCollectionWriterTest().runTest()
- def runTest(self):
+ def test_local_collection_reader(self):
+ self.write_foo_bar_baz()
cr = arvados.CollectionReader('d6c3b8e571f1b81ebb150a45ed06c884+114+Xzizzle')
got = []
for s in cr.all_streams():
'',
'reading zero bytes should have returned empty string')
-class LocalCollectionManifestSubsetTest(unittest.TestCase):
- def setUp(self):
- os.environ['KEEP_LOCAL_STORE'] = '/tmp'
- LocalCollectionWriterTest().runTest()
- def runTest(self):
- self._runTest('d6c3b8e571f1b81ebb150a45ed06c884+114',
- [[3, '.', 'bar.txt', 'bar'],
- [3, '.', 'foo.txt', 'foo'],
- [3, './baz', 'baz.txt', 'baz']])
- self._runTest((". %s %s 0:3:foo.txt 3:3:bar.txt\n" %
- (arvados.Keep.put("foo"),
- arvados.Keep.put("bar"))),
- [[3, '.', 'bar.txt', 'bar'],
- [3, '.', 'foo.txt', 'foo']])
- self._runTest((". %s %s 0:2:fo.txt 2:4:obar.txt\n" %
- (arvados.Keep.put("foo"),
- arvados.Keep.put("bar"))),
- [[2, '.', 'fo.txt', 'fo'],
- [4, '.', 'obar.txt', 'obar']])
- self._runTest((". %s %s 0:2:fo.txt 2:0:zero.txt 2:2:ob.txt 4:2:ar.txt\n" %
- (arvados.Keep.put("foo"),
- arvados.Keep.put("bar"))),
- [[2, '.', 'ar.txt', 'ar'],
- [2, '.', 'fo.txt', 'fo'],
- [2, '.', 'ob.txt', 'ob'],
- [0, '.', 'zero.txt', '']])
-
- def _runTest(self, collection, expected):
+ def _test_subset(self, collection, expected):
cr = arvados.CollectionReader(collection)
for s in cr.all_streams():
for ex in expected:
ex,
'all_files|as_manifest did not preserve manifest contents: got %s expected %s' % (got, ex))
-class LocalCollectionReadlineTest(unittest.TestCase):
- def setUp(self):
- os.environ['KEEP_LOCAL_STORE'] = '/tmp'
- def _runTest(self, what_in, what_out):
+ def test_collection_manifest_subset(self):
+ self.write_foo_bar_baz()
+ self._test_subset('d6c3b8e571f1b81ebb150a45ed06c884+114',
+ [[3, '.', 'bar.txt', 'bar'],
+ [3, '.', 'foo.txt', 'foo'],
+ [3, './baz', 'baz.txt', 'baz']])
+ self._test_subset((". %s %s 0:3:foo.txt 3:3:bar.txt\n" %
+ (arvados.Keep.put("foo"),
+ arvados.Keep.put("bar"))),
+ [[3, '.', 'bar.txt', 'bar'],
+ [3, '.', 'foo.txt', 'foo']])
+ self._test_subset((". %s %s 0:2:fo.txt 2:4:obar.txt\n" %
+ (arvados.Keep.put("foo"),
+ arvados.Keep.put("bar"))),
+ [[2, '.', 'fo.txt', 'fo'],
+ [4, '.', 'obar.txt', 'obar']])
+ self._test_subset((". %s %s 0:2:fo.txt 2:0:zero.txt 2:2:ob.txt 4:2:ar.txt\n" %
+ (arvados.Keep.put("foo"),
+ arvados.Keep.put("bar"))),
+ [[2, '.', 'ar.txt', 'ar'],
+ [2, '.', 'fo.txt', 'fo'],
+ [2, '.', 'ob.txt', 'ob'],
+ [0, '.', 'zero.txt', '']])
+
+ def _test_readline(self, what_in, what_out):
cw = arvados.CollectionWriter()
cw.start_new_file('test.txt')
cw.write(what_in)
self.assertEqual(got,
what_out,
"readlines did not split lines correctly: %s" % got)
- def runTest(self):
- self._runTest("\na\nbcd\n\nefg\nz",
- ["\n", "a\n", "bcd\n", "\n", "efg\n", "z"])
- self._runTest("ab\ncd\n",
- ["ab\n", "cd\n"])
-
-class LocalCollectionEmptyFileTest(unittest.TestCase):
- def setUp(self):
- os.environ['KEEP_LOCAL_STORE'] = '/tmp'
- def runTest(self):
+
+ def test_collection_readline(self):
+ self._test_readline("\na\nbcd\n\nefg\nz",
+ ["\n", "a\n", "bcd\n", "\n", "efg\n", "z"])
+ self._test_readline("ab\ncd\n",
+ ["ab\n", "cd\n"])
+
+ def test_collection_empty_file(self):
cw = arvados.CollectionWriter()
cw.start_new_file('zero.txt')
cw.write('')
got_sizes += [f.size()]
self.assertEqual(got_sizes, expect_sizes, "got wrong file sizes %s, expected %s" % (got_sizes, expect_sizes))
-class LocalCollectionBZ2DecompressionTest(unittest.TestCase):
- def setUp(self):
- os.environ['KEEP_LOCAL_STORE'] = '/tmp'
- def runTest(self):
+ def test_collection_bz2_decompression(self):
n_lines_in = 2**18
data_in = "abc\n"
for x in xrange(0, 18):
n_lines_in,
"decompression returned %d lines instead of %d" % (got, n_lines_in))
-class LocalCollectionGzipDecompressionTest(unittest.TestCase):
- def setUp(self):
- os.environ['KEEP_LOCAL_STORE'] = '/tmp'
- def runTest(self):
+ def test_collection_gzip_decompression(self):
n_lines_in = 2**18
data_in = "abc\n"
for x in xrange(0, 18):
n_lines_in,
"decompression returned %d lines instead of %d" % (got, n_lines_in))
-class NormalizedCollectionTest(unittest.TestCase):
- def runTest(self):
+ def test_normalized_collection(self):
m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt"""
./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
""")
- with open('testdata/1000G_ref_manifest') as f6:
+ with self.data_file('1000G_ref_manifest') as f6:
m6 = f6.read()
self.assertEqual(arvados.CollectionReader(m6).manifest_text(), m6)
- with open('testdata/jlake_manifest') as f7:
+ with self.data_file('jlake_manifest') as f7:
m7 = f7.read()
self.assertEqual(arvados.CollectionReader(m7).manifest_text(), m7)
"""
self.assertEqual(arvados.CollectionReader(m8).manifest_text(), m8)
-class LocatorsAndRangesTest(unittest.TestCase):
- def runTest(self):
+ def test_locators_and_ranges(self):
blocks2 = [['a', 10, 0],
['b', 10, 10],
['c', 10, 20],
self.assertEqual(arvados.locators_and_ranges(blocks2, 49, 2), [['e', 10, 9, 1], ['f', 10, 0, 1]])
self.assertEqual(arvados.locators_and_ranges(blocks2, 59, 2), [['f', 10, 9, 1]])
-
+
blocks3 = [['a', 10, 0],
['b', 10, 10],
['c', 10, 20],
self.assertEqual(arvados.locators_and_ranges(blocks, 0, 5), [['a', 10, 0, 5]])
self.assertEqual(arvados.locators_and_ranges(blocks, 3, 5), [['a', 10, 3, 5]])
self.assertEqual(arvados.locators_and_ranges(blocks, 0, 10), [['a', 10, 0, 10]])
-
+
self.assertEqual(arvados.locators_and_ranges(blocks, 0, 11), [['a', 10, 0, 10],
['b', 15, 0, 1]])
self.assertEqual(arvados.locators_and_ranges(blocks, 1, 11), [['a', 10, 1, 9],
['b', 15, 0, 2]])
self.assertEqual(arvados.locators_and_ranges(blocks, 0, 25), [['a', 10, 0, 10],
['b', 15, 0, 15]])
-
+
self.assertEqual(arvados.locators_and_ranges(blocks, 0, 30), [['a', 10, 0, 10],
['b', 15, 0, 15],
['c', 5, 0, 5]])
self.assertEqual(arvados.locators_and_ranges(blocks, 0, 31), [['a', 10, 0, 10],
['b', 15, 0, 15],
['c', 5, 0, 5]])
-
+
self.assertEqual(arvados.locators_and_ranges(blocks, 15, 5), [['b', 15, 5, 5]])
-
+
self.assertEqual(arvados.locators_and_ranges(blocks, 8, 17), [['a', 10, 8, 2],
['b', 15, 0, 15]])
self.assertEqual(arvados.locators_and_ranges(blocks, 8, 20), [['a', 10, 8, 2],
['b', 15, 0, 15],
['c', 5, 0, 3]])
-
+
self.assertEqual(arvados.locators_and_ranges(blocks, 26, 2), [['c', 5, 1, 2]])
-
+
self.assertEqual(arvados.locators_and_ranges(blocks, 9, 15), [['a', 10, 9, 1],
- ['b', 15, 0, 14]])
+ ['b', 15, 0, 14]])
self.assertEqual(arvados.locators_and_ranges(blocks, 10, 15), [['b', 15, 0, 15]])
self.assertEqual(arvados.locators_and_ranges(blocks, 11, 15), [['b', 15, 1, 14],
['c', 5, 0, 1]])
-class FileStreamTest(unittest.TestCase):
class MockStreamReader(object):
def __init__(self, content):
self.content = content
def readfrom(self, start, size):
return self.content[start:start+size]
- def runTest(self):
+ def test_file_stream(self):
content = 'abcdefghijklmnopqrstuvwxyz0123456789'
- msr = FileStreamTest.MockStreamReader(content)
+ msr = self.MockStreamReader(content)
segments = [[0, 10, 0],
[10, 15, 10],
[25, 5, 25]]
-
+
sfr = arvados.StreamFileReader(msr, segments, "test")
self.assertEqual(sfr.name(), "test")
segments = [[26, 10, 0],
[0, 15, 10],
[15, 5, 25]]
-
+
sfr = arvados.StreamFileReader(msr, segments, "test")
self.assertEqual(sfr.size(), 30)
self.assertEqual(sfr.tell(), 30)
-class StreamReaderTest(unittest.TestCase):
-
class MockKeep(object):
def __init__(self, content):
self.content = content
def get(self, locator):
return self.content[locator]
- def runTest(self):
- keepblocks = {'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10': 'abcdefghij',
- 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15': 'klmnopqrstuvwxy',
+ def test_stream_reader(self):
+ keepblocks = {'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10': 'abcdefghij',
+ 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15': 'klmnopqrstuvwxy',
'cccccccccccccccccccccccccccccccc+5': 'z0123'}
- mk = StreamReaderTest.MockKeep(keepblocks)
+ mk = self.MockKeep(keepblocks)
sr = arvados.StreamReader([".", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15", "cccccccccccccccccccccccccccccccc+5", "0:30:foo"], mk)
self.assertEqual(sr.readfrom(25, 5), content[25:30])
self.assertEqual(sr.readfrom(30, 5), '')
-class ExtractFileTest(unittest.TestCase):
- def runTest(self):
+ def test_extract_file(self):
m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
. 085c37f02916da1cad16f93c54d899b7+41 0:41:md6sum.txt
. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md7sum.txt
". 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md7sum.txt\n")
self.assertEqual(arvados.CollectionReader(m1).all_streams()[0].files()['md9sum.txt'].as_manifest(),
". 085c37f02916da1cad16f93c54d899b7+41 5348b82a029fd9e971a811ce1f71360b+43 8b22da26f9f433dea0a10e5ec66d73ba+43 40:80:md9sum.txt\n")
+
+ def test_write_directory_tree(self):
+ cwriter = arvados.CollectionWriter()
+ cwriter.write_directory_tree(self.build_directory_tree(
+ ['basefile', 'subdir/subfile']))
+ self.assertEqual(cwriter.manifest_text(),
+ """. c5110c5ac93202d8e0f9e381f22bac0f+8 0:8:basefile
+./subdir 1ca4dec89403084bf282ad31e6cf7972+14 0:14:subfile\n""")
+
+ def test_write_named_directory_tree(self):
+ cwriter = arvados.CollectionWriter()
+ cwriter.write_directory_tree(self.build_directory_tree(
+ ['basefile', 'subdir/subfile']), 'root')
+ self.assertEqual(
+ cwriter.manifest_text(),
+ """./root c5110c5ac93202d8e0f9e381f22bac0f+8 0:8:basefile
+./root/subdir 1ca4dec89403084bf282ad31e6cf7972+14 0:14:subfile\n""")
+
+ def test_write_directory_tree_in_one_stream(self):
+ cwriter = arvados.CollectionWriter()
+ cwriter.write_directory_tree(self.build_directory_tree(
+ ['basefile', 'subdir/subfile']), max_manifest_depth=0)
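+        # max_manifest_depth=0 packs both files into one data block; the
+        # ./subdir stream then indexes into that same block (offset 8).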
+ self.assertEqual(cwriter.manifest_text(),
+ """. 4ace875ffdc6824a04950f06858f4465+22 0:8:basefile
+./subdir 4ace875ffdc6824a04950f06858f4465+22 8:14:subfile\n""")
+
+ def test_write_directory_tree_with_limited_recursion(self):
+ cwriter = arvados.CollectionWriter()
+ cwriter.write_directory_tree(
+ self.build_directory_tree(['f1', 'd1/f2', 'd1/d2/f3']),
+ max_manifest_depth=1)
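+        # max_manifest_depth=1 keeps ./d1 and ./d1/d2 as separate streams
+        # but packs all of their data into a single shared block.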
+ self.assertEqual(cwriter.manifest_text(),
+ """. bd19836ddb62c11c55ab251ccaca5645+2 0:2:f1
+./d1 50170217e5b04312024aa5cd42934494+13 8:5:f2
+./d1/d2 50170217e5b04312024aa5cd42934494+13 0:8:f3\n""")
+
+ def test_write_one_file(self):
+ cwriter = arvados.CollectionWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name)
+ self.assertEqual(
+ cwriter.manifest_text(),
+ ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:{}\n".format(
+ os.path.basename(testfile.name)))
+
+ def test_write_named_file(self):
+ cwriter = arvados.CollectionWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'foo')
+ self.assertEqual(cwriter.manifest_text(),
+ ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:foo\n")
+
+ def test_write_multiple_files(self):
+ cwriter = arvados.CollectionWriter()
+ for letter in 'ABC':
+ with self.make_test_file(letter) as testfile:
+ cwriter.write_file(testfile.name, letter)
+ self.assertEqual(
+ cwriter.manifest_text(),
+ ". 902fbdd2b1df0c4f70b4a5d23525e932+3 0:1:A 1:1:B 2:1:C\n")
+
+ def test_basic_resume(self):
+ cwriter = TestResumableWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ resumed = TestResumableWriter.from_state(cwriter.current_state())
+        self.assertEqual(cwriter.manifest_text(), resumed.manifest_text(),
+                         "resumed CollectionWriter had different manifest")
+
+ def test_resume_fails_when_missing_dependency(self):
+ cwriter = TestResumableWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ self.assertRaises(arvados.errors.StaleWriterStateError,
+ TestResumableWriter.from_state,
+ cwriter.current_state())
+
+ def test_resume_fails_when_dependency_mtime_changed(self):
+ cwriter = TestResumableWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
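+            # Backdate the file so its mtime no longer matches the saved state.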
+ os.utime(testfile.name, (0, 0))
+ self.assertRaises(arvados.errors.StaleWriterStateError,
+ TestResumableWriter.from_state,
+ cwriter.current_state())
+
+ def test_resume_fails_when_dependency_is_nonfile(self):
+ cwriter = TestResumableWriter()
+ cwriter.write_file('/dev/null', 'empty')
+ self.assertRaises(arvados.errors.StaleWriterStateError,
+ TestResumableWriter.from_state,
+ cwriter.current_state())
+
+ def test_resume_fails_when_dependency_size_changed(self):
+ cwriter = TestResumableWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
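+            # Grow the file, then restore its original mtime so only the
+            # size differs from the saved state.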
+ orig_mtime = os.fstat(testfile.fileno()).st_mtime
+ testfile.write('extra')
+ testfile.flush()
+ os.utime(testfile.name, (orig_mtime, orig_mtime))
+ self.assertRaises(arvados.errors.StaleWriterStateError,
+ TestResumableWriter.from_state,
+ cwriter.current_state())
+
+ def test_resume_fails_with_expired_locator(self):
+ cwriter = TestResumableWriter()
+ state = cwriter.current_state()
+ # Add an expired locator to the state.
+ state['_current_stream_locators'].append(''.join([
+ 'a' * 32, '+A', 'b' * 40, '@', '10000000']))
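+        # (Hex timestamp 10000000 is mid-1978, so the signature is long expired.)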
+ self.assertRaises(arvados.errors.StaleWriterStateError,
+ TestResumableWriter.from_state, state)
+
+ def test_arbitrary_objects_not_resumable(self):
+ cwriter = TestResumableWriter()
+ with open('/dev/null') as badfile:
+ self.assertRaises(arvados.errors.AssertionError,
+ cwriter.write_file, badfile)
+
+ def test_arbitrary_writes_not_resumable(self):
+ cwriter = TestResumableWriter()
+ self.assertRaises(arvados.errors.AssertionError,
+ cwriter.write, "badtext")
+
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import datetime
+import itertools
+import random
+import unittest
+
+from arvados.keep import KeepLocator
+
+class ArvadosKeepLocatorTest(unittest.TestCase):
+ DEFAULT_TEST_COUNT = 10
+
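+    # numstrs is a method factory, run at class-definition time: each
+    # returned genstrs becomes a method yielding `count` random strings
+    # in the given format (hex checksums, decimal sizes, signatures, ...).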
+ def numstrs(fmtstr, base, exponent):
+ def genstrs(self, count=None):
+            return (fmtstr.format(random.randint(0, base ** exponent - 1))
+ for c in xrange(count or self.DEFAULT_TEST_COUNT))
+ return genstrs
+
+ checksums = numstrs('{:032x}', 16, 32)
+ sizes = numstrs('{:d}', 2, 26)
+ signatures = numstrs('{:040x}', 16, 40)
+ timestamps = numstrs('{:08x}', 16, 8)
+
+ def perm_hints(self, count=DEFAULT_TEST_COUNT):
+ for sig, ts in itertools.izip(self.signatures(count),
+ self.timestamps(count)):
+ yield 'A{}@{}'.format(sig, ts)
+
+ def test_good_locators_returned(self):
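+        # Build locators with no hints, size only, permission only, and both.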
+ for hint_gens in [(), (self.sizes(),), (self.perm_hints(),),
+ (self.sizes(), self.perm_hints())]:
+ for loc_data in itertools.izip(self.checksums(), *hint_gens):
+ locator = '+'.join(loc_data)
+                self.assertEqual(locator, str(KeepLocator(locator)))
+
+ def test_nonchecksum_rejected(self):
+ for badstr in ['', 'badbadbad', '8f9e68d957b504a29ba76c526c3145dj',
+ '+8f9e68d957b504a29ba76c526c3145d9',
+ '3+8f9e68d957b504a29ba76c526c3145d9']:
+ self.assertRaises(ValueError, KeepLocator, badstr)
+
+ def test_bad_hints_rejected(self):
+ checksum = next(self.checksums(1))
+ for badhint in ['', 'nonsense', '+32', checksum]:
+ self.assertRaises(ValueError, KeepLocator,
+ '+'.join([checksum, badhint]))
+
+ def test_expiry_passed(self):
+ checksum = next(self.checksums(1))
+ signature = next(self.signatures(1))
+ dt1980 = datetime.datetime(1980, 1, 1)
+ dt2000 = datetime.datetime(2000, 2, 2)
+ dt2080 = datetime.datetime(2080, 3, 3)
+ locator = KeepLocator(checksum)
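+        # A locator with no permission hint never expires.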
+ self.assertFalse(locator.permission_expired())
+ self.assertFalse(locator.permission_expired(dt1980))
+ self.assertFalse(locator.permission_expired(dt2080))
+ # Timestamped to 1987-01-05 18:48:32.
+ locator = KeepLocator('{}+A{}@20000000'.format(checksum, signature))
+ self.assertTrue(locator.permission_expired())
+ self.assertTrue(locator.permission_expired(dt2000))
+ self.assertFalse(locator.permission_expired(dt1980))
+
+
+if __name__ == '__main__':
+ unittest.main()
# translate UUID to numeric ID here.
resource_attrs[:user_id] =
User.where(uuid: resource_attrs.delete(:owner_uuid)).first.andand.id
+ elsif not resource_attrs[:user_id]
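+      # No user_id supplied: default to the user making the request.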
+ resource_attrs[:user_id] = current_user.id
end
resource_attrs[:api_client_id] = Thread.current[:api_client].id
super
# Note: This only returns permission links. It does not account for
# permissions obtained via user.is_admin or
# user.uuid==object.owner_uuid.
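+  # These links are cleaned up explicitly in destroy_permission_links
+  # rather than via dependent: :destroy.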
- has_many :permissions, :foreign_key => :head_uuid, :class_name => 'Link', :primary_key => :uuid, :conditions => "link_class = 'permission'", dependent: :destroy
+ has_many :permissions, :foreign_key => :head_uuid, :class_name => 'Link', :primary_key => :uuid, :conditions => "link_class = 'permission'"
class PermissionDeniedError < StandardError
def http_status
self.owner_uuid ||= current_user.uuid
end
if self.owner_uuid_changed?
- if current_user.uuid == self.owner_uuid or
+ if new_record?
+ return true
+ elsif current_user.uuid == self.owner_uuid or
current_user.can? write: self.owner_uuid
# current_user is, or has :write permission on, the new owner
else
end
def destroy_permission_links
- Link.destroy_all(['link_class=? and (head_uuid=? or tail_uuid=?)',
- 'permission', uuid, uuid])
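+    # uuid is nil if this record was never saved; skip the query in that case.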
+ if uuid
+ Link.destroy_all(['link_class=? and (head_uuid=? or tail_uuid=?)',
+ 'permission', uuid, uuid])
+ end
end
end
name: can_read
head_uuid: 1fd08fc162a5c6413070a8bd0bffc818+150
properties: {}
+
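+# Regression fixture (bug 2931): a permission link whose head_uuid and
+# tail_uuid are both null.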
+bug2931_link_with_null_head_uuid:
+ uuid: zzzzz-o0j2j-uru66qok2wruasb
+ owner_uuid: zzzzz-tpzed-000000000000000
+ created_at: 2014-05-30 14:30:00.184389725 Z
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-000000000000000
+ modified_at: 2014-05-30 14:30:00.184019565 Z
+ updated_at: 2014-05-30 14:30:00.183829316 Z
+ link_class: permission
+ name: bug2931
+ tail_uuid: ~
+ head_uuid: ~
+ properties: {}
require 'test_helper'
class ApiClientAuthorizationTest < ActiveSupport::TestCase
- # test "the truth" do
- # assert true
- # end
+ include CurrentApiClient
+
+ [:admin_trustedclient, :active_trustedclient].each do |token|
+ test "ApiClientAuthorization can be created then deleted by #{token}" do
+ set_user_from_auth token
+ x = ApiClientAuthorization.create!(user_id: current_user.id,
+ api_client_id: 0,
+ scopes: [])
+ newtoken = x.api_token
+ assert x.destroy, "Failed to destroy new ApiClientAuth"
+ assert_empty ApiClientAuthorization.where(api_token: newtoken), "Destroyed ApiClientAuth is still in database"
+ end
+ end
end
name: 'can_manage')
assert perm_link.save, "should give myself permission on my own object"
end
+
+ test "Delete permission links when deleting an object" do
+ set_user_from_auth :active_trustedclient
+
+ ob = Specimen.create!
+ Link.create!(tail_uuid: users(:active).uuid,
+ head_uuid: ob.uuid,
+ link_class: 'permission',
+ name: 'can_manage')
+ ob_uuid = ob.uuid
+ assert ob.destroy, "Could not destroy object with 1 permission link"
+ assert_empty(Link.where(head_uuid: ob_uuid),
+ "Permission link was not deleted when object was deleted")
+ end
end
+++ /dev/null
-../../sdk/python/run_test_server.py
\ No newline at end of file
--- /dev/null
+../../../sdk/python/tests/run_test_server.py
\ No newline at end of file
listener.Close()
}(term)
signal.Notify(term, syscall.SIGTERM)
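+	// Catch SIGINT too, so Ctrl-C takes the same shutdown path as SIGTERM.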
+ signal.Notify(term, syscall.SIGINT)
if pidfile != "" {
f, err := os.Create(pidfile)
func pythonDir() string {
gopath := os.Getenv("GOPATH")
- return fmt.Sprintf("%s/../../sdk/python", strings.Split(gopath, ":")[0])
+ return fmt.Sprintf("%s/../../sdk/python/tests", strings.Split(gopath, ":")[0])
}
func (s *ServerRequiredSuite) SetUpSuite(c *C) {