11167: Merge branch 'master' into 11167-wb-remove-arvget
author Lucas Di Pentima <lucas@curoverse.com>
Mon, 7 Aug 2017 21:55:59 +0000 (18:55 -0300)
committer Lucas Di Pentima <lucas@curoverse.com>
Mon, 7 Aug 2017 21:57:11 +0000 (18:57 -0300)
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <lucas@curoverse.com>

44 files changed:
apps/workbench/app/controllers/healthcheck_controller.rb
apps/workbench/config/application.default.yml
apps/workbench/test/controllers/healthcheck_controller_test.rb
build/run-build-packages.sh
build/run-library.sh
sdk/cli/bin/crunch-job
sdk/cli/test/test_crunch-job.rb
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvdocker.py
sdk/cwl/arvados_cwl/fsaccess.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py
sdk/cwl/tests/test_submit.py
sdk/cwl/tests/wf/expect_packed.cwl
sdk/perl/lib/Arvados/Request.pm
sdk/python/arvados/util.py
services/api/app/controllers/arvados/v1/healthcheck_controller.rb
services/api/app/models/user.rb
services/api/config/application.default.yml
services/api/test/functional/arvados/v1/healthcheck_controller_test.rb
services/arv-git-httpd/integration_test.go
services/arv-git-httpd/main.go
services/arv-git-httpd/server.go
services/arv-git-httpd/server_test.go
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/command.py
services/fuse/arvados_fuse/fresh.py
services/fuse/arvados_fuse/fusedir.py
services/fuse/tests/test_mount.py
services/keep-web/handler.go
services/keep-web/handler_test.go
services/keep-web/main.go
services/keepproxy/keepproxy.go
services/keepproxy/keepproxy_test.go
services/keepproxy/usage.go
services/keepstore/s3_volume.go
services/nodemanager/arvnodeman/config.py
services/nodemanager/arvnodeman/jobqueue.py
services/nodemanager/arvnodeman/status.py
services/nodemanager/tests/integration_test.py
services/nodemanager/tests/test_jobqueue.py
services/nodemanager/tests/test_status.py
vendor/vendor.json

index 8cf6b93b518076aaa01123880377db948dca3abd..60043d9024c223558cabc9cfc51a1d2522e6e1f4 100644 (file)
@@ -16,7 +16,7 @@ class HealthcheckController < ApplicationController
   before_filter :check_auth_header
 
   def check_auth_header
-    mgmt_token = Rails.configuration.management_token
+    mgmt_token = Rails.configuration.ManagementToken
     auth_header = request.headers['Authorization']
 
     if !mgmt_token
index 2557b47ee91fd9859f63dcf9e46a13df467331ae..da20573abb859b96a5202983ab9e77e1703ac25f 100644 (file)
@@ -301,4 +301,4 @@ common:
 
   # Token to be included in all healthcheck requests. Disabled by default.
   # Workbench expects request header of the format "Authorization: Bearer xxx"
-  management_token: false
+  ManagementToken: false
index 9254593dc3354914b391740747440dc07f7eb221..9a63a29e8f9677ec8a53426b374e42d255996c56 100644 (file)
@@ -13,7 +13,7 @@ class HealthcheckControllerTest < ActionController::TestCase
     [true, 'Bearer configuredmanagementtoken', 200, '{"health":"OK"}'],
   ].each do |enabled, header, error_code, error_msg|
     test "ping when #{if enabled then 'enabled' else 'disabled' end} with header '#{header}'" do
-      Rails.configuration.management_token = 'configuredmanagementtoken' if enabled
+      Rails.configuration.ManagementToken = 'configuredmanagementtoken' if enabled
 
       @request.headers['Authorization'] = header
       get :ping
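
Workbench, the API server, and the Go services further down all gate the new /_health/ping endpoint on the configured ManagementToken, sent as "Authorization: Bearer <token>". A minimal client sketch, assuming a hypothetical host and token and using only the Python standard library:

    import json
    import urllib.request

    req = urllib.request.Request(
        "https://workbench.example.com/_health/ping",   # hypothetical host
        headers={"Authorization": "Bearer xxx"})        # the configured ManagementToken
    with urllib.request.urlopen(req) as resp:
        print(json.load(resp))   # expected: {'health': 'OK'}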
index 39c40934311c9786ba0ed7786818807d77086ab6..a0a6c5f2645f6a17ca201f51672e160f29d37e86 100755 (executable)
@@ -340,6 +340,7 @@ fi
 # Go binaries
 cd $WORKSPACE/packages/$TARGET
 export GOPATH=$(mktemp -d)
+go get -v github.com/kardianos/govendor
 package_go_binary sdk/go/crunchrunner crunchrunner \
     "Crunchrunner executes a command inside a container and uploads the output"
 package_go_binary services/arv-git-httpd arvados-git-httpd \
index ae5ad6d49bff34557c68eacebb8976d97ab4b2db..cf7755b68de780631cee4319ea720160146ffdff 100755 (executable)
@@ -103,19 +103,27 @@ package_go_binary() {
 
     mkdir -p "$GOPATH/src/git.curoverse.com"
     ln -sfn "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
+    (cd "$GOPATH/src/git.curoverse.com/arvados.git" && "$GOPATH/bin/govendor" sync -v)
 
     cd "$GOPATH/src/git.curoverse.com/arvados.git/$src_path"
     local version="$(version_from_git)"
     local timestamp="$(timestamp_from_git)"
 
-    # If the command imports anything from the Arvados SDK, bump the
-    # version number and build a new package whenever the SDK changes.
+    # Update the version number and build a new package if the vendor
+    # bundle has changed, or the command imports anything from the
+    # Arvados SDK and the SDK has changed.
+    declare -a checkdirs=(vendor)
     if grep -qr git.curoverse.com/arvados .; then
-        cd "$GOPATH/src/git.curoverse.com/arvados.git/sdk/go"
-        if [[ $(timestamp_from_git) -gt "$timestamp" ]]; then
+        checkdirs+=(sdk/go)
+    fi
+    for dir in ${checkdirs[@]}; do
+        cd "$GOPATH/src/git.curoverse.com/arvados.git/$dir"
+        ts="$(timestamp_from_git)"
+        if [[ "$ts" -gt "$timestamp" ]]; then
             version=$(version_from_git)
+            timestamp="$ts"
         fi
-    fi
+    done
 
     cd $WORKSPACE/packages/$TARGET
     test_package_presence $prog $version go
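
The loop above bumps the package version whenever the newest commit under vendor/ (or sdk/go, for programs that import the Arvados SDK) is newer than the program's own newest commit. A rough sketch of the same decision in Python; timestamp_from_git is a stand-in for the shell helper of the same name:

    import subprocess

    def timestamp_from_git(path):
        # Newest commit time (Unix seconds) touching path; stand-in for the
        # shell helper in run-library.sh.
        return int(subprocess.check_output(
            ["git", "log", "-n1", "--format=%ct", "--", path]))

    def newest_of(prog_dir, checkdirs):
        # checkdirs is ["vendor"], plus ["sdk/go"] when the program imports
        # anything from git.curoverse.com/arvados.
        timestamp = timestamp_from_git(prog_dir)
        newest = prog_dir
        for d in checkdirs:
            ts = timestamp_from_git(d)
            if ts > timestamp:
                newest, timestamp = d, ts
        return newest   # the dir whose version_from_git should win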
index afca52cb73f1809d5f9f25cca69ba28be8becab1..5e6c3a084ed49d4f5d9e8beff6d9e38f815a2c5c 100755 (executable)
@@ -189,7 +189,7 @@ if (($Job || $local_job)->{docker_image_locator}) {
   $cmd = [$docker_bin, 'ps', '-q'];
 }
 Log(undef, "Sanity check is `@$cmd`");
-my ($exited, $stdout, $stderr) = srun_sync(
+my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
   ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
   $cmd,
   {label => "sanity check"});
@@ -397,7 +397,7 @@ if (!defined $no_clear_tmp) {
   # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
   # up work directories crunch_tmp/work, crunch_tmp/opt,
   # crunch_tmp/src*.
-  my ($exited, $stdout, $stderr) = srun_sync(
+  my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
     ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
     ['bash', '-ec', q{
 arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP}
@@ -405,7 +405,7 @@ rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRU
     }],
     {label => "clean work dirs"});
   if ($exited != 0) {
-    exit(EX_RETRY_UNLOCKED);
+    exit_retry_unlocked();
   }
 }
 
@@ -439,20 +439,23 @@ fi
 echo >&2 "image loaded successfully"
 };
 
-  my ($exited, $stdout, $stderr) = srun_sync(
+  my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
     ["srun", "--nodelist=" . join(',', @node)],
     ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
     {label => "load docker image"});
   if ($exited != 0)
   {
-    exit(EX_RETRY_UNLOCKED);
+    exit_retry_unlocked();
   }
 
   # Determine whether this version of Docker supports memory+swap limits.
-  ($exited, $stdout, $stderr) = srun_sync(
+  ($exited, $stdout, $stderr, $tempfail) = srun_sync(
     ["srun", "--nodes=1"],
     [$docker_bin, 'run', '--help'],
     {label => "check --memory-swap feature"});
+  if ($tempfail) {
+    exit_retry_unlocked();
+  }
   $docker_limitmem = ($stdout =~ /--memory-swap/);
 
   # Find a non-root Docker user to use.
@@ -472,7 +475,7 @@ echo >&2 "image loaded successfully"
       $label = "check whether user '$try_user' is UID 0";
       $try_user_arg = "--user=$try_user";
     }
-    my ($exited, $stdout, $stderr) = srun_sync(
+    my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
       ["srun", "--nodes=1"],
       ["/bin/sh", "-ec",
        "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
@@ -486,6 +489,8 @@ echo >&2 "image loaded successfully"
         Log(undef, "Container will run with $dockeruserarg");
       }
       last;
+    } elsif ($tempfail) {
+      exit_retry_unlocked();
     }
   }
 
@@ -678,11 +683,14 @@ else {
                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
 
     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
-    my ($stdout, $stderr);
-    ($exited, $stdout, $stderr) = srun_sync(
+    my ($stdout, $stderr, $tempfail);
+    ($exited, $stdout, $stderr, $tempfail) = srun_sync(
       \@srunargs, \@execargs,
       {label => "run install script on all workers"},
-      $build_script . $git_archive);
+      $build_script . $git_archive);
+    if ($tempfail) {
+      exit_retry_unlocked();
+    }
 
     my $stderr_anything_from_script = 0;
     for my $line (split(/\n/, $stderr)) {
@@ -1117,7 +1125,7 @@ if (!defined $main::success)
   } elsif ($working_slot_count < 1) {
     save_output_collection();
     save_meta();
-    exit(EX_RETRY_UNLOCKED);
+    exit_retry_unlocked();
   } elsif ($thisround_succeeded == 0 &&
            ($thisround_failed == 0 || $thisround_failed > 4)) {
     my $message = "stop because $thisround_failed tasks failed and none succeeded";
@@ -1536,7 +1544,7 @@ sub preprocess_stderr
         $st->{node}->{fail_count}++;
       }
     }
-    elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) {
+    elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) {
       $jobstep[$jobstepidx]->{tempfail} = 1;
       if (defined($job_slot_index)) {
         $slot[$job_slot_index]->{node}->{fail_count}++;
@@ -2044,7 +2052,7 @@ sub srun_sync
   if ($main::please_freeze || $j->{tempfail}) {
     $exited ||= 255;
   }
-  return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
+  return ($exited, $j->{stdout_captured}, $j->{stderr_captured}, $j->{tempfail});
 }
 
 
@@ -2132,6 +2140,11 @@ sub find_docker_image {
   }
 }
 
+sub exit_retry_unlocked {
+  Log(undef, "Transient failure with lock acquired; asking for re-dispatch by exiting ".EX_RETRY_UNLOCKED);
+  exit(EX_RETRY_UNLOCKED);
+}
+
 sub retry_count {
   # Calculate the number of times an operation should be retried,
   # assuming exponential backoff, and that we're willing to retry as
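
srun_sync now also reports whether the step failed in a way that looks transient ($j->{tempfail}), and callers convert that into exit_retry_unlocked, which exits with EX_RETRY_UNLOCKED (93, per the test expectation below) so the dispatcher re-queues the job instead of failing it permanently. The same pattern, sketched in Python with a hypothetical stand-in for srun_sync:

    import sys

    EX_RETRY_UNLOCKED = 93   # matches the "exiting 93" expectation in the test below

    def srun_sync(cmd, label):
        # Hypothetical stand-in for the Perl srun_sync above: returns exit
        # status, captured output, and whether stderr matched a known
        # transient-failure pattern (the new fourth element).
        exited, stdout, stderr, tempfail = 0, "", "", False
        return exited, stdout, stderr, tempfail

    def run_step(label, cmd):
        exited, stdout, stderr, tempfail = srun_sync(cmd, label=label)
        if tempfail:
            # Transient failure with lock acquired; ask for re-dispatch.
            sys.exit(EX_RETRY_UNLOCKED)
        return exited, stdout, stderr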
index 4a528deab46f3cf1799074025ce6b33e95bf717d..c1465d8d69feb447b5d521dad70e7d0c262e9182 100644 (file)
@@ -96,7 +96,7 @@ class TestCrunchJob < Minitest::Test
       tryjobrecord j, binstubs: ['clean_fail']
     end
     assert_match /Failing mount stub was called/, err
-    assert_match /clean work dirs: exit 44\n(.*arv_put.*INFO.*\n)?$/, err
+    assert_match /clean work dirs: exit 44\n.*Transient failure.* exiting 93\n(.*arv_put.*INFO.*\n)?$/, err
     assert_equal SPECIAL_EXIT[:EX_RETRY_UNLOCKED], $?.exitstatus
   end
 
index 695597f839c1ceee5ccbb67f3391218d3d1a8e34..7f4b5c7549314b0d0dbd3cfbf52b1023ad7887fd 100644 (file)
@@ -113,6 +113,7 @@ class ArvCwlRunner(object):
                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                                 num_retries=self.num_retries,
                                                 overrides=kwargs.get("override_tools"))
+        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
             return ArvadosCommandTool(self, toolpath_object, **kwargs)
         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
index f42e6d8c9c356cfcee7c165d554359cdec40623b..6b736a5a7d872ff60eae3bdeffc1e55c66de40c0 100644 (file)
@@ -36,7 +36,7 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
     with SourceLine(dockerRequirement, "dockerImageId", WorkflowException):
         sp = dockerRequirement["dockerImageId"].split(":")
         image_name = sp[0]
-        image_tag = sp[1] if len(sp) > 1 else None
+        image_tag = sp[1] if len(sp) > 1 else "latest"
 
         images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                                 image_name=image_name,
@@ -51,9 +51,8 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
             if project_uuid:
                 args.append("--project-uuid="+project_uuid)
             args.append(image_name)
-            if image_tag:
-                args.append(image_tag)
-            logger.info("Uploading Docker image %s", ":".join(args[1:]))
+            args.append(image_tag)
+            logger.info("Uploading Docker image %s:%s", image_name, image_tag)
             try:
                 arvados.commands.keepdocker.main(args, stdout=sys.stderr)
             except SystemExit as e:
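
With the "latest" default, keepdocker now always receives an explicit name and tag, and the log line needs no special-casing. The split behaves as follows:

    sp = "debian:9".split(":")   # -> name "debian", tag "9"
    sp = "debian".split(":")     # -> ["debian"]; tag falls back to "latest"
    image_name = sp[0]
    image_tag = sp[1] if len(sp) > 1 else "latest"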
index 93e2819084601d784a973c33ca30bbbdb2d6db49..08e203b87908aa13d702ee983b1c39617a9ca8a2 100644 (file)
@@ -227,6 +227,9 @@ workflow_uuid_pattern = re.compile(r'[a-z0-9]{5}-7fd4e-[a-z0-9]{15}')
 pipeline_template_uuid_pattern = re.compile(r'[a-z0-9]{5}-p5p6p-[a-z0-9]{15}')
 
 def collectionResolver(api_client, document_loader, uri, num_retries=4):
+    if uri.startswith("keep:") or uri.startswith("arvwf:"):
+        return uri
+
     if workflow_uuid_pattern.match(uri):
         return "arvwf:%s#main" % (uri)
 
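
The early return makes collectionResolver safe to call on URIs that are already resolved: keep: and arvwf: references pass through unchanged, while a bare workflow UUID is still rewritten. For example (UUIDs hypothetical):

    collectionResolver(api, loader, "keep:99999999999999999999999999999998+99/wf.cwl")
    # -> returned unchanged
    collectionResolver(api, loader, "zzzzz-7fd4e-012345678901234")
    # -> "arvwf:zzzzz-7fd4e-012345678901234#main"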
index e39c7d23ce22fe71e94d7956e2f5abe4a8e323e3..5a2d814f5d0f614caefea7b1a08c9c36f6fc8925 100644 (file)
@@ -205,10 +205,20 @@ class ArvPathMapper(PathMapper):
 class StagingPathMapper(PathMapper):
     _follow_dirs = True
 
+    def __init__(self, referenced_files, basedir, stagedir, separateDirs=True):
+        self.targets = set()
+        super(StagingPathMapper, self).__init__(referenced_files, basedir, stagedir, separateDirs)
+
     def visit(self, obj, stagedir, basedir, copy=False, staged=False):
         # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
         loc = obj["location"]
         tgt = os.path.join(stagedir, obj["basename"])
+        basetgt, baseext = os.path.splitext(tgt)
+        n = 1
+        while tgt in self.targets:
+            n += 1
+            tgt = "%s_%i%s" % (basetgt, n, baseext)
+        self.targets.add(tgt)
         if obj["class"] == "Directory":
             self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
             if loc.startswith("_:") or self._follow_dirs:
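
The new bookkeeping avoids silently overwriting a staged file when two inputs share a basename: the second occurrence gets a numeric suffix inserted before the extension. A self-contained sketch of the collision logic:

    import os

    targets = set()

    def stage_target(tgt):
        basetgt, baseext = os.path.splitext(tgt)
        n = 1
        while tgt in targets:
            n += 1
            tgt = "%s_%i%s" % (basetgt, n, baseext)
        targets.add(tgt)
        return tgt

    stage_target("/stage/blorp.txt")   # -> '/stage/blorp.txt'
    stage_target("/stage/blorp.txt")   # -> '/stage/blorp_2.txt'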
index 087fed3e16e72cb26c95500b4ccb03a83bf71806..bb4fac2ae9541a55872ff6fd371b380659a0f15e 100644 (file)
@@ -161,6 +161,8 @@ def upload_docker(arvrunner, tool):
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
             arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+        else:
+            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool)
index db11705f4ed4b99d7bfd03dec59a2d6b3de20e3d..572519fdcf7e6c45218e8e2352366a90eb87564c 100644 (file)
@@ -51,8 +51,8 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20170707200431',
-          'schema-salad==2.6.20170630075932',
+          'cwltool==1.0.20170727112954',
+          'schema-salad==2.6.20170712194300',
           'typing==3.5.3.0',
           'ruamel.yaml==0.13.7',
           'arvados-python-client>=0.1.20170526013812',
index 3d6b91536a2f2732ad2e97d5c172bf41e2ee60e7..8ab0a8de9c2448e999475b7ac8735dcedc928099 100644 (file)
@@ -84,7 +84,13 @@ def stubs(func):
                 "uuid": "",
                 "portable_data_hash": "99999999999999999999999999999998+99",
                 "manifest_text": ". 99999999999999999999999999999998+99 0:0:file1.txt"
-            }}
+            },
+            "99999999999999999999999999999994+99": {
+                "uuid": "",
+                "portable_data_hash": "99999999999999999999999999999994+99",
+                "manifest_text": ". 99999999999999999999999999999994+99 0:0:expect_arvworkflow.cwl"
+            }
+        }
         stubs.api.collections().create.side_effect = functools.partial(collection_createstub, created_collections)
         stubs.api.collections().get.side_effect = functools.partial(collection_getstub, created_collections)
 
@@ -152,7 +158,8 @@ def stubs(func):
                             'class': 'File',
                             'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
                             "nameext": ".txt",
-                            "nameroot": "blorp"
+                            "nameroot": "blorp",
+                            "size": 16
                         }},
                         'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
                               'listing': [
@@ -217,7 +224,8 @@ def stubs(func):
                             'class': 'File',
                             'location': u'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
                             "nameext": ".txt",
-                            "nameroot": "blorp"
+                            "nameroot": "blorp",
+                            "size": 16
                         },
                         'z': {'basename': 'anonymous', 'class': 'Directory', 'listing': [
                             {'basename': 'renamed.txt',
index 32a9255e90d442a475394036ea54dd55a797e6fa..f45077197fef194662c206a72045b2a26ddaae24 100644 (file)
@@ -24,7 +24,7 @@ $graph:
   - id: '#main/x'
     type: File
     default: {class: File, location: 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
-      basename: blorp.txt, nameroot: blorp, nameext: .txt}
+      size: 16, basename: blorp.txt, nameroot: blorp, nameext: .txt}
   - id: '#main/y'
     type: Directory
     default: {class: Directory, location: 'keep:99999999999999999999999999999998+99',
index 03d542805161481f4023c7c9084a2e0cce3c7b16..4523f7d6b3ac38561e17c50b889201166f22baad 100644 (file)
@@ -46,9 +46,12 @@ sub process_request
     $self->{'req'} = new HTTP::Request (%req);
     $self->{'req'}->header('Authorization' => ('OAuth2 ' . $self->{'authToken'})) if $self->{'authToken'};
     $self->{'req'}->header('Accept' => 'application/json');
+
+    # allow_nonref lets us encode JSON::true and JSON::false, see #12078
+    my $json = JSON->new->allow_nonref;
     my ($p, $v);
     while (($p, $v) = each %{$self->{'queryParams'}}) {
-        $content{$p} = (ref($v) eq "") ? $v : JSON::encode_json($v);
+        $content{$p} = (ref($v) eq "") ? $v : $json->encode($v);
     }
     my $content;
     while (($p, $v) = each %content) {
index 97e1d26d2ba2e1cf7ad503ab80b2974676c87cf8..1a973586051769e816103553e22326839a0c3670 100644 (file)
@@ -24,6 +24,8 @@ collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
+job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
+container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
 
 def clear_tmpdir(path=None):
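
The two new patterns distinguish job UUIDs (type infix 8i9sb) from container UUIDs (dz642); the node manager change below relies on them to pick the right cancellation mechanism. For example (UUIDs hypothetical):

    import arvados.util

    arvados.util.container_uuid_pattern.match("zzzzz-dz642-012345678901234")   # matches
    arvados.util.job_uuid_pattern.match("zzzzz-8i9sb-012345678901234")         # matches
    arvados.util.job_uuid_pattern.match("zzzzz-dz642-012345678901234")         # None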
index 3986af9dc347aebe33ccd8fd26ccd4060eccee77..6d55506bb5742cf9be7c183186b60f6479ab5226 100644 (file)
@@ -16,7 +16,7 @@ class Arvados::V1::HealthcheckController < ApplicationController
   before_filter :check_auth_header
 
   def check_auth_header
-    mgmt_token = Rails.configuration.management_token
+    mgmt_token = Rails.configuration.ManagementToken
     auth_header = request.headers['Authorization']
 
     if !mgmt_token
index 26c714eb4a39bda9ec1df0f6f69d488c64adf7c6..f0934104d32ca6454ec6cfcf2f4c227bc3cd4a71 100644 (file)
@@ -10,6 +10,11 @@ class User < ArvadosModel
   include CommonApiTemplate
   include CanBeAnOwner
 
+  # To avoid upgrade bugs, when changing the permission cache value
+  # format, change PERM_CACHE_PREFIX too:
+  PERM_CACHE_PREFIX = "perm_v20170725_"
+  PERM_CACHE_TTL = 172800
+
   serialize :prefs, Hash
   has_many :api_client_authorizations
   validates(:username,
@@ -141,7 +146,7 @@ class User < ArvadosModel
       timestamp = DbCurrentTime::db_current_time.to_i if timestamp.nil?
       connection.execute "NOTIFY invalidate_permissions_cache, '#{timestamp}'"
     else
-      Rails.cache.delete_matched(/^groups_for_user_/)
+      Rails.cache.delete_matched(/^#{PERM_CACHE_PREFIX}/)
     end
   end
 
@@ -183,7 +188,7 @@ class User < ArvadosModel
                   ).rows.each do |group_uuid, max_p_val|
       group_perms[group_uuid] = PERMS_FOR_VAL[max_p_val.to_i]
     end
-    Rails.cache.write "groups_for_user_#{self.uuid}", group_perms
+    Rails.cache.write "#{PERM_CACHE_PREFIX}#{self.uuid}", group_perms, expires_in: PERM_CACHE_TTL
     group_perms
   end
 
@@ -191,12 +196,12 @@ class User < ArvadosModel
   # and perm_hash[:write] are true if this user can read and write
   # objects owned by group_uuid.
   def group_permissions
-    r = Rails.cache.read "groups_for_user_#{self.uuid}"
+    r = Rails.cache.read "#{PERM_CACHE_PREFIX}#{self.uuid}"
     if r.nil?
       if Rails.configuration.async_permissions_update
         while r.nil?
           sleep(0.1)
-          r = Rails.cache.read "groups_for_user_#{self.uuid}"
+          r = Rails.cache.read "#{PERM_CACHE_PREFIX}#{self.uuid}"
         end
       else
         r = calculate_group_permissions
index 8dafd1c2da713caa21ae6e1be392d21b7400be88..2f32556733b1a8a186bc3ad51a540518c485ac57 100644 (file)
@@ -444,7 +444,7 @@ common:
 
   # Token to be included in all healthcheck requests. Disabled by default.
   # Server expects request header of the format "Authorization: Bearer xxx"
-  management_token: false
+  ManagementToken: false
 
 development:
   force_ssl: false
index 282bdf14eb0c3c5b4298927536a1009e541f4f8a..551eefa8787baf10e3507a8d6768484de06fb8df 100644 (file)
@@ -13,7 +13,7 @@ class Arvados::V1::HealthcheckControllerTest < ActionController::TestCase
     [true, 'Bearer configuredmanagementtoken', 200, '{"health":"OK"}'],
   ].each do |enabled, header, error_code, error_msg|
     test "ping when #{if enabled then 'enabled' else 'disabled' end} with header '#{header}'" do
-      Rails.configuration.management_token = 'configuredmanagementtoken' if enabled
+      Rails.configuration.ManagementToken = 'configuredmanagementtoken' if enabled
 
       @request.headers['Authorization'] = header
       get :ping
index 6a2b6401b5e0b293664e4ff0803c9ea285b331ad..10c69eedd3bf2e2c81cb51ea7c92961d108f1204 100644 (file)
@@ -77,9 +77,10 @@ func (s *IntegrationSuite) SetUpTest(c *check.C) {
                                APIHost:  arvadostest.APIHost(),
                                Insecure: true,
                        },
-                       Listen:     ":0",
-                       GitCommand: "/usr/bin/git",
-                       RepoRoot:   s.tmpRepoRoot,
+                       Listen:          ":0",
+                       GitCommand:      "/usr/bin/git",
+                       RepoRoot:        s.tmpRepoRoot,
+                       ManagementToken: arvadostest.ManagementToken,
                }
        }
 
index bee02693ae62745be83a0ac80a3430cfb3cf804e..79a3eb3f7b85de9667c04fe1b24f2a5217db5772 100644 (file)
@@ -18,11 +18,12 @@ import (
 
 // Server configuration
 type Config struct {
-       Client       arvados.Client
-       Listen       string
-       GitCommand   string
-       RepoRoot     string
-       GitoliteHome string
+       Client          arvados.Client
+       Listen          string
+       GitCommand      string
+       RepoRoot        string
+       GitoliteHome    string
+       ManagementToken string
 }
 
 var theConfig = defaultConfig()
@@ -49,6 +50,10 @@ func main() {
 
        cfgPath := flag.String("config", defaultCfgPath, "Configuration file `path`.")
        dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
+
+       flag.StringVar(&theConfig.ManagementToken, "management-token", theConfig.ManagementToken,
+               "Authorization token to be included in all health check requests.")
+
        flag.Usage = usage
        flag.Parse()
 
index 8a7819a2a6133099d99f6058f01d94e36edb6018..8f0d90f89e29cad0a0d8d590b66e412b02195ad2 100644 (file)
@@ -7,6 +7,7 @@ package main
 import (
        "net/http"
 
+       "git.curoverse.com/arvados.git/sdk/go/health"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
 )
 
@@ -17,6 +18,10 @@ type server struct {
 func (srv *server) Start() error {
        mux := http.NewServeMux()
        mux.Handle("/", &authHandler{handler: newGitHandler()})
+       mux.Handle("/_health/", &health.Handler{
+               Token:  theConfig.ManagementToken,
+               Prefix: "/_health/",
+       })
        srv.Handler = mux
        srv.Addr = theConfig.Listen
        return srv.Server.Start()
index 45336010a1397b06954e09b52fb66a312dd76fdd..b4fc532e8f1a2000e070c86131b60ae143a0edf7 100644 (file)
@@ -5,9 +5,13 @@
 package main
 
 import (
+       "net/http"
+       "net/http/httptest"
        "os"
        "os/exec"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+
        check "gopkg.in/check.v1"
 )
 
@@ -104,3 +108,16 @@ func (s *GitSuite) makeArvadosRepo(c *check.C) {
        c.Log(string(msg))
        c.Assert(err, check.Equals, nil)
 }
+
+func (s *GitSuite) TestHealthCheckPing(c *check.C) {
+       req, err := http.NewRequest("GET",
+               "http://"+s.testServer.Addr+"/_health/ping",
+               nil)
+       c.Assert(err, check.Equals, nil)
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
+
+       resp := httptest.NewRecorder()
+       s.testServer.Handler.ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, 200)
+       c.Check(resp.Body.String(), check.Matches, `{"health":"OK"}\n`)
+}
index a43a556866d524920b04303aea252ff127ac8c44..30770fc0152a58125495420ebed2d8836768cfd3 100644 (file)
@@ -394,30 +394,33 @@ class Operations(llfuse.Operations):
 
     @catch_exceptions
     def on_event(self, ev):
-        if 'event_type' not in ev:
+        if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
             return
         with llfuse.lock:
-            new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
-            pdh = new_attrs.get("portable_data_hash")
-            # new_attributes.modified_at currently lacks
-            # subsecond precision (see #6347) so use event_at
-            # which should always be the same.
-            stamp = ev.get("event_at")
+            properties = ev.get("properties") or {}
+            old_attrs = properties.get("old_attributes") or {}
+            new_attrs = properties.get("new_attributes") or {}
 
             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
                 item.invalidate()
-                if stamp and pdh and ev.get("object_kind") == "arvados#collection":
-                    item.update(to_record_version=(stamp, pdh))
-                else:
-                    item.update()
-
-            oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
+                if ev.get("object_kind") == "arvados#collection":
+                    pdh = new_attrs.get("portable_data_hash")
+                    # new_attributes.modified_at currently lacks
+                    # subsecond precision (see #6347) so use event_at
+                    # which should always be the same.
+                    stamp = ev.get("event_at")
+                    if (stamp and pdh and item.writable() and
+                        item.collection is not None and
+                        item.collection.modified() and
+                        new_attrs.get("is_trashed") is not True):
+                        item.update(to_record_version=(stamp, pdh))
+
+            oldowner = old_attrs.get("owner_uuid")
             newowner = ev.get("object_owner_uuid")
             for parent in (
                     self.inodes.inode_cache.find_by_uuid(oldowner) +
                     self.inodes.inode_cache.find_by_uuid(newowner)):
-                parent.invalidate()
-                parent.update()
+                parent.child_event(ev)
 
     @catch_exceptions
     def getattr(self, inode, ctx=None):
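
on_event now ignores everything except create/update/delete, fast-forwards a collection's contents only when that is safe (writable, locally unmodified, not trashed), and hands ownership changes to the parents' child_event hooks instead of forcing full refreshes. The fields the handler reads, shown as a sketch of a typical event (all values hypothetical):

    ev = {
        "event_type": "update",                 # create / update / delete
        "object_uuid": "zzzzz-4zz18-012345678901234",
        "object_kind": "arvados#collection",
        "object_owner_uuid": "zzzzz-j7d0g-012345678901234",
        "event_at": "2017-08-07T21:55:59Z",     # used instead of modified_at (#6347)
        "properties": {
            "old_attributes": {"name": "a", "owner_uuid": "zzzzz-j7d0g-012345678901234"},
            "new_attributes": {"name": "b",
                               "portable_data_hash": "99999999999999999999999999999998+99",
                               "is_trashed": False},
        },
    }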
index b3717ff07c23cb665505645d869b0670d1566b54..4dad90c86758edb118d7ab4b04958417533b9653 100644 (file)
@@ -205,12 +205,16 @@ class Mount(object):
         self.logger.info("enable write is %s", self.args.enable_write)
 
     def _setup_api(self):
-        self.api = arvados.safeapi.ThreadSafeApiCache(
-            apiconfig=arvados.config.settings(),
-            keep_params={
-                'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
-                'num_retries': self.args.retries,
-            })
+        try:
+            self.api = arvados.safeapi.ThreadSafeApiCache(
+                apiconfig=arvados.config.settings(),
+                keep_params={
+                    'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
+                    'num_retries': self.args.retries,
+                })
+        except KeyError as e:
+            self.logger.error("Missing environment: %s", e)
+            exit(1)
         # Do a sanity check that we have a working arvados host + token.
         self.api.users().current().execute()
 
index 34295ef319afb125d1fe4971e37519cdb0ec983c..a51dd909b690df3cb39865d021b8f4daea4b471b 100644 (file)
@@ -139,3 +139,6 @@ class FreshBase(object):
 
     def finalize(self):
         pass
+
+    def child_event(self, ev):
+        pass
index 30ae6b40e0ae95c751ccaa1b3c0760b623d793c5..0178fe5544b07ddb30b9fa9c8e08d734a12cde0c 100644 (file)
@@ -771,6 +771,7 @@ class ProjectDirectory(Directory):
         self._poll_time = poll_time
         self._updating_lock = threading.Lock()
         self._current_user = None
+        self._full_listing = False
 
     def want_event_subscribe(self):
         return True
@@ -793,27 +794,35 @@ class ProjectDirectory(Directory):
     def uuid(self):
         return self.project_uuid
 
+    def items(self):
+        self._full_listing = True
+        return super(ProjectDirectory, self).items()
+
+    def namefn(self, i):
+        if 'name' in i:
+            if i['name'] is None or len(i['name']) == 0:
+                return None
+            elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
+                # collection or subproject
+                return i['name']
+            elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
+                # name link
+                return i['name']
+            elif 'kind' in i and i['kind'].startswith('arvados#'):
+                # something else
+                return "{}.{}".format(i['name'], i['kind'][8:])
+        else:
+            return None
+
+
     @use_counter
     def update(self):
         if self.project_object_file == None:
             self.project_object_file = ObjectFile(self.inode, self.project_object)
             self.inodes.add_entry(self.project_object_file)
 
-        def namefn(i):
-            if 'name' in i:
-                if i['name'] is None or len(i['name']) == 0:
-                    return None
-                elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
-                    # collection or subproject
-                    return i['name']
-                elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
-                    # name link
-                    return i['name']
-                elif 'kind' in i and i['kind'].startswith('arvados#'):
-                    # something else
-                    return "{}.{}".format(i['name'], i['kind'][8:])
-            else:
-                return None
+        if not self._full_listing:
+            return
 
         def samefn(a, i):
             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
@@ -835,31 +844,62 @@ class ProjectDirectory(Directory):
                     self.project_object = self.api.users().get(
                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
 
-                contents = arvados.util.list_all(self.api.groups().contents,
-                                                 self.num_retries, uuid=self.project_uuid)
+                contents = arvados.util.list_all(self.api.groups().list,
+                                                 self.num_retries,
+                                                 filters=[["owner_uuid", "=", self.project_uuid],
+                                                          ["group_class", "=", "project"]])
+                contents.extend(arvados.util.list_all(self.api.collections().list,
+                                                      self.num_retries,
+                                                      filters=[["owner_uuid", "=", self.project_uuid]]))
 
             # end with llfuse.lock_released, re-acquire lock
 
             self.merge(contents,
-                       namefn,
+                       self.namefn,
                        samefn,
                        self.createDirectory)
         finally:
             self._updating_lock.release()
 
+    def _add_entry(self, i, name):
+        ent = self.createDirectory(i)
+        self._entries[name] = self.inodes.add_entry(ent)
+        return self._entries[name]
+
     @use_counter
     @check_update
-    def __getitem__(self, item):
-        if item == '.arvados#project':
+    def __getitem__(self, k):
+        if k == '.arvados#project':
             return self.project_object_file
-        else:
-            return super(ProjectDirectory, self).__getitem__(item)
+        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
+            return super(ProjectDirectory, self).__getitem__(k)
+        with llfuse.lock_released:
+            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                       ["group_class", "=", "project"],
+                                                       ["name", "=", k]],
+                                              limit=1).execute(num_retries=self.num_retries)["items"]
+            if not contents:
+                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                                ["name", "=", k]],
+                                                       limit=1).execute(num_retries=self.num_retries)["items"]
+        if contents:
+            name = sanitize_filename(self.namefn(contents[0]))
+            if name != k:
+                raise KeyError(k)
+            return self._add_entry(contents[0], name)
+
+        # Didn't find item
+        raise KeyError(k)
 
     def __contains__(self, k):
         if k == '.arvados#project':
             return True
-        else:
-            return super(ProjectDirectory, self).__contains__(k)
+        try:
+            self[k]
+            return True
+        except KeyError:
+            pass
+        return False
 
     @use_counter
     @check_update
@@ -925,6 +965,51 @@ class ProjectDirectory(Directory):
         self._entries[name_new] = ent
         self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
 
+    @use_counter
+    def child_event(self, ev):
+        properties = ev.get("properties") or {}
+        old_attrs = properties.get("old_attributes") or {}
+        new_attrs = properties.get("new_attributes") or {}
+        old_attrs["uuid"] = ev["object_uuid"]
+        new_attrs["uuid"] = ev["object_uuid"]
+        old_name = sanitize_filename(self.namefn(old_attrs))
+        new_name = sanitize_filename(self.namefn(new_attrs))
+
+        # create events will have a new name, but not an old name
+        # delete events will have an old name, but not a new name
+        # update events will have an old and new name, and they may be same or different
+        # if they are the same, an unrelated field changed and there is nothing to do.
+
+        if old_attrs.get("owner_uuid") != self.project_uuid:
+            # Was moved from somewhere else, so don't try to remove entry.
+            old_name = None
+        if ev.get("object_owner_uuid") != self.project_uuid:
+            # Was moved to somewhere else, so don't try to add entry
+            new_name = None
+
+        if ev.get("object_kind") == "arvados#collection":
+            if old_attrs.get("is_trashed"):
+                # Was previously deleted
+                old_name = None
+            if new_attrs.get("is_trashed"):
+                # Has been deleted
+                new_name = None
+
+        if new_name != old_name:
+            ent = None
+            if old_name in self._entries:
+                ent = self._entries[old_name]
+                del self._entries[old_name]
+                self.inodes.invalidate_entry(self.inode, old_name.encode(self.inodes.encoding))
+
+            if new_name:
+                if ent is not None:
+                    self._entries[new_name] = ent
+                else:
+                    self._add_entry(new_attrs, new_name)
+            elif ent is not None:
+                self.inodes.del_entry(ent)
+
 
 class SharedDirectory(Directory):
     """A special directory that represents users or groups who have shared projects with me."""
index 225e4b2d22bc50d8dd8a7a97fae8cf767cc3d638..ec8868af7d799857d0eba14e8478f3030d9969cd 100644 (file)
@@ -220,66 +220,62 @@ class FuseTagsUpdateTest(MountTestBase):
             attempt(self.assertDirContents, 'fuse_test_tag', [bar_uuid])
 
 
+def fuseSharedTestHelper(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            # Double check that we can open and read objects in this folder as a file,
+            # and that its contents are what we expect.
+            baz_path = os.path.join(
+                mounttmp,
+                'FUSE User',
+                'FUSE Test Project',
+                'collection in FUSE project',
+                'baz')
+            with open(baz_path) as f:
+                self.assertEqual("baz", f.read())
+
+            # check mtime on collection
+            st = os.stat(baz_path)
+            try:
+                mtime = st.st_mtime_ns / 1000000000
+            except AttributeError:
+                mtime = st.st_mtime
+            self.assertEqual(mtime, 1391448174)
+
+            # shared_dirs is a list of the directories exposed
+            # by fuse.SharedDirectory (i.e. any object visible
+            # to the current user)
+            shared_dirs = llfuse.listdir(mounttmp)
+            shared_dirs.sort()
+            self.assertIn('FUSE User', shared_dirs)
+
+            # fuse_user_objs is a list of the objects owned by the FUSE
+            # test user (which present as files in the 'FUSE User'
+            # directory)
+            fuse_user_objs = llfuse.listdir(os.path.join(mounttmp, 'FUSE User'))
+            fuse_user_objs.sort()
+            self.assertEqual(['FUSE Test Project',                    # project owned by user
+                              'collection #1 owned by FUSE',          # collection owned by user
+                              'collection #2 owned by FUSE'          # collection owned by user
+                          ], fuse_user_objs)
+
+            # test_proj_files is a list of the files in the FUSE Test Project.
+            test_proj_files = llfuse.listdir(os.path.join(mounttmp, 'FUSE User', 'FUSE Test Project'))
+            test_proj_files.sort()
+            self.assertEqual(['collection in FUSE project'
+                          ], test_proj_files)
+
+
+    Test().runTest()
+
 class FuseSharedTest(MountTestBase):
     def runTest(self):
         self.make_mount(fuse.SharedDirectory,
                         exclude=self.api.users().current().execute()['uuid'])
+        keep = arvados.keep.KeepClient()
+        keep.put("baz")
 
-        # shared_dirs is a list of the directories exposed
-        # by fuse.SharedDirectory (i.e. any object visible
-        # to the current user)
-        shared_dirs = llfuse.listdir(self.mounttmp)
-        shared_dirs.sort()
-        self.assertIn('FUSE User', shared_dirs)
-
-        # fuse_user_objs is a list of the objects owned by the FUSE
-        # test user (which present as files in the 'FUSE User'
-        # directory)
-        fuse_user_objs = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User'))
-        fuse_user_objs.sort()
-        self.assertEqual(['FUSE Test Project',                    # project owned by user
-                          'collection #1 owned by FUSE',          # collection owned by user
-                          'collection #2 owned by FUSE',          # collection owned by user
-                          'pipeline instance owned by FUSE.pipelineInstance',  # pipeline instance owned by user
-                      ], fuse_user_objs)
-
-        # test_proj_files is a list of the files in the FUSE Test Project.
-        test_proj_files = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
-        test_proj_files.sort()
-        self.assertEqual(['collection in FUSE project',
-                          'pipeline instance in FUSE project.pipelineInstance',
-                          'pipeline template in FUSE project.pipelineTemplate'
-                      ], test_proj_files)
-
-        # Double check that we can open and read objects in this folder as a file,
-        # and that its contents are what we expect.
-        pipeline_template_path = os.path.join(
-                self.mounttmp,
-                'FUSE User',
-                'FUSE Test Project',
-                'pipeline template in FUSE project.pipelineTemplate')
-        with open(pipeline_template_path) as f:
-            j = json.load(f)
-            self.assertEqual("pipeline template in FUSE project", j['name'])
-
-        # check mtime on template
-        st = os.stat(pipeline_template_path)
-        try:
-            mtime = st.st_mtime_ns / 1000000000
-        except AttributeError:
-            mtime = st.st_mtime
-        self.assertEqual(mtime, 1397493304)
-
-        # check mtime on collection
-        st = os.stat(os.path.join(
-                self.mounttmp,
-                'FUSE User',
-                'collection #1 owned by FUSE'))
-        try:
-            mtime = st.st_mtime_ns / 1000000000
-        except AttributeError:
-            mtime = st.st_mtime
-        self.assertEqual(mtime, 1391448174)
+        self.pool.apply(fuseSharedTestHelper, (self.mounttmp,))
 
 
 class FuseHomeTest(MountTestBase):
index 23c38ca52351427ddc749dafe3c004edf4297d8e..67d46f6716b2ce22b09cb6b1204b08fdb3c3dd96 100644 (file)
@@ -21,14 +21,16 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/auth"
+       "git.curoverse.com/arvados.git/sdk/go/health"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
 )
 
 type handler struct {
-       Config     *Config
-       clientPool *arvadosclient.ClientPool
-       setupOnce  sync.Once
+       Config        *Config
+       clientPool    *arvadosclient.ClientPool
+       setupOnce     sync.Once
+       healthHandler http.Handler
 }
 
 // parseCollectionIDFromDNSName returns a UUID or PDH if s begins with
@@ -70,7 +72,13 @@ func parseCollectionIDFromURL(s string) string {
 
 func (h *handler) setup() {
        h.clientPool = arvadosclient.MakeClientPool()
+
        keepclient.RefreshServiceDiscoveryOnSIGHUP()
+
+       h.healthHandler = &health.Handler{
+               Token:  h.Config.ManagementToken,
+               Prefix: "/_health/",
+       }
 }
 
 func (h *handler) serveStatus(w http.ResponseWriter, r *http.Request) {
@@ -110,6 +118,11 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                httpserver.Log(remoteAddr, statusCode, statusText, w.WroteBodyBytes(), r.Method, r.Host, r.URL.Path, r.URL.RawQuery)
        }()
 
+       if strings.HasPrefix(r.URL.Path, "/_health/") && r.Method == "GET" {
+               h.healthHandler.ServeHTTP(w, r)
+               return
+       }
+
        if r.Method == "OPTIONS" {
                method := r.Header.Get("Access-Control-Request-Method")
                if method != "GET" && method != "POST" {
index 1d03b90a3a60e604ef2023a06dada39b000d6f97..04859595e6f337b7975f9305e05189c5aa6f3ce2 100644 (file)
@@ -589,3 +589,24 @@ func (s *IntegrationSuite) TestDirectoryListing(c *check.C) {
                }
        }
 }
+
+func (s *IntegrationSuite) TestHealthCheckPing(c *check.C) {
+       s.testServer.Config.ManagementToken = arvadostest.ManagementToken
+       authHeader := http.Header{
+               "Authorization": {"Bearer " + arvadostest.ManagementToken},
+       }
+
+       resp := httptest.NewRecorder()
+       u := mustParseURL("http://download.example.com/_health/ping")
+       req := &http.Request{
+               Method:     "GET",
+               Host:       u.Host,
+               URL:        u,
+               RequestURI: u.RequestURI(),
+               Header:     authHeader,
+       }
+       s.testServer.Handler.ServeHTTP(resp, req)
+
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       c.Check(resp.Body.String(), check.Matches, `{"health":"OK"}\n`)
+}
index c4103f409723fbc3c7bbbc3bbd8edcbf7184be42..585dab13e3cbf0ce68ef620c6dad31e4a17c974b 100644 (file)
@@ -34,6 +34,9 @@ type Config struct {
        // Hack to support old command line flag, which is a bool
        // meaning "get actual token from environment".
        deprecatedAllowAnonymous bool
+
+       // Authorization token to be included in all health check requests.
+       ManagementToken string
 }
 
 // DefaultConfig returns the default configuration.
@@ -76,6 +79,9 @@ func main() {
                "Only serve attachments at the given `host:port`"+deprecated)
        flag.BoolVar(&cfg.TrustAllContent, "trust-all-content", false,
                "Serve non-public content from a single origin. Dangerous: read docs before using!"+deprecated)
+       flag.StringVar(&cfg.ManagementToken, "management-token", "",
+               "Authorization token to be included in all health check requests.")
+
        dumpConfig := flag.Bool("dump-config", false,
                "write current configuration to stdout and exit")
        flag.Usage = usage
index 682435515ab5a05ceae51eef2feacffc28e85c58..3d1b4476255d0cadf2274ad12b9aae78f164f72f 100644 (file)
@@ -24,6 +24,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/config"
+       "git.curoverse.com/arvados.git/sdk/go/health"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "github.com/coreos/go-systemd/daemon"
        "github.com/ghodss/yaml"
@@ -39,6 +40,7 @@ type Config struct {
        Timeout         arvados.Duration
        PIDFile         string
        Debug           bool
+       ManagementToken string
 }
 
 func DefaultConfig() *Config {
@@ -66,6 +68,7 @@ func main() {
        flagset.IntVar(&cfg.DefaultReplicas, "default-replicas", cfg.DefaultReplicas, "Default number of replicas to write if not specified by the client. If 0, use site default."+deprecated)
        flagset.StringVar(&cfg.PIDFile, "pid", cfg.PIDFile, "Path to write pid file."+deprecated)
        timeoutSeconds := flagset.Int("timeout", int(time.Duration(cfg.Timeout)/time.Second), "Timeout (in seconds) on requests to internal Keep services."+deprecated)
+       flagset.StringVar(&cfg.ManagementToken, "management-token", cfg.ManagementToken, "Authorization token to be included in all health check requests.")
 
        var cfgPath string
        const defaultCfgPath = "/etc/arvados/keepproxy/keepproxy.yml"
@@ -160,7 +163,7 @@ func main() {
        signal.Notify(term, syscall.SIGINT)
 
        // Start serving requests.
-       router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout))
+       router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout), cfg.ManagementToken)
        http.Serve(listener, router)
 
        log.Println("shutting down")
@@ -250,7 +253,7 @@ type proxyHandler struct {
 
 // MakeRESTRouter returns an http.Handler that passes GET and PUT
 // requests to the appropriate handlers.
-func MakeRESTRouter(enable_get bool, enable_put bool, kc *keepclient.KeepClient, timeout time.Duration) http.Handler {
+func MakeRESTRouter(enable_get bool, enable_put bool, kc *keepclient.KeepClient, timeout time.Duration, mgmtToken string) http.Handler {
        rest := mux.NewRouter()
 
        transport := *(http.DefaultTransport.(*http.Transport))
@@ -292,6 +295,11 @@ func MakeRESTRouter(enable_get bool, enable_put bool, kc *keepclient.KeepClient,
                rest.HandleFunc(`/`, h.Options).Methods("OPTIONS")
        }
 
+       rest.Handle("/_health/{check}", &health.Handler{
+               Token:  mgmtToken,
+               Prefix: "/_health/",
+       }).Methods("GET")
+
        rest.NotFoundHandler = InvalidPathHandler{}
        return h
 }
index 94be8e23ed1918d93d0d783814103dc4ee956a4c..25619bbb91a3629d8d90563d051b8a31578ed121 100644 (file)
@@ -189,7 +189,7 @@ func (s *ServerRequiredSuite) TestPutWrongContentLength(c *C) {
        // fixes the invalid Content-Length header. In order to test
        // our server behavior, we have to call the handler directly
        // using an httptest.ResponseRecorder.
-       rtr := MakeRESTRouter(true, true, kc, 10*time.Second)
+       rtr := MakeRESTRouter(true, true, kc, 10*time.Second, "")
 
        type testcase struct {
                sendLength   string
@@ -597,3 +597,21 @@ func (s *NoKeepServerSuite) TestAskGetNoKeepServerError(c *C) {
                c.Check(err, ErrorMatches, `.*HTTP 502.*`)
        }
 }
+
+func (s *ServerRequiredSuite) TestPing(c *C) {
+       kc := runProxy(c, nil, false)
+       defer closeListener()
+
+       rtr := MakeRESTRouter(true, true, kc, 10*time.Second, arvadostest.ManagementToken)
+
+       req, err := http.NewRequest("GET",
+               "http://"+listener.Addr().String()+"/_health/ping",
+               nil)
+       c.Assert(err, IsNil)
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
+
+       resp := httptest.NewRecorder()
+       rtr.ServeHTTP(resp, req)
+       c.Check(resp.Code, Equals, 200)
+       c.Assert(strings.Contains(resp.Body.String(), `{"health":"OK"}`), Equals, true)
+}
index 97509699b8b4cfe4e4d9d37bd9c8573889673329..6d3d21e6f25217744941957a0a2a869fa02fde33 100644 (file)
@@ -82,5 +82,9 @@ Debug:
 
     Enable debug logging.
 
+ManagementToken:
+
+    Authorization token to be included in all health check requests.
+
 `, exampleConfigFile)
 }
index 0fe773a59e278b93264bf1d63457a14d9b709ef8..0ab3e969a0ebaa3f0d007e0c764a1e7507f6ec8a 100644 (file)
@@ -133,7 +133,7 @@ func init() {
                &s3UnsafeDelete,
                "s3-unsafe-delete",
                false,
-               "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
+               "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
 }
 
 // S3Volume implements Volume using an S3 bucket.
index 1ba4e375a50c5b84b6fb321e38088537bad31b1a..e47f9fcb1d036b78f94af0af25e8c37dc17b5ad0 100644 (file)
@@ -58,7 +58,8 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
                        'watchdog': '600',
                        'node_mem_scaling': '0.95'},
             'Manage': {'address': '127.0.0.1',
-                       'port': '-1'},
+                       'port': '-1',
+                       'ManagementToken': ''},
             'Logging': {'file': '/dev/stderr',
                         'level': 'WARNING'}
         }.iteritems():
index ca914e1096def7d28a9be41e90ffcbac2d01d203..4d2d3e0c0ace3e6ff9db5832d3f8a9dcc4b7ad9a 100644 (file)
@@ -8,9 +8,12 @@ from __future__ import absolute_import, print_function
 import logging
 import subprocess
 
+import arvados.util
+
 from . import clientactor
 from .config import ARVADOS_ERRORS
 
+
 class ServerCalculator(object):
     """Generate cloud server wishlists from an Arvados job queue.
 
@@ -58,7 +61,6 @@ class ServerCalculator(object):
         self.max_nodes = max_nodes or float('inf')
         self.max_price = max_price or float('inf')
         self.logger = logging.getLogger('arvnodeman.jobqueue')
-        self.logged_jobs = set()
 
         self.logger.info("Using cloud node sizes:")
         for s in self.cloud_sizes:
@@ -83,20 +85,26 @@ class ServerCalculator(object):
 
     def servers_for_queue(self, queue):
         servers = []
-        seen_jobs = set()
+        unsatisfiable_jobs = {}
         for job in queue:
-            seen_jobs.add(job['uuid'])
             constraints = job['runtime_constraints']
             want_count = max(1, self.coerce_int(constraints.get('min_nodes'), 1))
             cloud_size = self.cloud_size_for_constraints(constraints)
             if cloud_size is None:
-                if job['uuid'] not in self.logged_jobs:
-                    self.logged_jobs.add(job['uuid'])
-                    self.logger.debug("job %s not satisfiable", job['uuid'])
-            elif (want_count <= self.max_nodes) and (want_count*cloud_size.price <= self.max_price):
+                unsatisfiable_jobs[job['uuid']] = (
+                    'Requirements for a single node exceed the available '
+                    'cloud node size')
+            elif (want_count > self.max_nodes):
+                unsatisfiable_jobs[job['uuid']] = (
+                    "Job's min_nodes constraint is greater than the configured "
+                    "max_nodes (%d)" % self.max_nodes)
+            elif (want_count*cloud_size.price <= self.max_price):
                 servers.extend([cloud_size.real] * want_count)
-        self.logged_jobs.intersection_update(seen_jobs)
-        return servers
+            else:
+                unsatisfiable_jobs[job['uuid']] = (
+                    "Job's price (%d) is above system's max_price "
+                    "limit (%d)" % (want_count*cloud_size.price, self.max_price))
+        return (servers, unsatisfiable_jobs)
 
     def cheapest_size(self):
         return self.cloud_sizes[0]
@@ -107,6 +115,7 @@ class ServerCalculator(object):
                 return s
         return None
 
+
 class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
     """Actor to generate server wishlists from the job queue.
 
@@ -147,7 +156,7 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
             for out in squeue_out.splitlines():
                 try:
                     cpu, ram, disk, reason, jobname = out.split("|", 4)
-                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
+                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason) or ("Priority" in reason):
                         queuelist.append({
                             "uuid": jobname,
                             "runtime_constraints": {
@@ -165,7 +174,28 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
         return queuelist
 
     def _got_response(self, queue):
-        server_list = self._calculator.servers_for_queue(queue)
+        server_list, unsatisfiable_jobs = self._calculator.servers_for_queue(queue)
+        # Cancel any job/container with unsatisfiable requirements, emitting
+        # a log explaining why.
+        for job_uuid, reason in unsatisfiable_jobs.iteritems():
+            try:
+                self._client.logs().create(body={
+                    'object_uuid': job_uuid,
+                    'event_type': 'stderr',
+                    'properties': {'text': reason},
+                }).execute()
+                # Cancel the job depending on its type
+                if arvados.util.container_uuid_pattern.match(job_uuid):
+                    subprocess.check_call(['scancel', '--name='+job_uuid])
+                elif arvados.util.job_uuid_pattern.match(job_uuid):
+                    self._client.jobs().cancel(uuid=job_uuid).execute()
+                else:
+                    raise Exception('Unknown job type')
+                self._logger.debug("Cancelled unsatisfiable job '%s'", job_uuid)
+            except Exception as error:
+                self._logger.error("Trying to cancel job '%s': %s",
+                                   job_uuid,
+                                   error)
         self._logger.debug("Calculated wishlist: %s",
                            ', '.join(s.name for s in server_list) or "(empty)")
         return super(JobQueueMonitorActor, self)._got_response(server_list)
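
For clarity, a condensed sketch of the new contract follows: servers_for_queue() now returns a (servers, unsatisfiable_jobs) pair, and the monitor cancels each unsatisfiable entry, dispatching on the UUID type. The regex below is a simplified stand-in for arvados.util.container_uuid_pattern (the real code also checks job_uuid_pattern), and the api_client and queue values are hypothetical.

    import re
    import subprocess

    # Simplified stand-in for arvados.util.container_uuid_pattern.
    container_re = re.compile(r'^[a-z0-9]{5}-dz642-[a-z0-9]{15}$')

    def cancel_unsatisfiable(api_client, unsatisfiable_jobs):
        """Log a reason and cancel each unsatisfiable job or container."""
        for uuid, reason in unsatisfiable_jobs.items():
            # Attach the human-readable reason to the object first...
            api_client.logs().create(body={
                'object_uuid': uuid,
                'event_type': 'stderr',
                'properties': {'text': reason},
            }).execute()
            # ...then cancel: containers go through slurm, jobs through the API.
            if container_re.match(uuid):
                subprocess.check_call(['scancel', '--name=' + uuid])
            else:
                api_client.jobs().cancel(uuid=uuid).execute()
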
index 85719d3069103e8f3d14296b8ff3946241973ac3..cfd611285cffc91933fefb15a2dee37e14995c7e 100644 (file)
@@ -41,12 +41,34 @@ class Handler(http.server.BaseHTTPRequestHandler, object):
             self.send_header('Content-type', 'application/json')
             self.end_headers()
             self.wfile.write(tracker.get_json())
+        elif self.path == '/_health/ping':
+            code, msg = self.check_auth()
+
+            if code != 200:
+                self.send_response(code)
+                self.end_headers()
+                self.wfile.write(msg)
+            else:
+                self.send_response(200)
+                self.send_header('Content-type', 'application/json')
+                self.end_headers()
+                self.wfile.write(json.dumps({"health":"OK"}))
         else:
             self.send_response(404)
 
     def log_message(self, fmt, *args, **kwargs):
         _logger.info(fmt, *args, **kwargs)
 
+    def check_auth(self):
+        mgmt_token = self.server._config.get('Manage', 'ManagementToken')
+        auth_header = self.headers.get('Authorization', None)
+
+        if mgmt_token == '':
+            return 404, "disabled"
+        elif auth_header is None:
+            return 401, "authorization required"
+        elif auth_header != 'Bearer '+mgmt_token:
+            return 403, "authorization error"
+        return 200, ""
 
 class Tracker(object):
     def __init__(self):
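
The handler above gives node manager's status server a /_health/ping endpoint with the contract encoded in check_auth: 404 when no ManagementToken is configured, 401 without an Authorization header, 403 on a mismatched token, and 200 with a JSON body on success. A minimal client sketch, assuming the `requests` library; the address and token below are placeholder values:

    import requests

    resp = requests.get(
        'http://127.0.0.1:8989/_health/ping',              # placeholder address
        headers={'Authorization': 'Bearer examplemgmttoken'})
    if resp.status_code == 200:
        print(resp.json())                                 # {u'health': u'OK'}
    elif resp.status_code in (401, 403):
        print('management token missing or wrong')
    else:
        print('healthcheck disabled (no ManagementToken configured)')
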
index feba3ce185caaf46517adc988fd63fafbc4985b1..bdd3ffdcb7e0a247676ddbac49943ce32b25eb30 100755 (executable)
@@ -40,6 +40,7 @@ detail.addHandler(logging.StreamHandler(detail_content))
 fake_slurm = None
 compute_nodes = None
 all_jobs = None
+unsatisfiable_job_scancelled = None
 
 def update_script(path, val):
     with open(path+"_", "w") as f:
@@ -54,6 +55,33 @@ def set_squeue(g):
                   "\n".join("echo '1|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
     return 0
 
+def set_queue_unsatisfiable(g):
+    global all_jobs, unsatisfiable_job_scancelled
+    # Simulate a job requesting a 99-core node.
+    update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
+                  "\n".join("echo '99|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
+    update_script(os.path.join(fake_slurm, "scancel"), "#!/bin/sh\n" +
+                  "\ntouch %s" % unsatisfiable_job_scancelled)
+    return 0
+
+def job_cancelled(g):
+    global unsatisfiable_job_scancelled
+    cancelled_job = g.group(1)
+    api = arvados.api('v1')
+    # Check that 'scancel' was called
+    if not os.path.isfile(unsatisfiable_job_scancelled):
+        return 1
+    # Check for the log entry
+    log_entry = api.logs().list(
+        filters=[
+            ['object_uuid', '=', cancelled_job],
+            ['event_type', '=', 'stderr'],
+        ]).execute()['items'][0]
+    if not re.match(
+            r"Requirements for a single node exceed the available cloud node size",
+            log_entry['properties']['text']):
+        return 1
+    return 0
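
Both fake squeue scripts emit the same pipe-delimited records that JobQueueMonitorActor parses (cpu|ram|disk|reason|jobname), so the 99-core request above is guaranteed to be unsatisfiable against the test cloud sizes. For illustration, using the job UUID from the test case below:

    line = "99|100|100|ReqNodeNotAvail|34t0i-dz642-h42bg3hq4bdfpf9"
    cpu, ram, disk, reason, jobname = line.split("|", 4)
    assert (cpu, reason) == ("99", "ReqNodeNotAvail")
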
 
 def node_paired(g):
     global compute_nodes
@@ -115,6 +143,9 @@ def expect_count(count, checks, pattern, g):
 
 def run_test(name, actions, checks, driver_class, jobs, provider):
     code = 0
+    global unsatisfiable_job_scancelled
+    unsatisfiable_job_scancelled = os.path.join(tempfile.mkdtemp(),
+                                                "scancel_called")
 
     # Delete any stale node records
     api = arvados.api('v1')
@@ -159,7 +190,7 @@ def run_test(name, actions, checks, driver_class, jobs, provider):
 
     # Test main loop:
     # - Read line
-    # - Apply negative checks (thinks that are not supposed to happen)
+    # - Apply negative checks (things that are not supposed to happen)
     # - Check timeout
     # - Check if the next action should trigger
     # - If all actions are exhausted, terminate with test success
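
A condensed sketch of the loop those comments describe (names and structure are simplified assumptions; the real loop also reads the daemon's output incrementally and enforces the timeout):

    import re

    def drive(log_lines, actions, checks):
        code = 0
        for line in log_lines:
            # Negative checks: any match counts as a test failure.
            for pattern, check in checks.items():
                m = re.match(pattern, line)
                if m:
                    code += check(m)          # e.g. fail() returns 1
            # Actions fire in order; each match consumes one action.
            if actions:
                m = re.match(actions[0][0], line)
                if m:
                    code += actions.pop(0)[1](m)
            if not actions:
                return code                   # success when code == 0
        return 1                              # output ended before all actions fired
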
@@ -213,6 +244,7 @@ def run_test(name, actions, checks, driver_class, jobs, provider):
         code = 1
 
     shutil.rmtree(fake_slurm)
+    shutil.rmtree(os.path.dirname(unsatisfiable_job_scancelled))
 
     if code == 0:
         logger.info("%s passed", name)
@@ -228,6 +260,23 @@ def main():
     # Test lifecycle.
 
     tests = {
+        "test_unsatisfiable_jobs" : (
+            # Actions (pattern -> action)
+            [
+                (r".*Daemon started", set_queue_unsatisfiable),
+                (r".*Cancelled unsatisfiable job '(\S+)'", job_cancelled),
+            ],
+            # Checks (things that shouldn't happen)
+            {
+                r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": fail,
+                r".*Trying to cancel job '(\S+)'": fail,
+            },
+            # Driver class
+            "arvnodeman.test.fake_driver.FakeDriver",
+            # Jobs
+            {"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+            # Provider
+            "azure"),
         "test_single_node_azure": (
             [
                 (r".*Daemon started", set_squeue),
index 8aa0835aca395af7594659e6c836e6245d189494..669b6247114c0f4843f5c2dd51eb9f9d4c00d4a9 100644 (file)
@@ -24,63 +24,69 @@ class ServerCalculatorTestCase(unittest.TestCase):
 
     def test_empty_queue_needs_no_servers(self):
         servcalc = self.make_calculator([1])
-        self.assertEqual([], servcalc.servers_for_queue([]))
+        self.assertEqual(([], {}), servcalc.servers_for_queue([]))
 
     def test_easy_server_count(self):
         servcalc = self.make_calculator([1])
-        servlist = self.calculate(servcalc, {'min_nodes': 3})
+        servlist, _ = self.calculate(servcalc, {'min_nodes': 3})
         self.assertEqual(3, len(servlist))
 
     def test_default_5pct_ram_value_decrease(self):
         servcalc = self.make_calculator([1])
-        servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
+        servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
         self.assertEqual(0, len(servlist))
-        servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 121})
+        servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 121})
         self.assertEqual(1, len(servlist))
 
     def test_custom_node_mem_scaling_factor(self):
         # Simulate a custom 'node_mem_scaling' config parameter by passing
         # the value to ServerCalculator
         servcalc = self.make_calculator([1], node_mem_scaling=0.5)
-        servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
+        servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 128})
         self.assertEqual(0, len(servlist))
-        servlist = self.calculate(servcalc, {'min_ram_mb_per_node': 64})
+        servlist, _ = self.calculate(servcalc, {'min_ram_mb_per_node': 64})
         self.assertEqual(1, len(servlist))
 
     def test_implicit_server_count(self):
         servcalc = self.make_calculator([1])
-        servlist = self.calculate(servcalc, {}, {'min_nodes': 3})
+        servlist, _ = self.calculate(servcalc, {}, {'min_nodes': 3})
         self.assertEqual(4, len(servlist))
 
     def test_bad_min_nodes_override(self):
         servcalc = self.make_calculator([1])
-        servlist = self.calculate(servcalc,
-                                  {'min_nodes': -2}, {'min_nodes': 'foo'})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_nodes': -2}, {'min_nodes': 'foo'})
         self.assertEqual(2, len(servlist))
 
-    def test_ignore_unsatisfiable_jobs(self):
+    def test_ignore_and_return_unsatisfiable_jobs(self):
         servcalc = self.make_calculator([1], max_nodes=9)
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 2},
-                                  {'min_ram_mb_per_node': 256},
-                                  {'min_nodes': 6},
-                                  {'min_nodes': 12},
-                                  {'min_scratch_mb_per_node': 300000})
+        servlist, u_jobs = self.calculate(servcalc,
+                                          {'min_cores_per_node': 2},
+                                          {'min_ram_mb_per_node': 256},
+                                          {'min_nodes': 6},
+                                          {'min_nodes': 12},
+                                          {'min_scratch_mb_per_node': 300000})
         self.assertEqual(6, len(servlist))
+        # Only unsatisfiable jobs are returned in u_jobs
+        self.assertIn('zzzzz-jjjjj-000000000000000', u_jobs.keys())
+        self.assertIn('zzzzz-jjjjj-000000000000001', u_jobs.keys())
+        self.assertNotIn('zzzzz-jjjjj-000000000000002', u_jobs.keys())
+        self.assertIn('zzzzz-jjjjj-000000000000003', u_jobs.keys())
+        self.assertIn('zzzzz-jjjjj-000000000000004', u_jobs.keys())
 
     def test_ignore_too_expensive_jobs(self):
         servcalc = self.make_calculator([1, 2], max_nodes=12, max_price=6)
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 1, 'min_nodes': 6})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_cores_per_node': 1, 'min_nodes': 6})
         self.assertEqual(6, len(servlist))
 
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 2, 'min_nodes': 6})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_cores_per_node': 2, 'min_nodes': 6})
         self.assertEqual(0, len(servlist))
 
     def test_job_requesting_max_nodes_accepted(self):
         servcalc = self.make_calculator([1], max_nodes=4)
-        servlist = self.calculate(servcalc, {'min_nodes': 4})
+        servlist, _ = self.calculate(servcalc, {'min_nodes': 4})
         self.assertEqual(4, len(servlist))
 
     def test_cheapest_size(self):
@@ -89,37 +95,37 @@ class ServerCalculatorTestCase(unittest.TestCase):
 
     def test_next_biggest(self):
         servcalc = self.make_calculator([1, 2, 4, 8])
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 3},
-                                  {'min_cores_per_node': 6})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_cores_per_node': 3},
+                                     {'min_cores_per_node': 6})
         self.assertEqual([servcalc.cloud_sizes[2].id,
                           servcalc.cloud_sizes[3].id],
                          [s.id for s in servlist])
 
     def test_multiple_sizes(self):
         servcalc = self.make_calculator([1, 2])
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 2},
-                                  {'min_cores_per_node': 1},
-                                  {'min_cores_per_node': 1})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_cores_per_node': 2},
+                                     {'min_cores_per_node': 1},
+                                     {'min_cores_per_node': 1})
         self.assertEqual([servcalc.cloud_sizes[1].id,
                           servcalc.cloud_sizes[0].id,
                           servcalc.cloud_sizes[0].id],
                          [s.id for s in servlist])
 
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 1},
-                                  {'min_cores_per_node': 2},
-                                  {'min_cores_per_node': 1})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_cores_per_node': 1},
+                                     {'min_cores_per_node': 2},
+                                     {'min_cores_per_node': 1})
         self.assertEqual([servcalc.cloud_sizes[0].id,
                           servcalc.cloud_sizes[1].id,
                           servcalc.cloud_sizes[0].id],
                          [s.id for s in servlist])
 
-        servlist = self.calculate(servcalc,
-                                  {'min_cores_per_node': 1},
-                                  {'min_cores_per_node': 1},
-                                  {'min_cores_per_node': 2})
+        servlist, _ = self.calculate(servcalc,
+                                     {'min_cores_per_node': 1},
+                                     {'min_cores_per_node': 1},
+                                     {'min_cores_per_node': 2})
         self.assertEqual([servcalc.cloud_sizes[0].id,
                           servcalc.cloud_sizes[0].id,
                           servcalc.cloud_sizes[1].id],
@@ -131,16 +137,38 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
                                    unittest.TestCase):
     TEST_CLASS = jobqueue.JobQueueMonitorActor
 
+
     class MockCalculator(object):
         @staticmethod
         def servers_for_queue(queue):
-            return [testutil.MockSize(n) for n in queue]
+            return ([testutil.MockSize(n) for n in queue], {})
+
+
+    class MockCalculatorUnsatisfiableJobs(object):
+        @staticmethod
+        def servers_for_queue(queue):
+            return ([], {k["uuid"]: "Unsatisfiable job mock" for k in queue})
 
 
     def build_monitor(self, side_effect, *args, **kwargs):
         super(JobQueueMonitorActorTestCase, self).build_monitor(*args, **kwargs)
         self.client.jobs().queue().execute.side_effect = side_effect
 
+    @mock.patch("subprocess.check_call")
+    @mock.patch("subprocess.check_output")
+    def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
+        job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
+        container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
+        mock_squeue.return_value = "1|1024|0|Resources|" + container_uuid + "\n"
+
+        self.build_monitor([{'items': [{'uuid': job_uuid}]}],
+                           self.MockCalculatorUnsatisfiableJobs(), True, True)
+        self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.monitor)
+        self.client.jobs().cancel.assert_called_with(uuid=job_uuid)
+        mock_scancel.assert_called_with(['scancel', '--name='+container_uuid])
+
     @mock.patch("subprocess.check_output")
     def test_subscribers_get_server_lists(self, mock_squeue):
         mock_squeue.return_value = ""
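
The two mock calculators above illustrate the seam this change creates: anything exposing servers_for_queue(queue) -> (servers, unsatisfiable_jobs) can be injected into the actor under test. A hypothetical third stub, reusing testutil.MockSize:

    class OneSizeFitsAllCalculator(object):
        """Hypothetical stub: satisfy every queued job with one mock node."""
        @staticmethod
        def servers_for_queue(queue):
            # One single-core mock node per queued job, nothing unsatisfiable.
            return ([testutil.MockSize(1) for _ in queue], {})
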
index a18eff3a5955b0dc0faf7c7bfcbf3b7192633a6c..a236e4f0eecd1d2ba30c3e01adc227e245aba03e 100644 (file)
@@ -6,6 +6,7 @@
 from __future__ import absolute_import, print_function
 from future import standard_library
 
+import json
 import requests
 import unittest
 
@@ -14,10 +15,15 @@ import arvnodeman.config as config
 
 
 class TestServer(object):
+    def __init__(self, management_token=None):
+        self.mgmt_token = management_token
+
     def __enter__(self):
         cfg = config.NodeManagerConfig()
         cfg.set('Manage', 'port', '0')
         cfg.set('Manage', 'address', '127.0.0.1')
+        if self.mgmt_token is not None:
+            cfg.set('Manage', 'ManagementToken', self.mgmt_token)
         self.srv = status.Server(cfg)
         self.srv.start()
         addr, port = self.srv.server_address
@@ -33,6 +39,11 @@ class TestServer(object):
     def get_status(self):
         return self.get_status_response().json()
 
+    def get_healthcheck_ping(self, auth_header=None):
+        headers = {}
+        if auth_header is not None:
+            headers['Authorization'] = auth_header
+        return requests.get(self.srv_base+'/_health/ping', headers=headers)
 
 class StatusServerUpdates(unittest.TestCase):
     def test_updates(self):
@@ -56,3 +67,32 @@ class StatusServerDisabled(unittest.TestCase):
         self.srv.start()
         self.assertFalse(self.srv.enabled)
         self.assertFalse(getattr(self.srv, '_thread', False))
+
+class HealthcheckPing(unittest.TestCase):
+    def test_ping_disabled(self):
+        with TestServer() as srv:
+            r = srv.get_healthcheck_ping()
+            self.assertEqual(404, r.status_code)
+
+    def test_ping_no_auth(self):
+        with TestServer('configuredmanagementtoken') as srv:
+            r = srv.get_healthcheck_ping()
+            self.assertEqual(401, r.status_code)
+
+    def test_ping_bad_auth_format(self):
+        with TestServer('configuredmanagementtoken') as srv:
+            r = srv.get_healthcheck_ping('noBearer')
+            self.assertEqual(403, r.status_code)
+
+    def test_ping_bad_auth_token(self):
+        with TestServer('configuredmanagementtoken') as srv:
+            r = srv.get_healthcheck_ping('Bearer badtoken')
+            self.assertEqual(403, r.status_code)
+
+    def test_ping_success(self):
+        with TestServer('configuredmanagementtoken') as srv:
+            r = srv.get_healthcheck_ping('Bearer configuredmanagementtoken')
+            self.assertEqual(200, r.status_code)
+            self.assertEqual('application/json', r.headers['content-type'])
+            resp = r.json()
+            self.assertEqual('{"health": "OK"}', json.dumps(resp))
index 938f6a438172c668aba980d9a585baaaf9ba846f..f9c46e7325aed6358e1cc8296d0a364fea27a908 100644 (file)
@@ -4,21 +4,24 @@
        "package": [
                {
                        "checksumSHA1": "jf7K+UTQNIzRdlG5F4zX/8b++/E=",
+                       "origin": "github.com/curoverse/goamz/aws",
                        "path": "github.com/AdRoll/goamz/aws",
-                       "revision": "c5d7d9bd6c743fae44efc6c18450282022445ffc",
-                       "revisionTime": "2017-02-25T09:28:51Z"
+                       "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+                       "revisionTime": "2017-07-27T13:52:37Z"
                },
                {
-                       "checksumSHA1": "ey9ddXTW9dncjJz/COKpeYm+sgg=",
+                       "checksumSHA1": "9nUwQXI+pNxZo6bnR7NslpMpfPI=",
+                       "origin": "github.com/curoverse/goamz/s3",
                        "path": "github.com/AdRoll/goamz/s3",
-                       "revision": "c5d7d9bd6c743fae44efc6c18450282022445ffc",
-                       "revisionTime": "2017-02-25T09:28:51Z"
+                       "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+                       "revisionTime": "2017-07-27T13:52:37Z"
                },
                {
-                       "checksumSHA1": "pDHYVqUQtRsPYw/X4kUrdK7pxMs=",
+                       "checksumSHA1": "tvxbsTkdjB0C/uxEglqD6JfVnMg=",
+                       "origin": "github.com/curoverse/goamz/s3/s3test",
                        "path": "github.com/AdRoll/goamz/s3/s3test",
-                       "revision": "c5d7d9bd6c743fae44efc6c18450282022445ffc",
-                       "revisionTime": "2017-02-25T09:28:51Z"
+                       "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+                       "revisionTime": "2017-07-27T13:52:37Z"
                },
                {
                        "checksumSHA1": "Rjy2uYZkQ8Kjht6ZFU0qzm2I/kI=",
                        "revision": "1620af6b32398bfc91827ceae54a8cc1f55df04d",
                        "revisionTime": "2016-12-14T20:08:43Z"
                },
-               {
-                       "checksumSHA1": "qjY3SPlNvqT179DPiRaIsRhYZQI=",
-                       "origin": "github.com/docker/docker/vendor/github.com/docker/distribution",
-                       "path": "github.com/docker/distribution",
-                       "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
-                       "revisionTime": "2017-05-17T20:48:28Z"
-               },
-               {
-                       "checksumSHA1": "0au+tD+jymXNssdb1JgcctY7PN4=",
-                       "origin": "github.com/docker/docker/vendor/github.com/docker/distribution/context",
-                       "path": "github.com/docker/distribution/context",
-                       "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
-                       "revisionTime": "2017-05-17T20:48:28Z"
-               },
                {
                        "checksumSHA1": "Gj+xR1VgFKKmFXYOJMnAczC3Znk=",
                        "origin": "github.com/docker/docker/vendor/github.com/docker/distribution/digestset",
                        "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
                        "revisionTime": "2017-05-17T20:48:28Z"
                },
-               {
-                       "checksumSHA1": "oYy5Q1HBImMQvh9t96cmNzWar80=",
-                       "origin": "github.com/docker/docker/vendor/github.com/docker/distribution/manifest",
-                       "path": "github.com/docker/distribution/manifest",
-                       "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
-                       "revisionTime": "2017-05-17T20:48:28Z"
-               },
-               {
-                       "checksumSHA1": "SK1g7ll2cPbgDyWpK0oVT9beVZY=",
-                       "origin": "github.com/docker/docker/vendor/github.com/docker/distribution/manifest/manifestlist",
-                       "path": "github.com/docker/distribution/manifest/manifestlist",
-                       "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
-                       "revisionTime": "2017-05-17T20:48:28Z"
-               },
                {
                        "checksumSHA1": "m4wEFD0Mh+ClfprUqgl0GyNmk7Q=",
                        "origin": "github.com/docker/docker/vendor/github.com/docker/distribution/reference",
                        "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
                        "revisionTime": "2017-05-17T20:48:28Z"
                },
-               {
-                       "checksumSHA1": "cNp7rNReJHvdSfrIetXS9RGsLSo=",
-                       "origin": "github.com/docker/docker/vendor/github.com/docker/distribution/uuid",
-                       "path": "github.com/docker/distribution/uuid",
-                       "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",
-                       "revisionTime": "2017-05-17T20:48:28Z"
-               },
                {
                        "checksumSHA1": "5b7eC73lORtIUFCjz548jXkLlKU=",
                        "path": "github.com/docker/docker/api",
                        "revisionTime": "2017-03-24T20:46:54Z"
                },
                {
-                       "checksumSHA1": "Gk3jTNQ5uGDUE0WMJFWcYz9PMps=",
+                       "checksumSHA1": "q5SZBWFVC3wOIzftf+l/h5WLG1k=",
                        "path": "github.com/lib/pq/oid",
                        "revision": "2704adc878c21e1329f46f6e56a1c387d788ff94",
                        "revisionTime": "2017-03-24T20:46:54Z"
                        "revisionTime": "2017-05-12T22:20:15Z"
                },
                {
-                       "checksumSHA1": "ENl6I8+3AaBanbn9CVExMjDTHPc=",
+                       "checksumSHA1": "dUfdXzRJupI9VpqNR2LlppeZvLc=",
                        "origin": "github.com/docker/docker/vendor/golang.org/x/sys/unix",
                        "path": "golang.org/x/sys/unix",
                        "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",