before_filter :check_auth_header
def check_auth_header
- mgmt_token = Rails.configuration.management_token
+ mgmt_token = Rails.configuration.ManagementToken
auth_header = request.headers['Authorization']
if !mgmt_token
# Token to be included in all healthcheck requests. Disabled by default.
# Workbench expects request header of the format "Authorization: Bearer xxx"
- management_token: false
+ ManagementToken: false
[true, 'Bearer configuredmanagementtoken', 200, '{"health":"OK"}'],
].each do |enabled, header, error_code, error_msg|
test "ping when #{if enabled then 'enabled' else 'disabled' end} with header '#{header}'" do
- Rails.configuration.management_token = 'configuredmanagementtoken' if enabled
+ Rails.configuration.ManagementToken = 'configuredmanagementtoken' if enabled
@request.headers['Authorization'] = header
get :ping
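
The contract these tests pin down is the same for every service touched in this changeset: GET /_health/ping returns 200 and {"health":"OK"} only when the configured management token is presented as a Bearer token. A minimal client-side sketch (Python; host URL and token are placeholders):

import requests

resp = requests.get(
    "https://workbench.example.com/_health/ping",
    headers={"Authorization": "Bearer configuredmanagementtoken"})
assert resp.status_code == 200
assert resp.json() == {"health": "OK"}
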
# Go binaries
cd $WORKSPACE/packages/$TARGET
export GOPATH=$(mktemp -d)
+go get -v github.com/kardianos/govendor
package_go_binary sdk/go/crunchrunner crunchrunner \
"Crunchrunner executes a command inside a container and uploads the output"
package_go_binary services/arv-git-httpd arvados-git-httpd \
mkdir -p "$GOPATH/src/git.curoverse.com"
ln -sfn "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
+ (cd "$GOPATH/src/git.curoverse.com/arvados.git" && "$GOPATH/bin/govendor" sync -v)
cd "$GOPATH/src/git.curoverse.com/arvados.git/$src_path"
local version="$(version_from_git)"
local timestamp="$(timestamp_from_git)"
- # If the command imports anything from the Arvados SDK, bump the
- # version number and build a new package whenever the SDK changes.
+ # Update the version number and build a new package if the vendor
+ # bundle has changed, or the command imports anything from the
+ # Arvados SDK and the SDK has changed.
+ declare -a checkdirs=(vendor)
if grep -qr git.curoverse.com/arvados .; then
- cd "$GOPATH/src/git.curoverse.com/arvados.git/sdk/go"
- if [[ $(timestamp_from_git) -gt "$timestamp" ]]; then
+ checkdirs+=(sdk/go)
+ fi
+ for dir in ${checkdirs[@]}; do
+ cd "$GOPATH/src/git.curoverse.com/arvados.git/$dir"
+ ts="$(timestamp_from_git)"
+ if [[ "$ts" -gt "$timestamp" ]]; then
version=$(version_from_git)
+ timestamp="$ts"
fi
- fi
+ done
cd $WORKSPACE/packages/$TARGET
test_package_presence $prog $version go
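
The loop above implements a newest-wins version bump: the package version starts from the package's own source directory, and is replaced whenever vendor/ (or sdk/go, if the command imports the SDK) has a more recent git timestamp. The same logic in Python, with timestamp_from_git/version_from_git standing in for the script's shell helpers:

def bump_version(version, timestamp, checkdirs, timestamp_from_git, version_from_git):
    # Keep the (version, timestamp) of whichever directory changed last.
    for d in checkdirs:
        ts = timestamp_from_git(d)
        if ts > timestamp:
            version, timestamp = version_from_git(d), ts
    return version, timestamp
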
$cmd = [$docker_bin, 'ps', '-q'];
}
Log(undef, "Sanity check is `@$cmd`");
-my ($exited, $stdout, $stderr) = srun_sync(
+my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
$cmd,
{label => "sanity check"});
# Find FUSE mounts under $CRUNCH_TMP and unmount them. Then clean
# up work directories crunch_tmp/work, crunch_tmp/opt,
# crunch_tmp/src*.
- my ($exited, $stdout, $stderr) = srun_sync(
+ my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
['bash', '-ec', q{
arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP}
}],
{label => "clean work dirs"});
if ($exited != 0) {
- exit(EX_RETRY_UNLOCKED);
+ exit_retry_unlocked();
}
}
echo >&2 "image loaded successfully"
};
- my ($exited, $stdout, $stderr) = srun_sync(
+ my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
["srun", "--nodelist=" . join(',', @node)],
["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
{label => "load docker image"});
if ($exited != 0)
{
- exit(EX_RETRY_UNLOCKED);
+ exit_retry_unlocked();
}
# Determine whether this version of Docker supports memory+swap limits.
- ($exited, $stdout, $stderr) = srun_sync(
+ ($exited, $stdout, $stderr, $tempfail) = srun_sync(
["srun", "--nodes=1"],
[$docker_bin, 'run', '--help'],
{label => "check --memory-swap feature"});
+ if ($tempfail) {
+ exit_retry_unlocked();
+ }
$docker_limitmem = ($stdout =~ /--memory-swap/);
# Find a non-root Docker user to use.
$label = "check whether user '$try_user' is UID 0";
$try_user_arg = "--user=$try_user";
}
- my ($exited, $stdout, $stderr) = srun_sync(
+ my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
["srun", "--nodes=1"],
["/bin/sh", "-ec",
"$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
Log(undef, "Container will run with $dockeruserarg");
}
last;
+ } elsif ($tempfail) {
+ exit_retry_unlocked();
}
}
"mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
$ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
- my ($stdout, $stderr);
- ($exited, $stdout, $stderr) = srun_sync(
+ my ($stdout, $stderr, $tempfail);
+ ($exited, $stdout, $stderr, $tempfail) = srun_sync(
\@srunargs, \@execargs,
{label => "run install script on all workers"},
- $build_script . $git_archive);
+ $build_script . $git_archive);
+ if ($tempfail) {
+ exit_retry_unlocked();
+ }
my $stderr_anything_from_script = 0;
for my $line (split(/\n/, $stderr)) {
} elsif ($working_slot_count < 1) {
save_output_collection();
save_meta();
- exit(EX_RETRY_UNLOCKED);
+ exit_retry_unlocked();
} elsif ($thisround_succeeded == 0 &&
($thisround_failed == 0 || $thisround_failed > 4)) {
my $message = "stop because $thisround_failed tasks failed and none succeeded";
$st->{node}->{fail_count}++;
}
}
- elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) {
+ elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) {
$jobstep[$jobstepidx]->{tempfail} = 1;
if (defined($job_slot_index)) {
$slot[$job_slot_index]->{node}->{fail_count}++;
if ($main::please_freeze || $j->{tempfail}) {
$exited ||= 255;
}
- return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
+ return ($exited, $j->{stdout_captured}, $j->{stderr_captured}, $j->{tempfail});
}
}
}
+sub exit_retry_unlocked {
+ Log(undef, "Transient failure with lock acquired; asking for re-dispatch by exiting ".EX_RETRY_UNLOCKED);
+ exit(EX_RETRY_UNLOCKED);
+}
+
sub retry_count {
# Calculate the number of times an operation should be retried,
# assuming exponential backoff, and that we're willing to retry as
tryjobrecord j, binstubs: ['clean_fail']
end
assert_match /Failing mount stub was called/, err
- assert_match /clean work dirs: exit 44\n(.*arv_put.*INFO.*\n)?$/, err
+ assert_match /clean work dirs: exit 44\n.*Transient failure.* exiting 93\n(.*arv_put.*INFO.*\n)?$/, err
assert_equal SPECIAL_EXIT[:EX_RETRY_UNLOCKED], $?.exitstatus
end
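
For context: SPECIAL_EXIT[:EX_RETRY_UNLOCKED] is 93 (the updated assertion above matches "exiting 93"), and crunch-job uses it to ask its dispatcher to re-queue the job after a transient failure rather than record it as failed. A sketch of that contract (Python; the handler is hypothetical, for illustration only):

EX_RETRY_UNLOCKED = 93  # matches the "exiting 93" assertion above

def classify_exit(status):
    # Hypothetical dispatcher-side handling of crunch-job's exit status.
    if status == 0:
        return "success"
    if status == EX_RETRY_UNLOCKED:
        return "requeue"  # transient failure; try dispatching again
    return "failed"
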
fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
num_retries=self.num_retries,
overrides=kwargs.get("override_tools"))
+ kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
return ArvadosCommandTool(self, toolpath_object, **kwargs)
elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
with SourceLine(dockerRequirement, "dockerImageId", WorkflowException):
sp = dockerRequirement["dockerImageId"].split(":")
image_name = sp[0]
- image_tag = sp[1] if len(sp) > 1 else None
+ image_tag = sp[1] if len(sp) > 1 else "latest"
images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
image_name=image_name,
if project_uuid:
args.append("--project-uuid="+project_uuid)
args.append(image_name)
- if image_tag:
- args.append(image_tag)
- logger.info("Uploading Docker image %s", ":".join(args[1:]))
+ args.append(image_tag)
+ logger.info("Uploading Docker image %s:%s", image_name, image_tag)
try:
arvados.commands.keepdocker.main(args, stdout=sys.stderr)
except SystemExit as e:
pipeline_template_uuid_pattern = re.compile(r'[a-z0-9]{5}-p5p6p-[a-z0-9]{15}')
def collectionResolver(api_client, document_loader, uri, num_retries=4):
+ if uri.startswith("keep:") or uri.startswith("arvwf:"):
+ return uri
+
if workflow_uuid_pattern.match(uri):
return "arvwf:%s#main" % (uri)
class StagingPathMapper(PathMapper):
_follow_dirs = True
+ def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
+ self.targets = set()
+ super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)
+
def visit(self, obj, stagedir, basedir, copy=False, staged=False):
# type: (Dict[unicode, Any], unicode, unicode, bool) -> None
loc = obj["location"]
tgt = os.path.join(stagedir, obj["basename"])
+ basetgt, baseext = os.path.splitext(tgt)
+ n = 1
+ while tgt in self.targets:
+ n += 1
+ tgt = "%s_%i%s" % (basetgt, n, baseext)
+ self.targets.add(tgt)
if obj["class"] == "Directory":
self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
if loc.startswith("_:") or self._follow_dirs:
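
The new target tracking in StagingPathMapper.visit avoids collisions when two inputs share a basename: the second and later occurrences get a numeric suffix inserted before the extension. A standalone sketch of the same scheme:

import os

targets = set()

def unique_target(tgt):
    # Mirror of the collision handling above: input.txt, input_2.txt, ...
    base, ext = os.path.splitext(tgt)
    n = 1
    while tgt in targets:
        n += 1
        tgt = "%s_%i%s" % (base, n, ext)
    targets.add(tgt)
    return tgt

print([unique_target("/stage/input.txt") for _ in range(3)])
# ['/stage/input.txt', '/stage/input_2.txt', '/stage/input_3.txt']
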
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+ else:
+ arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
upload_docker(arvrunner, s.embedded_tool)
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20170707200431',
- 'schema-salad==2.6.20170630075932',
+ 'cwltool==1.0.20170727112954',
+ 'schema-salad==2.6.20170712194300',
'typing==3.5.3.0',
'ruamel.yaml==0.13.7',
'arvados-python-client>=0.1.20170526013812',
"uuid": "",
"portable_data_hash": "99999999999999999999999999999998+99",
"manifest_text": ". 99999999999999999999999999999998+99 0:0:file1.txt"
- }}
+ },
+ "99999999999999999999999999999994+99": {
+ "uuid": "",
+ "portable_data_hash": "99999999999999999999999999999994+99",
+ "manifest_text": ". 99999999999999999999999999999994+99 0:0:expect_arvworkflow.cwl"
+ }
+ }
stubs.api.collections().create.side_effect = functools.partial(collection_createstub, created_collections)
stubs.api.collections().get.side_effect = functools.partial(collection_getstub, created_collections)
'class': 'File',
'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
"nameext": ".txt",
- "nameroot": "blorp"
+ "nameroot": "blorp",
+ "size": 16
}},
'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
'listing': [
'class': 'File',
'location': u'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
"nameext": ".txt",
- "nameroot": "blorp"
+ "nameroot": "blorp",
+ "size": 16
},
'z': {'basename': 'anonymous', 'class': 'Directory', 'listing': [
{'basename': 'renamed.txt',
- id: '#main/x'
type: File
default: {class: File, location: 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
- basename: blorp.txt, nameroot: blorp, nameext: .txt}
+ size: 16, basename: blorp.txt, nameroot: blorp, nameext: .txt}
- id: '#main/y'
type: Directory
default: {class: Directory, location: 'keep:99999999999999999999999999999998+99',
$self->{'req'} = new HTTP::Request (%req);
$self->{'req'}->header('Authorization' => ('OAuth2 ' . $self->{'authToken'})) if $self->{'authToken'};
$self->{'req'}->header('Accept' => 'application/json');
+
+ # allow_nonref lets us encode JSON::true and JSON::false, see #12078
+ my $json = JSON->new->allow_nonref;
my ($p, $v);
while (($p, $v) = each %{$self->{'queryParams'}}) {
- $content{$p} = (ref($v) eq "") ? $v : JSON::encode_json($v);
+ $content{$p} = (ref($v) eq "") ? $v : $json->encode($v);
}
my $content;
while (($p, $v) = each %content) {
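
The allow_nonref switch matters because strict JSON encoders refuse a bare scalar (such as a boolean) as a top-level document, which is exactly what a query parameter value is here. Python's json module allows it, which makes the intended output easy to show:

import json

# A bare boolean serialized on its own, as a query parameter value:
print(json.dumps(True))   # -> true
print(json.dumps(False))  # -> false
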
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
+job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
+container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
def clear_tmpdir(path=None):
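
The two new patterns are what lets node manager (further down in this changeset) tell containers from jobs when cancelling unsatisfiable work. A quick check against the UUID shapes used in the tests below:

import re

job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')

assert job_uuid_pattern.match('zzzzz-8i9sb-zzzzzzzzzzzzzzz')
assert container_uuid_pattern.match('yyyyy-dz642-yyyyyyyyyyyyyyy')
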
before_filter :check_auth_header
def check_auth_header
- mgmt_token = Rails.configuration.management_token
+ mgmt_token = Rails.configuration.ManagementToken
auth_header = request.headers['Authorization']
if !mgmt_token
include CommonApiTemplate
include CanBeAnOwner
+ # To avoid upgrade bugs, change PERM_CACHE_PREFIX whenever the
+ # format of the cached permission value changes:
+ PERM_CACHE_PREFIX = "perm_v20170725_"
+ PERM_CACHE_TTL = 172800
+
serialize :prefs, Hash
has_many :api_client_authorizations
validates(:username,
timestamp = DbCurrentTime::db_current_time.to_i if timestamp.nil?
connection.execute "NOTIFY invalidate_permissions_cache, '#{timestamp}'"
else
- Rails.cache.delete_matched(/^groups_for_user_/)
+ Rails.cache.delete_matched(/^#{PERM_CACHE_PREFIX}/)
end
end
).rows.each do |group_uuid, max_p_val|
group_perms[group_uuid] = PERMS_FOR_VAL[max_p_val.to_i]
end
- Rails.cache.write "groups_for_user_#{self.uuid}", group_perms
+ Rails.cache.write "#{PERM_CACHE_PREFIX}#{self.uuid}", group_perms, expires_in: PERM_CACHE_TTL
group_perms
end
# and perm_hash[:write] are true if this user can read and write
# objects owned by group_uuid.
def group_permissions
- r = Rails.cache.read "groups_for_user_#{self.uuid}"
+ r = Rails.cache.read "#{PERM_CACHE_PREFIX}#{self.uuid}"
if r.nil?
if Rails.configuration.async_permissions_update
while r.nil?
sleep(0.1)
- r = Rails.cache.read "groups_for_user_#{self.uuid}"
+ r = Rails.cache.read "#{PERM_CACHE_PREFIX}#{self.uuid}"
end
else
r = calculate_group_permissions
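
The prefix-plus-TTL scheme is a cache-versioning pattern: when the stored value format changes, bumping PERM_CACHE_PREFIX turns every old entry into an automatic miss (so it gets recomputed) instead of a wrongly-typed hit, and the TTL bounds how long orphaned entries linger. A minimal sketch of the idea, with a dict standing in for Rails.cache:

import time

PERM_CACHE_PREFIX = "perm_v20170725_"
PERM_CACHE_TTL = 172800  # 48 hours, as in the model above

cache = {}  # stand-in for Rails.cache

def write_perms(user_uuid, perms):
    cache[PERM_CACHE_PREFIX + user_uuid] = (perms, time.time() + PERM_CACHE_TTL)

def read_perms(user_uuid):
    hit = cache.get(PERM_CACHE_PREFIX + user_uuid)
    if hit and hit[1] > time.time():
        return hit[0]
    return None  # miss: recompute, as group_permissions does above
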
# Token to be included in all healthcheck requests. Disabled by default.
# Server expects request header of the format "Authorization: Bearer xxx"
- management_token: false
+ ManagementToken: false
development:
force_ssl: false
[true, 'Bearer configuredmanagementtoken', 200, '{"health":"OK"}'],
].each do |enabled, header, error_code, error_msg|
test "ping when #{if enabled then 'enabled' else 'disabled' end} with header '#{header}'" do
- Rails.configuration.management_token = 'configuredmanagementtoken' if enabled
+ Rails.configuration.ManagementToken = 'configuredmanagementtoken' if enabled
@request.headers['Authorization'] = header
get :ping
APIHost: arvadostest.APIHost(),
Insecure: true,
},
- Listen: ":0",
- GitCommand: "/usr/bin/git",
- RepoRoot: s.tmpRepoRoot,
+ Listen: ":0",
+ GitCommand: "/usr/bin/git",
+ RepoRoot: s.tmpRepoRoot,
+ ManagementToken: arvadostest.ManagementToken,
}
}
// Server configuration
type Config struct {
- Client arvados.Client
- Listen string
- GitCommand string
- RepoRoot string
- GitoliteHome string
+ Client arvados.Client
+ Listen string
+ GitCommand string
+ RepoRoot string
+ GitoliteHome string
+ ManagementToken string
}
var theConfig = defaultConfig()
cfgPath := flag.String("config", defaultCfgPath, "Configuration file `path`.")
dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
+
+ flag.StringVar(&theConfig.ManagementToken, "management-token", theConfig.ManagementToken,
+ "Authorization token to be included in all health check requests.")
+
flag.Usage = usage
flag.Parse()
import (
"net/http"
+ "git.curoverse.com/arvados.git/sdk/go/health"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
)
func (srv *server) Start() error {
mux := http.NewServeMux()
mux.Handle("/", &authHandler{handler: newGitHandler()})
+ mux.Handle("/_health/", &health.Handler{
+ Token: theConfig.ManagementToken,
+ Prefix: "/_health/",
+ })
srv.Handler = mux
srv.Addr = theConfig.Listen
return srv.Server.Start()
package main
import (
+ "net/http"
+ "net/http/httptest"
"os"
"os/exec"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+
check "gopkg.in/check.v1"
)
c.Log(string(msg))
c.Assert(err, check.Equals, nil)
}
+
+func (s *GitSuite) TestHealthCheckPing(c *check.C) {
+ req, err := http.NewRequest("GET",
+ "http://"+s.testServer.Addr+"/_health/ping",
+ nil)
+ c.Assert(err, check.Equals, nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
+
+ resp := httptest.NewRecorder()
+ s.testServer.Handler.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, 200)
+ c.Check(resp.Body.String(), check.Matches, `{"health":"OK"}\n`)
+}
@catch_exceptions
def on_event(self, ev):
- if 'event_type' not in ev:
+ if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
return
with llfuse.lock:
- new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
- pdh = new_attrs.get("portable_data_hash")
- # new_attributes.modified_at currently lacks
- # subsecond precision (see #6347) so use event_at
- # which should always be the same.
- stamp = ev.get("event_at")
+ properties = ev.get("properties") or {}
+ old_attrs = properties.get("old_attributes") or {}
+ new_attrs = properties.get("new_attributes") or {}
for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
item.invalidate()
- if stamp and pdh and ev.get("object_kind") == "arvados#collection":
- item.update(to_record_version=(stamp, pdh))
- else:
- item.update()
-
- oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
+ if ev.get("object_kind") == "arvados#collection":
+ pdh = new_attrs.get("portable_data_hash")
+ # new_attributes.modified_at currently lacks
+ # subsecond precision (see #6347) so use event_at
+ # which should always be the same.
+ stamp = ev.get("event_at")
+ if (stamp and pdh and item.writable() and
+ item.collection is not None and
+ item.collection.modified() and
+ new_attrs.get("is_trashed") is not True):
+ item.update(to_record_version=(stamp, pdh))
+
+ oldowner = old_attrs.get("owner_uuid")
newowner = ev.get("object_owner_uuid")
for parent in (
self.inodes.inode_cache.find_by_uuid(oldowner) +
self.inodes.inode_cache.find_by_uuid(newowner)):
- parent.invalidate()
- parent.update()
+ parent.child_event(ev)
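
For reference, the websocket events consumed above look roughly like the following (field names as used in the code; values illustrative):

ev = {
    "event_type": "update",            # only create/update/delete are handled
    "object_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
    "object_kind": "arvados#collection",
    "object_owner_uuid": "zzzzz-j7d0g-zzzzzzzzzzzzzzz",
    "event_at": "2017-07-25T00:00:00Z",  # used instead of modified_at (#6347)
    "properties": {
        "old_attributes": {"owner_uuid": "zzzzz-j7d0g-zzzzzzzzzzzzzzz",
                           "is_trashed": False},
        "new_attributes": {"portable_data_hash": "99999999999999999999999999999998+99",
                           "is_trashed": False},
    },
}
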
@catch_exceptions
def getattr(self, inode, ctx=None):
self.logger.info("enable write is %s", self.args.enable_write)
def _setup_api(self):
- self.api = arvados.safeapi.ThreadSafeApiCache(
- apiconfig=arvados.config.settings(),
- keep_params={
- 'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
- 'num_retries': self.args.retries,
- })
+ try:
+ self.api = arvados.safeapi.ThreadSafeApiCache(
+ apiconfig=arvados.config.settings(),
+ keep_params={
+ 'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
+ 'num_retries': self.args.retries,
+ })
+ except KeyError as e:
+ self.logger.error("Missing environment: %s", e)
+ exit(1)
# Do a sanity check that we have a working arvados host + token.
self.api.users().current().execute()
def finalize(self):
pass
+
+ def child_event(self, ev):
+ pass
self._poll_time = poll_time
self._updating_lock = threading.Lock()
self._current_user = None
+ self._full_listing = False
def want_event_subscribe(self):
return True
def uuid(self):
return self.project_uuid
+ def items(self):
+ self._full_listing = True
+ return super(ProjectDirectory, self).items()
+
+ def namefn(self, i):
+ if 'name' in i:
+ if i['name'] is None or len(i['name']) == 0:
+ return None
+ elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
+ # collection or subproject
+ return i['name']
+ elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
+ # name link
+ return i['name']
+ elif 'kind' in i and i['kind'].startswith('arvados#'):
+ # something else
+ return "{}.{}".format(i['name'], i['kind'][8:])
+ else:
+ return None
+
+
@use_counter
def update(self):
if self.project_object_file == None:
self.project_object_file = ObjectFile(self.inode, self.project_object)
self.inodes.add_entry(self.project_object_file)
- def namefn(i):
- if 'name' in i:
- if i['name'] is None or len(i['name']) == 0:
- return None
- elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
- # collection or subproject
- return i['name']
- elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
- # name link
- return i['name']
- elif 'kind' in i and i['kind'].startswith('arvados#'):
- # something else
- return "{}.{}".format(i['name'], i['kind'][8:])
- else:
- return None
+ if not self._full_listing:
+ return
def samefn(a, i):
if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
self.project_object = self.api.users().get(
uuid=self.project_uuid).execute(num_retries=self.num_retries)
- contents = arvados.util.list_all(self.api.groups().contents,
- self.num_retries, uuid=self.project_uuid)
+ contents = arvados.util.list_all(self.api.groups().list,
+ self.num_retries,
+ filters=[["owner_uuid", "=", self.project_uuid],
+ ["group_class", "=", "project"]])
+ contents.extend(arvados.util.list_all(self.api.collections().list,
+ self.num_retries,
+ filters=[["owner_uuid", "=", self.project_uuid]]))
# end with llfuse.lock_released, re-acquire lock
self.merge(contents,
- namefn,
+ self.namefn,
samefn,
self.createDirectory)
finally:
self._updating_lock.release()
+ def _add_entry(self, i, name):
+ ent = self.createDirectory(i)
+ self._entries[name] = self.inodes.add_entry(ent)
+ return self._entries[name]
+
@use_counter
@check_update
- def __getitem__(self, item):
- if item == '.arvados#project':
+ def __getitem__(self, k):
+ if k == '.arvados#project':
return self.project_object_file
- else:
- return super(ProjectDirectory, self).__getitem__(item)
+ elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
+ return super(ProjectDirectory, self).__getitem__(k)
+ with llfuse.lock_released:
+ contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
+ ["group_class", "=", "project"],
+ ["name", "=", k]],
+ limit=1).execute(num_retries=self.num_retries)["items"]
+ if not contents:
+ contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
+ ["name", "=", k]],
+ limit=1).execute(num_retries=self.num_retries)["items"]
+ if contents:
+ name = sanitize_filename(self.namefn(contents[0]))
+ if name != k:
+ raise KeyError(k)
+ return self._add_entry(contents[0], name)
+
+ # Didn't find item
+ raise KeyError(k)
def __contains__(self, k):
if k == '.arvados#project':
return True
- else:
- return super(ProjectDirectory, self).__contains__(k)
+ try:
+ self[k]
+ return True
+ except KeyError:
+ pass
+ return False
@use_counter
@check_update
self._entries[name_new] = ent
self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
+ @use_counter
+ def child_event(self, ev):
+ properties = ev.get("properties") or {}
+ old_attrs = properties.get("old_attributes") or {}
+ new_attrs = properties.get("new_attributes") or {}
+ old_attrs["uuid"] = ev["object_uuid"]
+ new_attrs["uuid"] = ev["object_uuid"]
+ old_name = sanitize_filename(self.namefn(old_attrs))
+ new_name = sanitize_filename(self.namefn(new_attrs))
+
+ # Create events will have a new name but not an old name;
+ # delete events will have an old name but not a new name;
+ # update events will have both, which may be the same or
+ # different. If they are the same, an unrelated field changed
+ # and there is nothing to do.
+
+ if old_attrs.get("owner_uuid") != self.project_uuid:
+ # Was moved from somewhere else, so don't try to remove entry.
+ old_name = None
+ if ev.get("object_owner_uuid") != self.project_uuid:
+ # Was moved to somewhere else, so don't try to add entry
+ new_name = None
+
+ if ev.get("object_kind") == "arvados#collection":
+ if old_attrs.get("is_trashed"):
+ # Was previously deleted
+ old_name = None
+ if new_attrs.get("is_trashed"):
+ # Has been deleted
+ new_name = None
+
+ if new_name != old_name:
+ ent = None
+ if old_name in self._entries:
+ ent = self._entries[old_name]
+ del self._entries[old_name]
+ self.inodes.invalidate_entry(self.inode, old_name.encode(self.inodes.encoding))
+
+ if new_name:
+ if ent is not None:
+ self._entries[new_name] = ent
+ else:
+ self._add_entry(new_attrs, new_name)
+ elif ent is not None:
+ self.inodes.del_entry(ent)
+
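
Taken together, the ProjectDirectory changes switch from eager enumeration to lazy lookup: a full groups/collections listing happens only when items() is called (e.g. for a readdir), while a single path lookup does one filtered API call per name. The pattern in miniature:

class LazyDir(object):
    def __init__(self, lookup_one):
        self._entries = {}
        self._lookup_one = lookup_one  # name -> entry, or None if absent

    def __getitem__(self, k):
        if k in self._entries:
            return self._entries[k]
        ent = self._lookup_one(k)      # one filtered API call, not a listing
        if ent is None:
            raise KeyError(k)
        self._entries[k] = ent
        return ent

    def __contains__(self, k):
        try:
            self[k]
            return True
        except KeyError:
            return False
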
class SharedDirectory(Directory):
"""A special directory that represents users or groups who have shared projects with me."""
attempt(self.assertDirContents, 'fuse_test_tag', [bar_uuid])
+def fuseSharedTestHelper(mounttmp):
+ class Test(unittest.TestCase):
+ def runTest(self):
+ # Double-check that we can open and read objects in this folder
+ # as files, and that their contents are what we expect.
+ baz_path = os.path.join(
+ mounttmp,
+ 'FUSE User',
+ 'FUSE Test Project',
+ 'collection in FUSE project',
+ 'baz')
+ with open(baz_path) as f:
+ self.assertEqual("baz", f.read())
+
+ # check mtime on collection
+ st = os.stat(baz_path)
+ try:
+ mtime = st.st_mtime_ns / 1000000000
+ except AttributeError:
+ mtime = st.st_mtime
+ self.assertEqual(mtime, 1391448174)
+
+ # shared_dirs is a list of the directories exposed
+ # by fuse.SharedDirectory (i.e. any object visible
+ # to the current user)
+ shared_dirs = llfuse.listdir(mounttmp)
+ shared_dirs.sort()
+ self.assertIn('FUSE User', shared_dirs)
+
+ # fuse_user_objs is a list of the objects owned by the FUSE
+ # test user (which present as files in the 'FUSE User'
+ # directory)
+ fuse_user_objs = llfuse.listdir(os.path.join(mounttmp, 'FUSE User'))
+ fuse_user_objs.sort()
+ self.assertEqual(['FUSE Test Project', # project owned by user
+ 'collection #1 owned by FUSE', # collection owned by user
+ 'collection #2 owned by FUSE' # collection owned by user
+ ], fuse_user_objs)
+
+ # test_proj_files is a list of the files in the FUSE Test Project.
+ test_proj_files = llfuse.listdir(os.path.join(mounttmp, 'FUSE User', 'FUSE Test Project'))
+ test_proj_files.sort()
+ self.assertEqual(['collection in FUSE project'
+ ], test_proj_files)
+
+
+ Test().runTest()
+
class FuseSharedTest(MountTestBase):
def runTest(self):
self.make_mount(fuse.SharedDirectory,
exclude=self.api.users().current().execute()['uuid'])
+ keep = arvados.keep.KeepClient()
+ keep.put("baz")
- # shared_dirs is a list of the directories exposed
- # by fuse.SharedDirectory (i.e. any object visible
- # to the current user)
- shared_dirs = llfuse.listdir(self.mounttmp)
- shared_dirs.sort()
- self.assertIn('FUSE User', shared_dirs)
-
- # fuse_user_objs is a list of the objects owned by the FUSE
- # test user (which present as files in the 'FUSE User'
- # directory)
- fuse_user_objs = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User'))
- fuse_user_objs.sort()
- self.assertEqual(['FUSE Test Project', # project owned by user
- 'collection #1 owned by FUSE', # collection owned by user
- 'collection #2 owned by FUSE', # collection owned by user
- 'pipeline instance owned by FUSE.pipelineInstance', # pipeline instance owned by user
- ], fuse_user_objs)
-
- # test_proj_files is a list of the files in the FUSE Test Project.
- test_proj_files = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
- test_proj_files.sort()
- self.assertEqual(['collection in FUSE project',
- 'pipeline instance in FUSE project.pipelineInstance',
- 'pipeline template in FUSE project.pipelineTemplate'
- ], test_proj_files)
-
- # Double check that we can open and read objects in this folder as a file,
- # and that its contents are what we expect.
- pipeline_template_path = os.path.join(
- self.mounttmp,
- 'FUSE User',
- 'FUSE Test Project',
- 'pipeline template in FUSE project.pipelineTemplate')
- with open(pipeline_template_path) as f:
- j = json.load(f)
- self.assertEqual("pipeline template in FUSE project", j['name'])
-
- # check mtime on template
- st = os.stat(pipeline_template_path)
- try:
- mtime = st.st_mtime_ns / 1000000000
- except AttributeError:
- mtime = st.st_mtime
- self.assertEqual(mtime, 1397493304)
-
- # check mtime on collection
- st = os.stat(os.path.join(
- self.mounttmp,
- 'FUSE User',
- 'collection #1 owned by FUSE'))
- try:
- mtime = st.st_mtime_ns / 1000000000
- except AttributeError:
- mtime = st.st_mtime
- self.assertEqual(mtime, 1391448174)
+ self.pool.apply(fuseSharedTestHelper, (self.mounttmp,))
class FuseHomeTest(MountTestBase):
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/auth"
+ "git.curoverse.com/arvados.git/sdk/go/health"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
)
type handler struct {
- Config *Config
- clientPool *arvadosclient.ClientPool
- setupOnce sync.Once
+ Config *Config
+ clientPool *arvadosclient.ClientPool
+ setupOnce sync.Once
+ healthHandler http.Handler
}
// parseCollectionIDFromDNSName returns a UUID or PDH if s begins with
func (h *handler) setup() {
h.clientPool = arvadosclient.MakeClientPool()
+
keepclient.RefreshServiceDiscoveryOnSIGHUP()
+
+ h.healthHandler = &health.Handler{
+ Token: h.Config.ManagementToken,
+ Prefix: "/_health/",
+ }
}
func (h *handler) serveStatus(w http.ResponseWriter, r *http.Request) {
httpserver.Log(remoteAddr, statusCode, statusText, w.WroteBodyBytes(), r.Method, r.Host, r.URL.Path, r.URL.RawQuery)
}()
+ if strings.HasPrefix(r.URL.Path, "/_health/") && r.Method == "GET" {
+ h.healthHandler.ServeHTTP(w, r)
+ return
+ }
+
if r.Method == "OPTIONS" {
method := r.Header.Get("Access-Control-Request-Method")
if method != "GET" && method != "POST" {
}
}
}
+
+func (s *IntegrationSuite) TestHealthCheckPing(c *check.C) {
+ s.testServer.Config.ManagementToken = arvadostest.ManagementToken
+ authHeader := http.Header{
+ "Authorization": {"Bearer " + arvadostest.ManagementToken},
+ }
+
+ resp := httptest.NewRecorder()
+ u := mustParseURL("http://download.example.com/_health/ping")
+ req := &http.Request{
+ Method: "GET",
+ Host: u.Host,
+ URL: u,
+ RequestURI: u.RequestURI(),
+ Header: authHeader,
+ }
+ s.testServer.Handler.ServeHTTP(resp, req)
+
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ c.Check(resp.Body.String(), check.Matches, `{"health":"OK"}\n`)
+}
// Hack to support old command line flag, which is a bool
// meaning "get actual token from environment".
deprecatedAllowAnonymous bool
+
+ // Authorization token to be included in all health check requests.
+ ManagementToken string
}
// DefaultConfig returns the default configuration.
"Only serve attachments at the given `host:port`"+deprecated)
flag.BoolVar(&cfg.TrustAllContent, "trust-all-content", false,
"Serve non-public content from a single origin. Dangerous: read docs before using!"+deprecated)
+ flag.StringVar(&cfg.ManagementToken, "management-token", "",
+ "Authorization token to be included in all health check requests.")
+
dumpConfig := flag.Bool("dump-config", false,
"write current configuration to stdout and exit")
flag.Usage = usage
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/config"
+ "git.curoverse.com/arvados.git/sdk/go/health"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"github.com/coreos/go-systemd/daemon"
"github.com/ghodss/yaml"
Timeout arvados.Duration
PIDFile string
Debug bool
+ ManagementToken string
}
func DefaultConfig() *Config {
flagset.IntVar(&cfg.DefaultReplicas, "default-replicas", cfg.DefaultReplicas, "Default number of replicas to write if not specified by the client. If 0, use site default."+deprecated)
flagset.StringVar(&cfg.PIDFile, "pid", cfg.PIDFile, "Path to write pid file."+deprecated)
timeoutSeconds := flagset.Int("timeout", int(time.Duration(cfg.Timeout)/time.Second), "Timeout (in seconds) on requests to internal Keep services."+deprecated)
+ flagset.StringVar(&cfg.ManagementToken, "management-token", cfg.ManagementToken, "Authorization token to be included in all health check requests.")
var cfgPath string
const defaultCfgPath = "/etc/arvados/keepproxy/keepproxy.yml"
signal.Notify(term, syscall.SIGINT)
// Start serving requests.
- router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout))
+ router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout), cfg.ManagementToken)
http.Serve(listener, router)
log.Println("shutting down")
// MakeRESTRouter returns an http.Handler that passes GET and PUT
// requests to the appropriate handlers.
-func MakeRESTRouter(enable_get bool, enable_put bool, kc *keepclient.KeepClient, timeout time.Duration) http.Handler {
+func MakeRESTRouter(enable_get bool, enable_put bool, kc *keepclient.KeepClient, timeout time.Duration, mgmtToken string) http.Handler {
rest := mux.NewRouter()
transport := *(http.DefaultTransport.(*http.Transport))
rest.HandleFunc(`/`, h.Options).Methods("OPTIONS")
}
+ rest.Handle("/_health/{check}", &health.Handler{
+ Token: mgmtToken,
+ Prefix: "/_health/",
+ }).Methods("GET")
+
rest.NotFoundHandler = InvalidPathHandler{}
return h
}
// fixes the invalid Content-Length header. In order to test
// our server behavior, we have to call the handler directly
// using an httptest.ResponseRecorder.
- rtr := MakeRESTRouter(true, true, kc, 10*time.Second)
+ rtr := MakeRESTRouter(true, true, kc, 10*time.Second, "")
type testcase struct {
sendLength string
c.Check(err, ErrorMatches, `.*HTTP 502.*`)
}
}
+
+func (s *ServerRequiredSuite) TestPing(c *C) {
+ kc := runProxy(c, nil, false)
+ defer closeListener()
+
+ rtr := MakeRESTRouter(true, true, kc, 10*time.Second, arvadostest.ManagementToken)
+
+ req, err := http.NewRequest("GET",
+ "http://"+listener.Addr().String()+"/_health/ping",
+ nil)
+ c.Assert(err, IsNil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
+
+ resp := httptest.NewRecorder()
+ rtr.ServeHTTP(resp, req)
+ c.Check(resp.Code, Equals, 200)
+ c.Assert(strings.Contains(resp.Body.String(), `{"health":"OK"}`), Equals, true)
+}
Enable debug logging.
+ManagementToken:
+
+ Authorization token to be included in all health check requests.
+
`, exampleConfigFile)
}
&s3UnsafeDelete,
"s3-unsafe-delete",
false,
- "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
+ "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
}
// S3Volume implements Volume using an S3 bucket.
'watchdog': '600',
'node_mem_scaling': '0.95'},
'Manage': {'address': '127.0.0.1',
- 'port': '-1'},
+ 'port': '-1',
+ 'ManagementToken': ''},
'Logging': {'file': '/dev/stderr',
'level': 'WARNING'}
}.iteritems():
import logging
import subprocess
+import arvados.util
+
from . import clientactor
from .config import ARVADOS_ERRORS
+
class ServerCalculator(object):
"""Generate cloud server wishlists from an Arvados job queue.
self.max_nodes = max_nodes or float('inf')
self.max_price = max_price or float('inf')
self.logger = logging.getLogger('arvnodeman.jobqueue')
- self.logged_jobs = set()
self.logger.info("Using cloud node sizes:")
for s in self.cloud_sizes:
def servers_for_queue(self, queue):
servers = []
- seen_jobs = set()
+ unsatisfiable_jobs = {}
for job in queue:
- seen_jobs.add(job['uuid'])
constraints = job['runtime_constraints']
want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
cloud_size = self.cloud_size_for_constraints(constraints)
if cloud_size is None:
- if job['uuid'] not in self.logged_jobs:
- self.logged_jobs.add(job['uuid'])
- self.logger.debug("job %s not satisfiable", job['uuid'])
- elif (want_count <= self.max_nodes) and (want_count*cloud_size.price <= self.max_price):
+ unsatisfiable_jobs[job['uuid']] = (
+ 'Requirements for a single node exceed the available '
+ 'cloud node size')
+ elif (want_count > self.max_nodes):
+ unsatisfiable_jobs[job['uuid']] = (
+ "Job's min_nodes constraint is greater than the configured "
+ "max_nodes (%d)" % self.max_nodes)
+ elif (want_count*cloud_size.price <= self.max_price):
servers.extend([cloud_size.real] * want_count)
- self.logged_jobs.intersection_update(seen_jobs)
- return servers
+ else:
+ unsatisfiable_jobs[job['uuid']] = (
+ "Job's price (%d) is above system's max_price "
+ "limit (%d)" % (want_count*cloud_size.price, self.max_price))
+ return (servers, unsatisfiable_jobs)
def cheapest_size(self):
return self.cloud_sizes[0]
return s
return None
+
class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
"""Actor to generate server wishlists from the job queue.
for out in squeue_out.splitlines():
try:
cpu, ram, disk, reason, jobname = out.split("|", 4)
- if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
+ if ("ReqNodeNotAvail" in reason) or ("Resources" in reason) or ("Priority" in reason):
queuelist.append({
"uuid": jobname,
"runtime_constraints": {
return queuelist
def _got_response(self, queue):
- server_list = self._calculator.servers_for_queue(queue)
+ server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue)
+ # Cancel any job/container with unsatisfiable requirements, emitting
+ # a log explaining why.
+ for job_uuid, reason in unsatisfiable_jobs.iteritems():
+ try:
+ self._client.logs().create(body={
+ 'object_uuid': job_uuid,
+ 'event_type': 'stderr',
+ 'properties': {'text': reason},
+ }).execute()
+ # Cancel the job depending on its type
+ if arvados.util.container_uuid_pattern.match(job_uuid):
+ subprocess.check_call(['scancel', '--name='+job_uuid])
+ elif arvados.util.job_uuid_pattern.match(job_uuid):
+ self._client.jobs().cancel(uuid=job_uuid).execute()
+ else:
+ raise Exception('Unknown job type')
+ self._logger.debug("Cancelled unsatisfiable job '%s'", job_uuid)
+ except Exception as error:
+ self._logger.error("Trying to cancel job '%s': %s",
+ job_uuid,
+ error)
self._logger.debug("Calculated wishlist: %s",
', '.join(s.name for s in server_list) or "(empty)")
return super(JobQueueMonitorActor, self)._got_response(server_list)
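
Note the API change rippling through the tests below: servers_for_queue now returns a (wishlist, unsatisfiable_jobs) pair instead of a bare list, so every caller has to unpack it. In sketch form (logger and servcalc as set up elsewhere):

def handle_queue(servcalc, queue, logger):
    servers, unsatisfiable_jobs = servcalc.servers_for_queue(queue)
    for job_uuid, reason in unsatisfiable_jobs.items():
        # log the reason and cancel the job/container, as _got_response does
        logger.info("cancelling %s: %s", job_uuid, reason)
    return servers
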
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(tracker.get_json())
+ elif self.path == '/_health/ping':
+ code, msg = self.check_auth()
+
+ if code != 200:
+ self.send_response(code)
+ self.end_headers()
+ self.wfile.write(msg)
+ else:
+ self.send_response(200)
+ self.send_header('Content-type', 'application/json')
+ self.end_headers()
+ self.wfile.write(json.dumps({"health":"OK"}))
else:
self.send_response(404)
def log_message(self, fmt, *args, **kwargs):
_logger.info(fmt, *args, **kwargs)
+ def check_auth(self):
+ mgmt_token = self.server._config.get('Manage', 'ManagementToken')
+ auth_header = self.headers.get('Authorization', None)
+
+ if mgmt_token == '':
+ return 404, "disabled"
+ elif auth_header is None:
+ return 401, "authorization required"
+ elif auth_header != 'Bearer '+mgmt_token:
+ return 403, "authorization error"
+ return 200, ""
class Tracker(object):
def __init__(self):
fake_slurm = None
compute_nodes = None
all_jobs = None
+unsatisfiable_job_scancelled = None
def update_script(path, val):
with open(path+"_", "w") as f:
"\n".join("echo '1|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
return 0
+def set_queue_unsatisfiable(g):
+ global all_jobs, unsatisfiable_job_scancelled
+ # Simulate a job requesting a 99-core node.
+ update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
+ "\n".join("echo '99|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
+ update_script(os.path.join(fake_slurm, "scancel"), "#!/bin/sh\n" +
+ "\ntouch %s" % unsatisfiable_job_scancelled)
+ return 0
+
+def job_cancelled(g):
+ global unsatisfiable_job_scancelled
+ cancelled_job = g.group(1)
+ api = arvados.api('v1')
+ # Check that 'scancel' was called
+ if not os.path.isfile(unsatisfiable_job_scancelled):
+ return 1
+ # Check for the log entry
+ log_entry = api.logs().list(
+ filters=[
+ ['object_uuid', '=', cancelled_job],
+ ['event_type', '=', 'stderr'],
+ ]).execute()['items'][0]
+ if not re.match(
+ r"Requirements for a single node exceed the available cloud node size",
+ log_entry['properties']['text']):
+ return 1
+ return 0
def node_paired(g):
global compute_nodes
def run_test(name, actions, checks, driver_class, jobs, provider):
code = 0
+ global unsatisfiable_job_scancelled
+ unsatisfiable_job_scancelled = os.path.join(tempfile.mkdtemp(),
+ "scancel_called")
# Delete any stale node records
api = arvados.api('v1')
# Test main loop:
# - Read line
- # - Apply negative checks (thinks that are not supposed to happen)
+ # - Apply negative checks (things that are not supposed to happen)
# - Check timeout
# - Check if the next action should trigger
# - If all actions are exhausted, terminate with test success
code = 1
shutil.rmtree(fake_slurm)
+ shutil.rmtree(os.path.dirname(unsatisfiable_job_scancelled))
if code == 0:
logger.info("%s passed", name)
# Test lifecycle.
tests = {
+ "test_unsatisfiable_jobs" : (
+ # Actions (pattern -> action)
+ [
+ (r".*Daemon started", set_queue_unsatisfiable),
+ (r".*Cancelled unsatisfiable job '(\S+)'", job_cancelled),
+ ],
+ # Checks (things that shouldn't happen)
+ {
+ r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": fail,
+ r".*Trying to cancel job '(\S+)'": fail,
+ },
+ # Driver class
+ "arvnodeman.test.fake_driver.FakeDriver",
+ # Jobs
+ {"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
"test_single_node_azure": (
[
(r".*Daemon started", set_squeue),
def test_empty_queue_needs_no_servers(self):
servcalc = self.make_calculator([1])
- self.assertEqual([], servcalc.servers_for_queue([]))
+ self.assertEqual(([], {}), servcalc.servers_for_queue([]))
def test_easy_server_count(self):
servcalc = self.make_calculator([1])
- servlist = self.calculate(servcalc, {'min_nodes': 3})
+ servlist, _ = self.calculate(servcalc, {'min_nodes': 3})
self.assertEqual(3, len(servlist))
def test_default_5pct_ram_value_decrease(self):
servcalc = self.make_calculator([1])
- servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
+ servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
self.assertEqual(0, len(servlist))
- servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 121})
+ servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 121})
self.assertEqual(1, len(servlist))
def test_custom_node_mem_scaling_factor(self):
# Simulate a custom 'node_mem_scaling' config parameter by passing
# the value to ServerCalculator
servcalc = self.make_calculator([1], node_mem_scaling=0.5)
- servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
+ servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
self.assertEqual(0, len(servlist))
- servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 64})
+ servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 64})
self.assertEqual(1, len(servlist))
def test_implicit_server_count(self):
servcalc = self.make_calculator([1])
- servlist = self.calculate(servcalc, {}, {'min_nodes': 3})
+ servlist, _ = self.calculate(servcalc, {}, {'min_nodes': 3})
self.assertEqual(4, len(servlist))
def test_bad_min_nodes_override(self):
servcalc = self.make_calculator([1])
- servlist = self.calculate(servcalc,
- {'min_nodes': -2}, {'min_nodes': 'foo'})
+ servlist, _ = self.calculate(servcalc,
+ {'min_nodes': -2}, {'min_nodes': 'foo'})
self.assertEqual(2, len(servlist))
- def test_ignore_unsatisfiable_jobs(self):
+ def test_ignore_and_return_unsatisfiable_jobs(self):
servcalc = self.make_calculator([1], max_nodes=9)
- servlist = self.calculate(servcalc,
- {'min_cores_per_node': 2},
- {'min_ram_mb_per_node': 256},
- {'min_nodes': 6},
- {'min_nodes': 12},
- {'min_scratch_mb_per_node': 300000})
+ servlist, u_jobs = self.calculate(servcalc,
+ {'min_cores_per_node': 2},
+ {'min_ram_mb_per_node': 256},
+ {'min_nodes': 6},
+ {'min_nodes': 12},
+ {'min_scratch_mb_per_node': 300000})
self.assertEqual(6, len(servlist))
+ # Only unsatisfiable jobs are returned on u_jobs
+ self.assertIn('zzzzz-jjjjj-000000000000000', u_jobs.keys())
+ self.assertIn('zzzzz-jjjjj-000000000000001', u_jobs.keys())
+ self.assertNotIn('zzzzz-jjjjj-000000000000002', u_jobs.keys())
+ self.assertIn('zzzzz-jjjjj-000000000000003', u_jobs.keys())
+ self.assertIn('zzzzz-jjjjj-000000000000004', u_jobs.keys())
def test_ignore_too_expensive_jobs(self):
servcalc = self.make_calculator([1, 2], max_nodes=12, max_price=6)
- servlist = self.calculate(servcalc,
- {'min_cores_per_node': 1, 'min_nodes': 6})
+ servlist, _ = self.calculate(servcalc,
+ {'min_cores_per_node': 1, 'min_nodes': 6})
self.assertEqual(6, len(servlist))
- servlist = self.calculate(servcalc,
- {'min_cores_per_node': 2, 'min_nodes': 6})
+ servlist, _ = self.calculate(servcalc,
+ {'min_cores_per_node': 2, 'min_nodes': 6})
self.assertEqual(0, len(servlist))
def test_job_requesting_max_nodes_accepted(self):
servcalc = self.make_calculator([1], max_nodes=4)
- servlist = self.calculate(servcalc, {'min_nodes': 4})
+ servlist, _ = self.calculate(servcalc, {'min_nodes': 4})
self.assertEqual(4, len(servlist))
def test_cheapest_size(self):
def test_next_biggest(self):
servcalc = self.make_calculator([1, 2, 4, 8])
- servlist = self.calculate(servcalc,
- {'min_cores_per_node': 3},
- {'min_cores_per_node': 6})
+ servlist, _ = self.calculate(servcalc,
+ {'min_cores_per_node': 3},
+ {'min_cores_per_node': 6})
self.assertEqual([servcalc.cloud_sizes[2].id,
servcalc.cloud_sizes[3].id],
[s.id for s in servlist])
def test_multiple_sizes(self):
servcalc = self.make_calculator([1, 2])
- servlist = self.calculate(servcalc,
- {'min_cores_per_node': 2},
- {'min_cores_per_node': 1},
- {'min_cores_per_node': 1})
+ servlist, _ = self.calculate(servcalc,
+ {'min_cores_per_node': 2},
+ {'min_cores_per_node': 1},
+ {'min_cores_per_node': 1})
self.assertEqual([servcalc.cloud_sizes[1].id,
servcalc.cloud_sizes[0].id,
servcalc.cloud_sizes[0].id],
[s.id for s in servlist])
- servlist = self.calculate(servcalc,
- {'min_cores_per_node': 1},
- {'min_cores_per_node': 2},
- {'min_cores_per_node': 1})
+ servlist, _ = self.calculate(servcalc,
+ {'min_cores_per_node': 1},
+ {'min_cores_per_node': 2},
+ {'min_cores_per_node': 1})
self.assertEqual([servcalc.cloud_sizes[0].id,
servcalc.cloud_sizes[1].id,
servcalc.cloud_sizes[0].id],
[s.id for s in servlist])
- servlist = self.calculate(servcalc,
- {'min_cores_per_node': 1},
- {'min_cores_per_node': 1},
- {'min_cores_per_node': 2})
+ servlist, _ = self.calculate(servcalc,
+ {'min_cores_per_node': 1},
+ {'min_cores_per_node': 1},
+ {'min_cores_per_node': 2})
self.assertEqual([servcalc.cloud_sizes[0].id,
servcalc.cloud_sizes[0].id,
servcalc.cloud_sizes[1].id],
unittest.TestCase):
TEST_CLASS = jobqueue.JobQueueMonitorActor
+
class MockCalculator(object):
@staticmethod
def servers_for_queue(queue):
- return [testutil.MockSize(n) for n in queue]
+ return ([testutil.MockSize(n) for n in queue], {})
+
+
+ class MockCalculatorUnsatisfiableJobs(object):
+ @staticmethod
+ def servers_for_queue(queue):
+ return ([], {k["uuid"]: "Unsatisfiable job mock" for k in queue})
def build_monitor(self, side_effect, *args, **kwargs):
super(JobQueueMonitorActorTestCase, self).build_monitor(*args, **kwargs)
self.client.jobs().queue().execute.side_effect = side_effect
+ @mock.patch("subprocess.check_call")
+ @mock.patch("subprocess.check_output")
+ def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
+ job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
+ container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
+ mock_squeue.return_value = "1|1024|0|Resources|" + container_uuid + "\n"
+
+ self.build_monitor([{'items': [{'uuid': job_uuid}]}],
+ self.MockCalculatorUnsatisfiableJobs(), True, True)
+ self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+ self.stop_proxy(self.monitor)
+ self.client.jobs().cancel.assert_called_with(uuid=job_uuid)
+ mock_scancel.assert_called_with(['scancel', '--name='+container_uuid])
+
@mock.patch("subprocess.check_output")
def test_subscribers_get_server_lists(self, mock_squeue):
mock_squeue.return_value = ""
from __future__ import absolute_import, print_function
from future import standard_library
+import json
import requests
import unittest
class TestServer(object):
+ def __init__(self, management_token=None):
+ self.mgmt_token = management_token
+
def __enter__(self):
cfg = config.NodeManagerConfig()
cfg.set('Manage', 'port', '0')
cfg.set('Manage', 'address', '127.0.0.1')
+ if self.mgmt_token is not None:
+ cfg.set('Manage', 'ManagementToken', self.mgmt_token)
self.srv = status.Server(cfg)
self.srv.start()
addr, port = self.srv.server_address
def get_status(self):
return self.get_status_response().json()
+ def get_healthcheck_ping(self, auth_header=None):
+ headers = {}
+ if auth_header is not None:
+ headers['Authorization'] = auth_header
+ return requests.get(self.srv_base+'/_health/ping', headers=headers)
class StatusServerUpdates(unittest.TestCase):
def test_updates(self):
self.srv.start()
self.assertFalse(self.srv.enabled)
self.assertFalse(getattr(self.srv, '_thread', False))
+
+class HealthcheckPing(unittest.TestCase):
+ def test_ping_disabled(self):
+ with TestServer() as srv:
+ r = srv.get_healthcheck_ping()
+ self.assertEqual(404, r.status_code)
+
+ def test_ping_no_auth(self):
+ with TestServer('configuredmanagementtoken') as srv:
+ r = srv.get_healthcheck_ping()
+ self.assertEqual(401, r.status_code)
+
+ def test_ping_bad_auth_format(self):
+ with TestServer('configuredmanagementtoken') as srv:
+ r = srv.get_healthcheck_ping('noBearer')
+ self.assertEqual(403, r.status_code)
+
+ def test_ping_bad_auth_token(self):
+ with TestServer('configuredmanagementtoken') as srv:
+ r = srv.get_healthcheck_ping('Bearer badtoken')
+ self.assertEqual(403, r.status_code)
+
+ def test_ping_success(self):
+ with TestServer('configuredmanagementtoken') as srv:
+ r = srv.get_healthcheck_ping('Bearer configuredmanagementtoken')
+ self.assertEqual(200, r.status_code)
+ self.assertEqual('application/json', r.headers['content-type'])
+ resp = r.json()
+ self.assertEqual('{"health": "OK"}', json.dumps(resp))
"package": [
{
"checksumSHA1": "jf7K+UTQNIzRdlG5F4zX/8b++/E=",
+ "origin": "github.com/curoverse/goamz/aws",
"path": "github.com/AdRoll/goamz/aws",
- "revision": "c5d7d9bd6c743fae44efc6c18450282022445ffc",
- "revisionTime": "2017-02-25T09:28:51Z"
+ "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+ "revisionTime": "2017-07-27T13:52:37Z"
},
{
- "checksumSHA1": "ey9ddXTW9dncjJz/COKpeYm+sgg=",
+ "checksumSHA1": "9nUwQXI+pNxZo6bnR7NslpMpfPI=",
+ "origin": "github.com/curoverse/goamz/s3",
"path": "github.com/AdRoll/goamz/s3",
- "revision": "c5d7d9bd6c743fae44efc6c18450282022445ffc",
- "revisionTime": "2017-02-25T09:28:51Z"
+ "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+ "revisionTime": "2017-07-27T13:52:37Z"
},
{
- "checksumSHA1": "pDHYVqUQtRsPYw/X4kUrdK7pxMs=",
+ "checksumSHA1": "tvxbsTkdjB0C/uxEglqD6JfVnMg=",
+ "origin": "github.com/curoverse/goamz/s3/s3test",
"path": "github.com/AdRoll/goamz/s3/s3test",
- "revision": "c5d7d9bd6c743fae44efc6c18450282022445ffc",
- "revisionTime": "2017-02-25T09:28:51Z"
+ "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+ "revisionTime": "2017-07-27T13:52:37Z"
},
{
"checksumSHA1": "Rjy2uYZkQ8Kjht6ZFU0qzm2I/kI=",
"revision": "1620af6b32398bfc91827ceae54a8cc1f55df04d",
"revisionTime": "2016-12-14T20:08:43Z"
},
- {
- "checksumSHA1": "qjY3SPlNvqT179DPiRaIsRhYZQI=",
- "origin": "github.com/docker/docker/vendor/github.com/docker/distribution",
- "path": "github.com/docker/distribution",
- "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
- "revisionTime": "2017-05-17T20:48:28Z"
- },
- {
- "checksumSHA1": "0au+tD+jymXNssdb1JgcctY7PN4=",
- "origin": "github.com/docker/docker/vendor/github.com/docker/distribution/context",
- "path": "github.com/docker/distribution/context",
- "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
- "revisionTime": "2017-05-17T20:48:28Z"
- },
{
"checksumSHA1": "Gj+xR1VgFKKmFXYOJMnAczC3Znk=",
"origin": "github.com/docker/docker/vendor/github.com/docker/distribution/digestset",
"revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
"revisionTime": "2017-05-17T20:48:28Z"
},
- {
- "checksumSHA1": "oYy5Q1HBImMQvh9t96cmNzWar80=",
- "origin": "github.com/docker/docker/vendor/github.com/docker/distribution/manifest",
- "path": "github.com/docker/distribution/manifest",
- "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
- "revisionTime": "2017-05-17T20:48:28Z"
- },
- {
- "checksumSHA1": "SK1g7ll2cPbgDyWpK0oVT9beVZY=",
- "origin": "github.com/docker/docker/vendor/github.com/docker/distribution/manifest/manifestlist",
- "path": "github.com/docker/distribution/manifest/manifestlist",
- "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
- "revisionTime": "2017-05-17T20:48:28Z"
- },
{
"checksumSHA1": "m4wEFD0Mh+ClfprUqgl0GyNmk7Q=",
"origin": "github.com/docker/docker/vendor/github.com/docker/distribution/reference",
"revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
"revisionTime": "2017-05-17T20:48:28Z"
},
- {
- "checksumSHA1": "cNp7rNReJHvdSfrIetXS9RGsLSo=",
- "origin": "github.com/docker/docker/vendor/github.com/docker/distribution/uuid",
- "path": "github.com/docker/distribution/uuid",
- "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
- "revisionTime": "2017-05-17T20:48:28Z"
- },
{
"checksumSHA1": "5b7eC73lORtIUFCjz548jXkLlKU=",
"path": "github.com/docker/docker/api",
"revisionTime": "2017-03-24T20:46:54Z"
},
{
- "checksumSHA1": "Gk3jTNQ5uGDUE0WMJFWcYz9PMps=",
+ "checksumSHA1": "q5SZBWFVC3wOIzftf+l/h5WLG1k=",
"path": "github.com/lib/pq/oid",
"revision": "2704adc878c21e1329f46f6e56a1c387d788ff94",
"revisionTime": "2017-03-24T20:46:54Z"
"revisionTime": "2017-05-12T22:20:15Z"
},
{
- "checksumSHA1": "ENl6I8+3AaBanbn9CVExMjDTHPc=",
+ "checksumSHA1": "dUfdXzRJupI9VpqNR2LlppeZvLc=",
"origin": "github.com/docker/docker/vendor/golang.org/x/sys/unix",
"path": "golang.org/x/sys/unix",
"revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",