before_filter :check_auth_header
def check_auth_header
- mgmt_token = Rails.configuration.management_token
+ mgmt_token = Rails.configuration.ManagementToken
auth_header = request.headers['Authorization']
if !mgmt_token
# Token to be included in all healthcheck requests. Disabled by default.
# Workbench expects request header of the format "Authorization: Bearer xxx"
- management_token: false
+ ManagementToken: false
[true, 'Bearer configuredmanagementtoken', 200, '{"health":"OK"}'],
].each do |enabled, header, error_code, error_msg|
test "ping when #{if enabled then 'enabled' else 'disabled' end} with header '#{header}'" do
- Rails.configuration.management_token = 'configuredmanagementtoken' if enabled
+ Rails.configuration.ManagementToken = 'configuredmanagementtoken' if enabled
@request.headers['Authorization'] = header
get :ping
$cmd = [$docker_bin, 'ps', '-q'];
}
Log(undef, "Sanity check is `@$cmd`");
-my ($exited, $stdout, $stderr) = srun_sync(
+my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
$cmd,
{label => "sanity check"});
# Find FUSE mounts under $CRUNCH_TMP and unmount them. Then clean
# up work directories crunch_tmp/work, crunch_tmp/opt,
# crunch_tmp/src*.
- my ($exited, $stdout, $stderr) = srun_sync(
+ my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
['bash', '-ec', q{
arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP}
}],
{label => "clean work dirs"});
if ($exited != 0) {
- exit(EX_RETRY_UNLOCKED);
+ exit_retry_unlocked();
}
}
echo >&2 "image loaded successfully"
};
- my ($exited, $stdout, $stderr) = srun_sync(
+ my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
["srun", "--nodelist=" . join(',', @node)],
["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
{label => "load docker image"});
if ($exited != 0)
{
- exit(EX_RETRY_UNLOCKED);
+ exit_retry_unlocked();
}
# Determine whether this version of Docker supports memory+swap limits.
- ($exited, $stdout, $stderr) = srun_sync(
+ ($exited, $stdout, $stderr, $tempfail) = srun_sync(
["srun", "--nodes=1"],
[$docker_bin, 'run', '--help'],
{label => "check --memory-swap feature"});
+ if ($tempfail) {
+ exit_retry_unlocked();
+ }
$docker_limitmem = ($stdout =~ /--memory-swap/);
# Find a non-root Docker user to use.
$label = "check whether user '$try_user' is UID 0";
$try_user_arg = "--user=$try_user";
}
- my ($exited, $stdout, $stderr) = srun_sync(
+ my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
["srun", "--nodes=1"],
["/bin/sh", "-ec",
"$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
Log(undef, "Container will run with $dockeruserarg");
}
last;
+ } elsif ($tempfail) {
+ exit_retry_unlocked();
}
}
"mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
$ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
- my ($stdout, $stderr);
- ($exited, $stdout, $stderr) = srun_sync(
+ my ($stdout, $stderr, $tempfail);
+ ($exited, $stdout, $stderr, $tempfail) = srun_sync(
\@srunargs, \@execargs,
{label => "run install script on all workers"},
- $build_script . $git_archive);
+ $build_script . $git_archive);
+ if ($tempfail) {
+ exit_retry_unlocked();
+ }
my $stderr_anything_from_script = 0;
for my $line (split(/\n/, $stderr)) {
} elsif ($working_slot_count < 1) {
save_output_collection();
save_meta();
- exit(EX_RETRY_UNLOCKED);
+ exit_retry_unlocked();
} elsif ($thisround_succeeded == 0 &&
($thisround_failed == 0 || $thisround_failed > 4)) {
my $message = "stop because $thisround_failed tasks failed and none succeeded";
if ($main::please_freeze || $j->{tempfail}) {
$exited ||= 255;
}
- return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
+ return ($exited, $j->{stdout_captured}, $j->{stderr_captured}, $j->{tempfail});
}
}
}
+sub exit_retry_unlocked {
+  # Log why we are bailing out, then exit with EX_RETRY_UNLOCKED, which
+  # (per the log text) asks for the job to be re-dispatched.
+  my $status = EX_RETRY_UNLOCKED;
+  Log(undef, "Transient failure with lock acquired; asking for re-dispatch by exiting " . $status);
+  exit($status);
+}
+
sub retry_count {
# Calculate the number of times an operation should be retried,
# assuming exponential backoff, and that we're willing to retry as
tryjobrecord j, binstubs: ['clean_fail']
end
assert_match /Failing mount stub was called/, err
- assert_match /clean work dirs: exit 44\n(.*arv_put.*INFO.*\n)?$/, err
+ assert_match /clean work dirs: exit 44\n.*Transient failure.* exiting 93\n(.*arv_put.*INFO.*\n)?$/, err
assert_equal SPECIAL_EXIT[:EX_RETRY_UNLOCKED], $?.exitstatus
end
before_filter :check_auth_header
def check_auth_header
- mgmt_token = Rails.configuration.management_token
+ mgmt_token = Rails.configuration.ManagementToken
auth_header = request.headers['Authorization']
if !mgmt_token
include CommonApiTemplate
include CanBeAnOwner
+ # To avoid upgrade bugs, when changing the permission cache value
+ # format, change PERM_CACHE_PREFIX too:
+ PERM_CACHE_PREFIX = "perm_v20170725_"
+ PERM_CACHE_TTL = 172800
+
serialize :prefs, Hash
has_many :api_client_authorizations
validates(:username,
timestamp = DbCurrentTime::db_current_time.to_i if timestamp.nil?
connection.execute "NOTIFY invalidate_permissions_cache, '#{timestamp}'"
else
- Rails.cache.delete_matched(/^groups_for_user_/)
+ Rails.cache.delete_matched(/^#{PERM_CACHE_PREFIX}/)
end
end
).rows.each do |group_uuid, max_p_val|
group_perms[group_uuid] = PERMS_FOR_VAL[max_p_val.to_i]
end
- Rails.cache.write "groups_for_user_#{self.uuid}", group_perms
+ Rails.cache.write "#{PERM_CACHE_PREFIX}#{self.uuid}", group_perms, expires_in: PERM_CACHE_TTL
group_perms
end
# and perm_hash[:write] are true if this user can read and write
# objects owned by group_uuid.
def group_permissions
- r = Rails.cache.read "groups_for_user_#{self.uuid}"
+ r = Rails.cache.read "#{PERM_CACHE_PREFIX}#{self.uuid}"
if r.nil?
if Rails.configuration.async_permissions_update
while r.nil?
sleep(0.1)
- r = Rails.cache.read "groups_for_user_#{self.uuid}"
+ r = Rails.cache.read "#{PERM_CACHE_PREFIX}#{self.uuid}"
end
else
r = calculate_group_permissions
# Token to be included in all healthcheck requests. Disabled by default.
# Server expects request header of the format "Authorization: Bearer xxx"
- management_token: false
+ ManagementToken: false
development:
force_ssl: false
[true, 'Bearer configuredmanagementtoken', 200, '{"health":"OK"}'],
].each do |enabled, header, error_code, error_msg|
test "ping when #{if enabled then 'enabled' else 'disabled' end} with header '#{header}'" do
- Rails.configuration.management_token = 'configuredmanagementtoken' if enabled
+ Rails.configuration.ManagementToken = 'configuredmanagementtoken' if enabled
@request.headers['Authorization'] = header
get :ping
APIHost: arvadostest.APIHost(),
Insecure: true,
},
- Listen: ":0",
- GitCommand: "/usr/bin/git",
- RepoRoot: s.tmpRepoRoot,
+ Listen: ":0",
+ GitCommand: "/usr/bin/git",
+ RepoRoot: s.tmpRepoRoot,
+ ManagementToken: arvadostest.ManagementToken,
}
}
// Server configuration
type Config struct {
-	Client       arvados.Client
-	Listen       string
-	GitCommand   string
-	RepoRoot     string
-	GitoliteHome string
+	Client          arvados.Client
+	Listen          string
+	GitCommand      string
+	RepoRoot        string
+	GitoliteHome    string
+	ManagementToken string // expected in the Authorization header of /_health/ requests (see -management-token)
}
var theConfig = defaultConfig()
cfgPath := flag.String("config", defaultCfgPath, "Configuration file `path`.")
dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
+
+ flag.StringVar(&theConfig.ManagementToken, "management-token", theConfig.ManagementToken,
+ "Authorization token to be included in all health check requests.")
+
flag.Usage = usage
flag.Parse()
import (
"net/http"
+ "git.curoverse.com/arvados.git/sdk/go/health"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
)
func (srv *server) Start() error {
mux := http.NewServeMux()
mux.Handle("/", &authHandler{handler: newGitHandler()})
+ mux.Handle("/_health/", &health.Handler{
+ Token: theConfig.ManagementToken,
+ Prefix: "/_health/",
+ })
srv.Handler = mux
srv.Addr = theConfig.Listen
return srv.Server.Start()
package main
import (
+ "net/http"
+ "net/http/httptest"
"os"
"os/exec"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+
check "gopkg.in/check.v1"
)
c.Log(string(msg))
c.Assert(err, check.Equals, nil)
}
+
+// TestHealthCheckPing sends GET /_health/ping with the test management
+// token straight to the server's handler (recorded with httptest, no
+// network round trip) and expects 200 with body {"health":"OK"}.
+func (s *GitSuite) TestHealthCheckPing(c *check.C) {
+	req, err := http.NewRequest("GET",
+		"http://"+s.testServer.Addr+"/_health/ping",
+		nil)
+	c.Assert(err, check.Equals, nil)
+	req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
+
+	resp := httptest.NewRecorder()
+	s.testServer.Handler.ServeHTTP(resp, req)
+	c.Check(resp.Code, check.Equals, 200)
+	c.Check(resp.Body.String(), check.Matches, `{"health":"OK"}\n`)
+}
@catch_exceptions
def on_event(self, ev):
- if 'event_type' not in ev:
+ if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
return
with llfuse.lock:
- new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
- pdh = new_attrs.get("portable_data_hash")
- # new_attributes.modified_at currently lacks
- # subsecond precision (see #6347) so use event_at
- # which should always be the same.
- stamp = ev.get("event_at")
+ properties = ev.get("properties") or {}
+ old_attrs = properties.get("old_attributes") or {}
+ new_attrs = properties.get("new_attributes") or {}
for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
item.invalidate()
- if stamp and pdh and ev.get("object_kind") == "arvados#collection":
- item.update(to_record_version=(stamp, pdh))
- else:
- item.update()
-
- oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
+ if ev.get("object_kind") == "arvados#collection":
+ pdh = new_attrs.get("portable_data_hash")
+ # new_attributes.modified_at currently lacks
+ # subsecond precision (see #6347) so use event_at
+ # which should always be the same.
+ stamp = ev.get("event_at")
+ if (stamp and pdh and item.writable() and
+ item.collection is not None and
+ item.collection.modified() and
+ new_attrs.get("is_trashed") is not True):
+ item.update(to_record_version=(stamp, pdh))
+
+ oldowner = old_attrs.get("owner_uuid")
newowner = ev.get("object_owner_uuid")
for parent in (
self.inodes.inode_cache.find_by_uuid(oldowner) +
self.inodes.inode_cache.find_by_uuid(newowner)):
- parent.invalidate()
- parent.update()
+ parent.child_event(ev)
@catch_exceptions
def getattr(self, inode, ctx=None):
self.logger.info("enable write is %s", self.args.enable_write)
def _setup_api(self):
- self.api = arvados.safeapi.ThreadSafeApiCache(
- apiconfig=arvados.config.settings(),
- keep_params={
- 'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
- 'num_retries': self.args.retries,
- })
+ try:
+ self.api = arvados.safeapi.ThreadSafeApiCache(
+ apiconfig=arvados.config.settings(),
+ keep_params={
+ 'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
+ 'num_retries': self.args.retries,
+ })
+ except KeyError as e:
+ self.logger.error("Missing environment: %s", e)
+ exit(1)
# Do a sanity check that we have a working arvados host + token.
self.api.users().current().execute()
def finalize(self):
pass
+
+    def child_event(self, ev):
+        # Hook invoked on a parent inode when an event concerns one of its
+        # children (the event handler calls parent.child_event(ev)); the
+        # base implementation ignores it.
+        pass
self._poll_time = poll_time
self._updating_lock = threading.Lock()
self._current_user = None
+ self._full_listing = False
def want_event_subscribe(self):
return True
def uuid(self):
return self.project_uuid
+    def items(self):
+        # Listing needs every child, so switch off lazy lookup: update()
+        # returns early until _full_listing is set, and __getitem__ stops
+        # issuing single-name API queries once it is.
+        self._full_listing = True
+        return super(ProjectDirectory, self).items()
+
+ def namefn(self, i):
+ if 'name' in i:
+ if i['name'] is None or len(i['name']) == 0:
+ return None
+ elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
+ # collection or subproject
+ return i['name']
+ elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
+ # name link
+ return i['name']
+ elif 'kind' in i and i['kind'].startswith('arvados#'):
+ # something else
+ return "{}.{}".format(i['name'], i['kind'][8:])
+ else:
+ return None
+
+
@use_counter
def update(self):
if self.project_object_file == None:
self.project_object_file = ObjectFile(self.inode, self.project_object)
self.inodes.add_entry(self.project_object_file)
- def namefn(i):
- if 'name' in i:
- if i['name'] is None or len(i['name']) == 0:
- return None
- elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
- # collection or subproject
- return i['name']
- elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
- # name link
- return i['name']
- elif 'kind' in i and i['kind'].startswith('arvados#'):
- # something else
- return "{}.{}".format(i['name'], i['kind'][8:])
- else:
- return None
+ if not self._full_listing:
+ return
def samefn(a, i):
if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
self.project_object = self.api.users().get(
uuid=self.project_uuid).execute(num_retries=self.num_retries)
- contents = arvados.util.list_all(self.api.groups().contents,
- self.num_retries, uuid=self.project_uuid)
+ contents = arvados.util.list_all(self.api.groups().list,
+ self.num_retries,
+ filters=[["owner_uuid", "=", self.project_uuid],
+ ["group_class", "=", "project"]])
+ contents.extend(arvados.util.list_all(self.api.collections().list,
+ self.num_retries,
+ filters=[["owner_uuid", "=", self.project_uuid]]))
# end with llfuse.lock_released, re-acquire lock
self.merge(contents,
- namefn,
+ self.namefn,
samefn,
self.createDirectory)
finally:
self._updating_lock.release()
+    def _add_entry(self, i, name):
+        # Create the child object for record i, register it in the inode
+        # table, file the registered entry under `name`, and return it.
+        ent = self.createDirectory(i)
+        self._entries[name] = self.inodes.add_entry(ent)
+        return self._entries[name]
+
@use_counter
@check_update
- def __getitem__(self, item):
- if item == '.arvados#project':
+ def __getitem__(self, k):
+ if k == '.arvados#project':
return self.project_object_file
- else:
- return super(ProjectDirectory, self).__getitem__(item)
+ elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
+ return super(ProjectDirectory, self).__getitem__(k)
+ with llfuse.lock_released:
+ contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
+ ["group_class", "=", "project"],
+ ["name", "=", k]],
+ limit=1).execute(num_retries=self.num_retries)["items"]
+ if not contents:
+ contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
+ ["name", "=", k]],
+ limit=1).execute(num_retries=self.num_retries)["items"]
+ if contents:
+ name = sanitize_filename(self.namefn(contents[0]))
+ if name != k:
+ raise KeyError(k)
+ return self._add_entry(contents[0], name)
+
+ # Didn't find item
+ raise KeyError(k)
    def __contains__(self, k):
        if k == '.arvados#project':
            return True
-        else:
-            return super(ProjectDirectory, self).__contains__(k)
+        # Route through __getitem__ so names not yet cached are resolved by
+        # its on-demand API lookup; KeyError means "not present".  Note a
+        # membership test can therefore issue API requests and add an entry.
+        try:
+            self[k]
+            return True
+        except KeyError:
+            pass
+        return False
@use_counter
@check_update
self._entries[name_new] = ent
self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
+    @use_counter
+    def child_event(self, ev):
+        """Apply an event about an object this project owned before or after it.
+
+        Computes the object's entry name before and after the change.  When
+        they differ, the old entry is dropped (with inodes.invalidate_entry
+        for the old name).  Then the existing entry is re-filed under the
+        new name, a fresh entry is built from the new attributes, or the
+        entry is removed via inodes.del_entry if the object no longer
+        belongs here.
+        """
+        properties = ev.get("properties") or {}
+        old_attrs = properties.get("old_attributes") or {}
+        new_attrs = properties.get("new_attributes") or {}
+        # namefn() classifies objects by uuid, which the attribute dicts may
+        # lack.  When present in ev, these dicts are modified in place.
+        old_attrs["uuid"] = ev["object_uuid"]
+        new_attrs["uuid"] = ev["object_uuid"]
+        # NOTE(review): namefn() can return None; this assumes
+        # sanitize_filename passes None through unchanged -- confirm.
+        old_name = sanitize_filename(self.namefn(old_attrs))
+        new_name = sanitize_filename(self.namefn(new_attrs))
+
+        # create events will have a new name, but not an old name
+        # delete events will have an old name, but not a new name
+        # update events will have an old and new name, and they may be same or different
+        # if they are the same, an unrelated field changed and there is nothing to do.
+
+        if old_attrs.get("owner_uuid") != self.project_uuid:
+            # Was moved from somewhere else, so don't try to remove entry.
+            old_name = None
+        if ev.get("object_owner_uuid") != self.project_uuid:
+            # Was moved to somewhere else, so don't try to add entry
+            new_name = None
+
+        if ev.get("object_kind") == "arvados#collection":
+            if old_attrs.get("is_trashed"):
+                # Was previously deleted
+                old_name = None
+            if new_attrs.get("is_trashed"):
+                # Has been deleted
+                new_name = None
+
+        if new_name != old_name:
+            ent = None
+            if old_name in self._entries:
+                ent = self._entries[old_name]
+                del self._entries[old_name]
+                self.inodes.invalidate_entry(self.inode, old_name.encode(self.inodes.encoding))
+
+            if new_name:
+                # Reuse the existing inode on rename; otherwise build a new
+                # entry from the event's new attributes.
+                if ent is not None:
+                    self._entries[new_name] = ent
+                else:
+                    self._add_entry(new_attrs, new_name)
+            elif ent is not None:
+                self.inodes.del_entry(ent)
+
class SharedDirectory(Directory):
"""A special directory that represents users or groups who have shared projects with me."""
attempt(self.assertDirContents, 'fuse_test_tag', [bar_uuid])
+def fuseSharedTestHelper(mounttmp):
+ class Test(unittest.TestCase):
+ def runTest(self):
+ # Double check that we can open and read objects in this folder as a file,
+ # and that its contents are what we expect.
+ baz_path = os.path.join(
+ mounttmp,
+ 'FUSE User',
+ 'FUSE Test Project',
+ 'collection in FUSE project',
+ 'baz')
+ with open(baz_path) as f:
+ self.assertEqual("baz", f.read())
+
+ # check mtime on collection
+ st = os.stat(baz_path)
+ try:
+ mtime = st.st_mtime_ns / 1000000000
+ except AttributeError:
+ mtime = st.st_mtime
+ self.assertEqual(mtime, 1391448174)
+
+ # shared_dirs is a list of the directories exposed
+ # by fuse.SharedDirectory (i.e. any object visible
+ # to the current user)
+ shared_dirs = llfuse.listdir(mounttmp)
+ shared_dirs.sort()
+ self.assertIn('FUSE User', shared_dirs)
+
+ # fuse_user_objs is a list of the objects owned by the FUSE
+ # test user (which present as files in the 'FUSE User'
+ # directory)
+ fuse_user_objs = llfuse.listdir(os.path.join(mounttmp, 'FUSE User'))
+ fuse_user_objs.sort()
+ self.assertEqual(['FUSE Test Project', # project owned by user
+ 'collection #1 owned by FUSE', # collection owned by user
+ 'collection #2 owned by FUSE' # collection owned by user
+ ], fuse_user_objs)
+
+ # test_proj_files is a list of the files in the FUSE Test Project.
+ test_proj_files = llfuse.listdir(os.path.join(mounttmp, 'FUSE User', 'FUSE Test Project'))
+ test_proj_files.sort()
+ self.assertEqual(['collection in FUSE project'
+ ], test_proj_files)
+
+
+ Test().runTest()
+
class FuseSharedTest(MountTestBase):
def runTest(self):
self.make_mount(fuse.SharedDirectory,
exclude=self.api.users().current().execute()['uuid'])
+ keep = arvados.keep.KeepClient()
+ keep.put("baz")
- # shared_dirs is a list of the directories exposed
- # by fuse.SharedDirectory (i.e. any object visible
- # to the current user)
- shared_dirs = llfuse.listdir(self.mounttmp)
- shared_dirs.sort()
- self.assertIn('FUSE User', shared_dirs)
-
- # fuse_user_objs is a list of the objects owned by the FUSE
- # test user (which present as files in the 'FUSE User'
- # directory)
- fuse_user_objs = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User'))
- fuse_user_objs.sort()
- self.assertEqual(['FUSE Test Project', # project owned by user
- 'collection #1 owned by FUSE', # collection owned by user
- 'collection #2 owned by FUSE', # collection owned by user
- 'pipeline instance owned by FUSE.pipelineInstance', # pipeline instance owned by user
- ], fuse_user_objs)
-
- # test_proj_files is a list of the files in the FUSE Test Project.
- test_proj_files = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
- test_proj_files.sort()
- self.assertEqual(['collection in FUSE project',
- 'pipeline instance in FUSE project.pipelineInstance',
- 'pipeline template in FUSE project.pipelineTemplate'
- ], test_proj_files)
-
- # Double check that we can open and read objects in this folder as a file,
- # and that its contents are what we expect.
- pipeline_template_path = os.path.join(
- self.mounttmp,
- 'FUSE User',
- 'FUSE Test Project',
- 'pipeline template in FUSE project.pipelineTemplate')
- with open(pipeline_template_path) as f:
- j = json.load(f)
- self.assertEqual("pipeline template in FUSE project", j['name'])
-
- # check mtime on template
- st = os.stat(pipeline_template_path)
- try:
- mtime = st.st_mtime_ns / 1000000000
- except AttributeError:
- mtime = st.st_mtime
- self.assertEqual(mtime, 1397493304)
-
- # check mtime on collection
- st = os.stat(os.path.join(
- self.mounttmp,
- 'FUSE User',
- 'collection #1 owned by FUSE'))
- try:
- mtime = st.st_mtime_ns / 1000000000
- except AttributeError:
- mtime = st.st_mtime
- self.assertEqual(mtime, 1391448174)
+ self.pool.apply(fuseSharedTestHelper, (self.mounttmp,))
class FuseHomeTest(MountTestBase):
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/auth"
+ "git.curoverse.com/arvados.git/sdk/go/health"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
)
type handler struct {
-	Config     *Config
-	clientPool *arvadosclient.ClientPool
-	setupOnce  sync.Once
+	Config        *Config
+	clientPool    *arvadosclient.ClientPool
+	setupOnce     sync.Once
+	healthHandler http.Handler // serves GET /_health/* requests; built in setup()
}
// parseCollectionIDFromDNSName returns a UUID or PDH if s begins with
+// setup initializes the handler's API client pool and its /_health/
+// handler (using Config.ManagementToken), and calls
+// keepclient.RefreshServiceDiscoveryOnSIGHUP. The setupOnce field
+// suggests it runs once per handler -- confirm at the call site.
func (h *handler) setup() {
	h.clientPool = arvadosclient.MakeClientPool()
+
	keepclient.RefreshServiceDiscoveryOnSIGHUP()
+
+	h.healthHandler = &health.Handler{
+		Token:  h.Config.ManagementToken,
+		Prefix: "/_health/",
+	}
}
func (h *handler) serveStatus(w http.ResponseWriter, r *http.Request) {
httpserver.Log(remoteAddr, statusCode, statusText, w.WroteBodyBytes(), r.Method, r.Host, r.URL.Path, r.URL.RawQuery)
}()
+ if strings.HasPrefix(r.URL.Path, "/_health/") && r.Method == "GET" {
+ h.healthHandler.ServeHTTP(w, r)
+ return
+ }
+
if r.Method == "OPTIONS" {
method := r.Header.Get("Access-Control-Request-Method")
if method != "GET" && method != "POST" {
}
}
}
+
+// TestHealthCheckPing sends GET /_health/ping with the test management
+// token straight to the server's handler and expects 200 {"health":"OK"}.
+func (s *IntegrationSuite) TestHealthCheckPing(c *check.C) {
+	// NOTE(review): setup() copies Config.ManagementToken into the health
+	// handler, and handler carries a setupOnce field, so this assignment
+	// presumably only takes effect if setup has not run yet for this
+	// testServer -- confirm the server is fresh per test.
+	s.testServer.Config.ManagementToken = arvadostest.ManagementToken
+	authHeader := http.Header{
+		"Authorization": {"Bearer " + arvadostest.ManagementToken},
+	}
+
+	resp := httptest.NewRecorder()
+	u := mustParseURL("http://download.example.com/_health/ping")
+	req := &http.Request{
+		Method:     "GET",
+		Host:       u.Host,
+		URL:        u,
+		RequestURI: u.RequestURI(),
+		Header:     authHeader,
+	}
+	s.testServer.Handler.ServeHTTP(resp, req)
+
+	c.Check(resp.Code, check.Equals, http.StatusOK)
+	c.Check(resp.Body.String(), check.Matches, `{"health":"OK"}\n`)
+}
// Hack to support old command line flag, which is a bool
// meaning "get actual token from environment".
deprecatedAllowAnonymous bool
+
+	// Authorization token to be included in all health check requests.
+ ManagementToken string
}
// DefaultConfig returns the default configuration.
"Only serve attachments at the given `host:port`"+deprecated)
flag.BoolVar(&cfg.TrustAllContent, "trust-all-content", false,
"Serve non-public content from a single origin. Dangerous: read docs before using!"+deprecated)
+ flag.StringVar(&cfg.ManagementToken, "management-token", "",
+ "Authorization token to be included in all health check requests.")
+
dumpConfig := flag.Bool("dump-config", false,
"write current configuration to stdout and exit")
flag.Usage = usage
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/config"
+ "git.curoverse.com/arvados.git/sdk/go/health"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"github.com/coreos/go-systemd/daemon"
"github.com/ghodss/yaml"
Timeout arvados.Duration
PIDFile string
Debug bool
+ ManagementToken string
}
func DefaultConfig() *Config {
flagset.IntVar(&cfg.DefaultReplicas, "default-replicas", cfg.DefaultReplicas, "Default number of replicas to write if not specified by the client. If 0, use site default."+deprecated)
flagset.StringVar(&cfg.PIDFile, "pid", cfg.PIDFile, "Path to write pid file."+deprecated)
timeoutSeconds := flagset.Int("timeout", int(time.Duration(cfg.Timeout)/time.Second), "Timeout (in seconds) on requests to internal Keep services."+deprecated)
+ flagset.StringVar(&cfg.ManagementToken, "management-token", cfg.ManagementToken, "Authorization token to be included in all health check requests.")
var cfgPath string
const defaultCfgPath = "/etc/arvados/keepproxy/keepproxy.yml"
signal.Notify(term, syscall.SIGINT)
// Start serving requests.
- router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout))
+ router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout), cfg.ManagementToken)
http.Serve(listener, router)
log.Println("shutting down")
// MakeRESTRouter returns an http.Handler that passes GET and PUT
// requests to the appropriate handlers.
-func MakeRESTRouter(enable_get bool, enable_put bool, kc *keepclient.KeepClient, timeout time.Duration) http.Handler {
+func MakeRESTRouter(enable_get bool, enable_put bool, kc *keepclient.KeepClient, timeout time.Duration, mgmtToken string) http.Handler {
rest := mux.NewRouter()
transport := *(http.DefaultTransport.(*http.Transport))
rest.HandleFunc(`/`, h.Options).Methods("OPTIONS")
}
+ rest.Handle("/_health/{check}", &health.Handler{
+ Token: mgmtToken,
+ Prefix: "/_health/",
+ }).Methods("GET")
+
rest.NotFoundHandler = InvalidPathHandler{}
return h
}
// fixes the invalid Content-Length header. In order to test
// our server behavior, we have to call the handler directly
// using an httptest.ResponseRecorder.
- rtr := MakeRESTRouter(true, true, kc, 10*time.Second)
+ rtr := MakeRESTRouter(true, true, kc, 10*time.Second, "")
type testcase struct {
sendLength string
c.Check(err, ErrorMatches, `.*HTTP 502.*`)
}
}
+
+// TestPing builds a fresh router configured with the test management token
+// and serves GET /_health/ping through it directly. The proxy started by
+// runProxy supplies the keep client and listener address; the request is
+// not sent over the network (rtr.ServeHTTP is called in-process).
+func (s *ServerRequiredSuite) TestPing(c *C) {
+	kc := runProxy(c, nil, false)
+	defer closeListener()
+
+	rtr := MakeRESTRouter(true, true, kc, 10*time.Second, arvadostest.ManagementToken)
+
+	req, err := http.NewRequest("GET",
+		"http://"+listener.Addr().String()+"/_health/ping",
+		nil)
+	c.Assert(err, IsNil)
+	req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
+
+	resp := httptest.NewRecorder()
+	rtr.ServeHTTP(resp, req)
+	c.Check(resp.Code, Equals, 200)
+	c.Assert(strings.Contains(resp.Body.String(), `{"health":"OK"}`), Equals, true)
+}
Enable debug logging.
+ManagementToken:
+
+ Authorization token to be included in all health check requests.
+
`, exampleConfigFile)
}
'watchdog': '600',
'node_mem_scaling': '0.95'},
'Manage': {'address': '127.0.0.1',
- 'port': '-1'},
+ 'port': '-1',
+ 'ManagementToken': ''},
'Logging': {'file': '/dev/stderr',
'level': 'WARNING'}
}.iteritems():
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(tracker.get_json())
+ elif self.path == '/_health/ping':
+ code, msg = self.check_auth()
+
+ if code != 200:
+ self.send_response(code)
+ self.wfile.write(msg)
+ else:
+ self.send_response(200)
+ self.send_header('Content-type', 'application/json')
+ self.end_headers()
+ self.wfile.write(json.dumps({"health":"OK"}))
else:
self.send_response(404)
def log_message(self, fmt, *args, **kwargs):
_logger.info(fmt, *args, **kwargs)
+ def check_auth(self):
+ mgmt_token = self.server._config.get('Manage', 'ManagementToken')
+ auth_header = self.headers.get('Authorization', None)
+
+ if mgmt_token == '':
+ return 404, "disabled"
+ elif auth_header == None:
+ return 401, "authorization required"
+ elif auth_header != 'Bearer '+mgmt_token:
+ return 403, "authorization error"
+ return 200, ""
class Tracker(object):
def __init__(self):
from __future__ import absolute_import, print_function
from future import standard_library
+import json
import requests
import unittest
class TestServer(object):
+    def __init__(self, management_token=None):
+        # When not None, __enter__ writes this value to the [Manage]
+        # ManagementToken option before starting the status server.
+        self.mgmt_token = management_token
+
def __enter__(self):
cfg = config.NodeManagerConfig()
cfg.set('Manage', 'port', '0')
cfg.set('Manage', 'address', '127.0.0.1')
+ if self.mgmt_token != None:
+ cfg.set('Manage', 'ManagementToken', self.mgmt_token)
self.srv = status.Server(cfg)
self.srv.start()
addr, port = self.srv.server_address
def get_status(self):
return self.get_status_response().json()
+ def get_healthcheck_ping(self, auth_header=None):
+ headers = {}
+ if auth_header != None:
+ headers['Authorization'] = auth_header
+ return requests.get(self.srv_base+'/_health/ping', headers=headers)
class StatusServerUpdates(unittest.TestCase):
def test_updates(self):
self.srv.start()
self.assertFalse(self.srv.enabled)
self.assertFalse(getattr(self.srv, '_thread', False))
+
+class HealthcheckPing(unittest.TestCase):
+ def test_ping_disabled(self):
+ with TestServer() as srv:
+ r = srv.get_healthcheck_ping()
+ self.assertEqual(404, r.status_code)
+
+ def test_ping_no_auth(self):
+ with TestServer('configuredmanagementtoken') as srv:
+ r = srv.get_healthcheck_ping()
+ self.assertEqual(401, r.status_code)
+
+ def test_ping_bad_auth_format(self):
+ with TestServer('configuredmanagementtoken') as srv:
+ r = srv.get_healthcheck_ping('noBearer')
+ self.assertEqual(403, r.status_code)
+
+ def test_ping_bad_auth_token(self):
+ with TestServer('configuredmanagementtoken') as srv:
+ r = srv.get_healthcheck_ping('Bearer badtoken')
+ self.assertEqual(403, r.status_code)
+
+ def test_ping_success(self):
+ with TestServer('configuredmanagementtoken') as srv:
+ r = srv.get_healthcheck_ping('Bearer configuredmanagementtoken')
+ self.assertEqual(200, r.status_code)
+ self.assertEqual('application/json', r.headers['content-type'])
+ resp = r.json()
+ self.assertEqual('{"health": "OK"}', json.dumps(resp))
"ignore": "test",
"package": [
{
- "checksumSHA1": "jf7K+UTQNIzRdlG5F4zX/8b++/E=",
+ "checksumSHA1": "b68aaMZImS90FjnReAxpbp20FGA=",
+ "origin": "github.com/curoverse/goamz/aws",
"path": "github.com/AdRoll/goamz/aws",
- "revision": "c5d7d9bd6c743fae44efc6c18450282022445ffc",
- "revisionTime": "2017-02-25T09:28:51Z"
+ "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9"
},
{
"checksumSHA1": "ey9ddXTW9dncjJz/COKpeYm+sgg=",
+ "origin": "github.com/curoverse/goamz/s3",
"path": "github.com/AdRoll/goamz/s3",
- "revision": "c5d7d9bd6c743fae44efc6c18450282022445ffc",
- "revisionTime": "2017-02-25T09:28:51Z"
+ "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9"
},
{
"checksumSHA1": "pDHYVqUQtRsPYw/X4kUrdK7pxMs=",
+ "origin": "github.com/curoverse/goamz/s3/s3test",
"path": "github.com/AdRoll/goamz/s3/s3test",
- "revision": "c5d7d9bd6c743fae44efc6c18450282022445ffc",
- "revisionTime": "2017-02-25T09:28:51Z"
+ "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9"
},
{
"checksumSHA1": "Rjy2uYZkQ8Kjht6ZFU0qzm2I/kI=",
"revision": "1620af6b32398bfc91827ceae54a8cc1f55df04d",
"revisionTime": "2016-12-14T20:08:43Z"
},
- {
- "checksumSHA1": "qjY3SPlNvqT179DPiRaIsRhYZQI=",
- "origin": "github.com/docker/docker/vendor/github.com/docker/distribution",
- "path": "github.com/docker/distribution",
- "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
- "revisionTime": "2017-05-17T20:48:28Z"
- },
- {
- "checksumSHA1": "0au+tD+jymXNssdb1JgcctY7PN4=",
- "origin": "github.com/docker/docker/vendor/github.com/docker/distribution/context",
- "path": "github.com/docker/distribution/context",
- "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
- "revisionTime": "2017-05-17T20:48:28Z"
- },
{
"checksumSHA1": "Gj+xR1VgFKKmFXYOJMnAczC3Znk=",
"origin": "github.com/docker/docker/vendor/github.com/docker/distribution/digestset",
"revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
"revisionTime": "2017-05-17T20:48:28Z"
},
- {
- "checksumSHA1": "oYy5Q1HBImMQvh9t96cmNzWar80=",
- "origin": "github.com/docker/docker/vendor/github.com/docker/distribution/manifest",
- "path": "github.com/docker/distribution/manifest",
- "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
- "revisionTime": "2017-05-17T20:48:28Z"
- },
- {
- "checksumSHA1": "SK1g7ll2cPbgDyWpK0oVT9beVZY=",
- "origin": "github.com/docker/docker/vendor/github.com/docker/distribution/manifest/manifestlist",
- "path": "github.com/docker/distribution/manifest/manifestlist",
- "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
- "revisionTime": "2017-05-17T20:48:28Z"
- },
{
"checksumSHA1": "m4wEFD0Mh+ClfprUqgl0GyNmk7Q=",
"origin": "github.com/docker/docker/vendor/github.com/docker/distribution/reference",
"revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
"revisionTime": "2017-05-17T20:48:28Z"
},
- {
- "checksumSHA1": "cNp7rNReJHvdSfrIetXS9RGsLSo=",
- "origin": "github.com/docker/docker/vendor/github.com/docker/distribution/uuid",
- "path": "github.com/docker/distribution/uuid",
- "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
- "revisionTime": "2017-05-17T20:48:28Z"
- },
{
"checksumSHA1": "5b7eC73lORtIUFCjz548jXkLlKU=",
"path": "github.com/docker/docker/api",