From: Lisa Knox Date: Fri, 12 Jan 2024 20:24:42 +0000 (-0500) Subject: Merge branch '21315-row-select' X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/f9ae14ce8c4d749cfdc007ec45929cc9a09a3790?hp=7109f4f33c919b07f8e87412c3bc2cc28725296a Merge branch '21315-row-select' closes #21315 Arvados-DCO-1.1-Signed-off-by: Lisa Knox --- diff --git a/build/build-dev-docker-jobs-image.sh b/build/build-dev-docker-jobs-image.sh index bf1ab34189..b0990d0c49 100755 --- a/build/build-dev-docker-jobs-image.sh +++ b/build/build-dev-docker-jobs-image.sh @@ -17,7 +17,6 @@ WORKSPACE=path Path to the Arvados source tree to build packages from CWLTOOL=path (optional) Path to cwltool git repository. SALAD=path (optional) Path to schema_salad git repository. CWL_UTILS=path (optional) Path to cwl-utils git repository. -PYCMD=pythonexec (optional) Specify the python3 executable to use in the docker image. Defaults to "python3". EOF @@ -28,72 +27,26 @@ if [[ -z "$WORKSPACE" ]] ; then echo "Using WORKSPACE $WORKSPACE" fi -if [[ -z "$ARVADOS_API_HOST" || -z "$ARVADOS_API_TOKEN" ]] ; then - echo "$helpmessage" - echo - echo "Must set ARVADOS_API_HOST and ARVADOS_API_TOKEN" - exit 1 -fi - -cd "$WORKSPACE" - -py=python3 -pipcmd=pip -if [[ -n "$PYCMD" ]] ; then - py="$PYCMD" -fi -if [[ $py = python3 ]] ; then - pipcmd=pip3 -fi +context_dir="$(mktemp --directory --tmpdir dev-jobs.XXXXXXXX)" +trap 'rm -rf "$context_dir"' EXIT INT TERM QUIT -(cd sdk/python && python3 setup.py sdist) -sdk=$(cd sdk/python/dist && ls -t arvados-python-client-*.tar.gz | head -n1) - -(cd sdk/cwl && python3 setup.py sdist) -runner=$(cd sdk/cwl/dist && ls -t arvados-cwl-runner-*.tar.gz | head -n1) - -rm -rf sdk/cwl/salad_dist -mkdir -p sdk/cwl/salad_dist -if [[ -n "$SALAD" ]] ; then - (cd "$SALAD" && python3 setup.py sdist) - salad=$(cd "$SALAD/dist" && ls -t schema-salad-*.tar.gz | head -n1) - cp "$SALAD/dist/$salad" $WORKSPACE/sdk/cwl/salad_dist -fi - -rm -rf sdk/cwl/cwltool_dist -mkdir -p sdk/cwl/cwltool_dist -if [[ -n "$CWLTOOL" ]] ; then - (cd "$CWLTOOL" && python3 setup.py sdist) - cwltool=$(cd "$CWLTOOL/dist" && ls -t cwltool-*.tar.gz | head -n1) - cp "$CWLTOOL/dist/$cwltool" $WORKSPACE/sdk/cwl/cwltool_dist -fi - -rm -rf sdk/cwl/cwlutils_dist -mkdir -p sdk/cwl/cwlutils_dist -if [[ -n "$CWL_UTILS" ]] ; then - (cd "$CWL_UTILS" && python3 setup.py sdist) - cwlutils=$(cd "$CWL_UTILS/dist" && ls -t cwl-utils-*.tar.gz | head -n1) - cp "$CWL_UTILS/dist/$cwlutils" $WORKSPACE/sdk/cwl/cwlutils_dist -fi +for src_dir in "$WORKSPACE/sdk/python" "${CWLTOOL:-}" "${CWL_UTILS:-}" "${SALAD:-}" "$WORKSPACE/sdk/cwl"; do + if [[ -z "$src_dir" ]]; then + continue + fi + env -C "$src_dir" python3 setup.py sdist --dist-dir="$context_dir" +done +cd "$WORKSPACE" . 
build/run-library.sh - # This defines python_sdk_version and cwl_runner_version with python-style # package suffixes (.dev/rc) calculate_python_sdk_cwl_package_versions set -x docker build --no-cache \ - --build-arg sdk=$sdk \ - --build-arg runner=$runner \ - --build-arg salad=$salad \ - --build-arg cwltool=$cwltool \ - --build-arg pythoncmd=$py \ - --build-arg pipcmd=$pipcmd \ - --build-arg cwlutils=$cwlutils \ -f "$WORKSPACE/sdk/dev-jobs.dockerfile" \ -t arvados/jobs:$cwl_runner_version \ - "$WORKSPACE/sdk" + "$context_dir" -echo arv-keepdocker arvados/jobs $cwl_runner_version arv-keepdocker arvados/jobs $cwl_runner_version diff --git a/build/package-build-dockerfiles/rocky8/Dockerfile b/build/package-build-dockerfiles/rocky8/Dockerfile index a1bf643f9a..0eab1f5d36 100644 --- a/build/package-build-dockerfiles/rocky8/Dockerfile +++ b/build/package-build-dockerfiles/rocky8/Dockerfile @@ -6,7 +6,7 @@ ARG HOSTTYPE ARG BRANCH ARG GOVERSION -FROM rockylinux:8.6-minimal as build_x86_64 +FROM rockylinux:8.8-minimal as build_x86_64 ONBUILD ARG BRANCH # Install go ONBUILD ARG GOVERSION @@ -18,7 +18,7 @@ ONBUILD RUN ln -s /usr/local/node-v12.22.12-linux-x64/bin/* /usr/local/bin/ ONBUILD RUN npm install -g yarn ONBUILD RUN ln -sf /usr/local/node-v12.22.12-linux-x64/bin/* /usr/local/bin/ -FROM rockylinux:8.6-minimal as build_aarch64 +FROM rockylinux:8.8-minimal as build_aarch64 ONBUILD ARG BRANCH # Install go ONBUILD ARG GOVERSION @@ -55,10 +55,8 @@ RUN microdnf --assumeyes --enablerepo=devel install \ patch \ postgresql-devel \ procps-ng \ - python3 \ - python3-devel \ - python3-pip \ - python3-virtualenv \ + python39 \ + python39-devel \ readline-devel \ rpm-build \ ruby \ diff --git a/build/package-testing/deb-common-test-packages.sh b/build/package-testing/deb-common-test-packages.sh index cb9d538e8e..32788175d2 100755 --- a/build/package-testing/deb-common-test-packages.sh +++ b/build/package-testing/deb-common-test-packages.sh @@ -47,13 +47,21 @@ fi dpkg-deb -x $debpkg . 
if [[ "$DEBUG" != "0" ]]; then - while read so && [ -n "$so" ]; do - echo - echo "== Packages dependencies for $so ==" - ldd "$so" | awk '($3 ~ /^\//){print $3}' | sort -u | xargs -r dpkg -S | cut -d: -f1 | sort -u - done <python3 metapackages -build_metapackage "arvados-fuse" "services/fuse" -build_metapackage "arvados-python-client" "services/fuse" -build_metapackage "arvados-cwl-runner" "sdk/cwl" -build_metapackage "crunchstat-summary" "tools/crunchstat-summary" -build_metapackage "arvados-docker-cleaner" "services/dockercleaner" -build_metapackage "arvados-user-activity" "tools/user-activity" - # The cwltest package, which lives out of tree handle_cwltest "$FORMAT" "$ARCH" diff --git a/build/run-library.sh b/build/run-library.sh index 7a70ed23a5..bb224a7172 100755 --- a/build/run-library.sh +++ b/build/run-library.sh @@ -593,8 +593,6 @@ handle_cwltest () { # signal to our build script that we want a cwltest executable installed in /usr/bin/ mkdir cwltest/bin && touch cwltest/bin/cwltest fpm_build_virtualenv "cwltest" "cwltest" "$package_format" "$target_arch" - # The python->python3 metapackage - build_metapackage "cwltest" "cwltest" cd "$WORKSPACE" rm -rf "$WORKSPACE/cwltest" } @@ -696,10 +694,7 @@ fpm_build_virtualenv_worker () { ARVADOS_BUILDING_ITERATION=1 fi - local python=$PYTHON3_EXECUTABLE - pip=pip3 PACKAGE_PREFIX=$PYTHON3_PKG_PREFIX - if [[ "$PKG" != "arvados-docker-cleaner" ]]; then PYTHON_PKG=$PACKAGE_PREFIX-$PKG else @@ -710,28 +705,27 @@ fpm_build_virtualenv_worker () { cd $WORKSPACE/$PKG_DIR rm -rf dist/* + local venv_dir="dist/build/usr/share/python$PYTHON3_VERSION/dist/$PYTHON_PKG" + echo "Creating virtualenv..." + if ! "$PYTHON3_EXECUTABLE" -m venv "$venv_dir"; then + printf "Error, unable to run\n %s -m venv %s\n" "$PYTHON3_EXECUTABLE" "$venv_dir" + exit 1 + fi - # Get the latest setuptools. - # - # Note "pip3 install setuptools" fails on debian12 ("error: - # externally-managed-environment") even if that requirement is - # already satisfied, so we parse "pip3 list" output instead to check - # whether we need to do anything. - if [[ "$($pip list | grep -P -o '^setuptools\s+\K[0-9]+')" -ge 66 ]]; then - : # OK, already installed - elif ! $pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U 'setuptools>=66'; then - echo "Error, unable to upgrade setuptools with" - echo " $pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U 'setuptools>=66'" + local venv_py="$venv_dir/bin/python$PYTHON3_VERSION" + if ! "$venv_py" -m pip install --upgrade $DASHQ_UNLESS_DEBUG $CACHE_FLAG pip setuptools wheel; then + printf "Error, unable to upgrade pip, setuptools, and wheel with + %s -m pip install --upgrade $DASHQ_UNLESS_DEBUG $CACHE_FLAG pip setuptools wheel +" "$venv_py" exit 1 fi + # filter a useless warning (when building the cwltest package) from the stderr output - if ! $python setup.py $DASHQ_UNLESS_DEBUG sdist 2> >(grep -v 'warning: no previously-included files matching'); then - echo "Error, unable to run $python setup.py sdist for $PKG" + if ! "$venv_py" setup.py $DASHQ_UNLESS_DEBUG sdist 2> >(grep -v 'warning: no previously-included files matching'); then + echo "Error, unable to run $venv_py setup.py sdist for $PKG" exit 1 fi - PACKAGE_PATH=`(cd dist; ls *tar.gz)` - if [[ "arvados-python-client" == "$PKG" ]]; then PYSDK_PATH="-f $(pwd)/dist/" fi @@ -750,92 +744,44 @@ fpm_build_virtualenv_worker () { fi # See if we actually need to build this package; does it exist already? - # We can't do this earlier than here, because we need PYTHON_VERSION... 
- # This isn't so bad; the sdist call above is pretty quick compared to - # the invocation of virtualenv and fpm, below. - if ! test_package_presence "$PYTHON_PKG" "$UNFILTERED_PYTHON_VERSION" "$python" "$ARVADOS_BUILDING_ITERATION" "$target_arch"; then + # We can't do this earlier than here, because we need PYTHON_VERSION. + if ! test_package_presence "$PYTHON_PKG" "$UNFILTERED_PYTHON_VERSION" python3 "$ARVADOS_BUILDING_ITERATION" "$target_arch"; then return 0 fi echo "Building $package_format ($target_arch) package for $PKG from $PKG_DIR" - # Package the sdist in a virtualenv - echo "Creating virtualenv..." - - cd dist - - rm -rf build - rm -f $PYTHON_PKG*deb - echo "virtualenv version: `virtualenv --version`" - virtualenv_command="virtualenv --python `which $python` $DASHQ_UNLESS_DEBUG build/usr/share/$python/dist/$PYTHON_PKG" - - if ! $virtualenv_command; then - echo "Error, unable to run" - echo " $virtualenv_command" - exit 1 - fi - - if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U pip; then - echo "Error, unable to upgrade pip with" - echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U pip" - exit 1 - fi - echo "pip version: `build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip --version`" - - if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U 'setuptools<45'; then - echo "Error, unable to upgrade setuptools with" - echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U 'setuptools<45'" - exit 1 - fi - echo "setuptools version: `build/usr/share/$python/dist/$PYTHON_PKG/bin/$python -c 'import setuptools; print(setuptools.__version__)'`" - - if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U wheel; then - echo "Error, unable to upgrade wheel with" - echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U wheel" - exit 1 - fi - echo "wheel version: `build/usr/share/$python/dist/$PYTHON_PKG/bin/wheel version`" - - if [[ "$TARGET" != "centos7" ]] || [[ "$PYTHON_PKG" != "python-arvados-fuse" ]]; then - build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PYSDK_PATH $PACKAGE_PATH - else - # centos7 needs these special tweaks to install python-arvados-fuse - build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG docutils - PYCURL_SSL_LIBRARY=nss build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PYSDK_PATH $PACKAGE_PATH - fi - - if [[ "$?" != "0" ]]; then - echo "Error, unable to run" - echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PYSDK_PATH $PACKAGE_PATH" + local sdist_path="$(ls dist/*.tar.gz)" + if ! "$venv_py" -m pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PYSDK_PATH "$sdist_path"; then + printf "Error, unable to run + %s -m pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG %s %s +" "$venv_py" "$PYSDK_PATH" "$sdist_path" exit 1 fi - cd build/usr/share/$python/dist/$PYTHON_PKG/ + pushd "$venv_dir" >$STDOUT_IF_DEBUG # Replace the shebang lines in all python scripts, and handle the activate # scripts too. This is a functional replacement of the 237 line # virtualenv_tools.py script that doesn't work in python3 without serious # patching, minus the parts we don't need (modifying pyc files, etc). 
+ local sys_venv_dir="${venv_dir#dist/build/}" + local sys_venv_py="$sys_venv_dir/bin/python$PYTHON3_VERSION" for binfile in `ls bin/`; do - if ! file --mime bin/$binfile |grep -q binary; then - # Not a binary file - if [[ "$binfile" =~ ^activate(.csh|.fish|)$ ]]; then - # these 'activate' scripts need special treatment - sed -i "s/VIRTUAL_ENV=\".*\"/VIRTUAL_ENV=\"\/usr\/share\/$python\/dist\/$PYTHON_PKG\"/" bin/$binfile - sed -i "s/VIRTUAL_ENV \".*\"/VIRTUAL_ENV \"\/usr\/share\/$python\/dist\/$PYTHON_PKG\"/" bin/$binfile - else - if grep -q -E '^#!.*/bin/python\d?' bin/$binfile; then - # Replace shebang line - sed -i "1 s/^.*$/#!\/usr\/share\/$python\/dist\/$PYTHON_PKG\/bin\/python/" bin/$binfile - fi - fi + if file --mime "bin/$binfile" | grep -q binary; then + : # Nothing to do for binary files + elif [[ "$binfile" =~ ^activate(.csh|.fish|)$ ]]; then + sed -ri "s@VIRTUAL_ENV(=| )\".*\"@VIRTUAL_ENV\\1\"/$sys_venv_dir\"@" "bin/$binfile" + else + # Replace shebang line + sed -ri "1 s@^#\![^[:space:]]+/bin/python[0-9.]*@#\!/$sys_venv_py@" "bin/$binfile" fi done - cd - >$STDOUT_IF_DEBUG + popd >$STDOUT_IF_DEBUG + cd dist - find build -iname '*.pyc' -exec rm {} \; - find build -iname '*.pyo' -exec rm {} \; + find build -iname '*.py[co]' -delete # Finally, generate the package echo "Creating package..." @@ -922,7 +868,7 @@ fpm_build_virtualenv_worker () { # make sure the systemd service file ends up in the right place # used by arvados-docker-cleaner if [[ -e "${systemd_unit}" ]]; then - COMMAND_ARR+=("usr/share/$python/dist/$PKG/share/doc/$PKG/$PKG.service=/lib/systemd/system/$PKG.service") + COMMAND_ARR+=("usr/share/python$PYTHON3_VERSION/dist/$PKG/share/doc/$PKG/$PKG.service=/lib/systemd/system/$PKG.service") fi COMMAND_ARR+=("${fpm_args[@]}") @@ -935,13 +881,13 @@ fpm_build_virtualenv_worker () { # because those are the ones we rewrote the shebang line of, above. if [[ -e "$WORKSPACE/$PKG_DIR/bin" ]]; then for binary in `ls $WORKSPACE/$PKG_DIR/bin`; do - COMMAND_ARR+=("usr/share/$python/dist/$PYTHON_PKG/bin/$binary=/usr/bin/") + COMMAND_ARR+=("$sys_venv_dir/bin/$binary=/usr/bin/") done fi # the python3-arvados-cwl-runner package comes with cwltool, expose that version - if [[ -e "$WORKSPACE/$PKG_DIR/dist/build/usr/share/$python/dist/$PYTHON_PKG/bin/cwltool" ]]; then - COMMAND_ARR+=("usr/share/$python/dist/$PYTHON_PKG/bin/cwltool=/usr/bin/") + if [[ -e "$WORKSPACE/$PKG_DIR/$venv_dir/bin/cwltool" ]]; then + COMMAND_ARR+=("$sys_venv_dir/bin/cwltool=/usr/bin/") fi COMMAND_ARR+=(".") @@ -963,136 +909,6 @@ fpm_build_virtualenv_worker () { echo } -# build_metapackage builds meta packages that help with the python to python 3 package migration -build_metapackage() { - # base package name (e.g. arvados-python-client) - BASE_NAME=$1 - shift - PKG_DIR=$1 - shift - - if [[ -n "$ONLY_BUILD" ]] && [[ "python-$BASE_NAME" != "$ONLY_BUILD" ]]; then - return 0 - fi - - if [[ "$ARVADOS_BUILDING_ITERATION" == "" ]]; then - ARVADOS_BUILDING_ITERATION=1 - fi - - if [[ -z "$ARVADOS_BUILDING_VERSION" ]]; then - cd $WORKSPACE/$PKG_DIR - pwd - rm -rf dist/* - - # Get the latest setuptools - if ! pip3 install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U 'setuptools<45'; then - echo "Error, unable to upgrade setuptools with XY" - echo " pip3 install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U 'setuptools<45'" - exit 1 - fi - # filter a useless warning (when building the cwltest package) from the stderr output - if ! 
python3 setup.py $DASHQ_UNLESS_DEBUG sdist 2> >(grep -v 'warning: no previously-included files matching'); then - echo "Error, unable to run python3 setup.py sdist for $PKG" - exit 1 - fi - - PYTHON_VERSION=$(awk '($1 == "Version:"){print $2}' *.egg-info/PKG-INFO) - UNFILTERED_PYTHON_VERSION=$(echo -n $PYTHON_VERSION | sed s/\.dev/~dev/g |sed 's/\([0-9]\)rc/\1~rc/g') - - else - UNFILTERED_PYTHON_VERSION=$ARVADOS_BUILDING_VERSION - PYTHON_VERSION=$(echo -n $ARVADOS_BUILDING_VERSION | sed s/~dev/.dev/g | sed s/~rc/rc/g) - fi - - cd - >$STDOUT_IF_DEBUG - if [[ -d "$BASE_NAME" ]]; then - rm -rf $BASE_NAME - fi - mkdir $BASE_NAME - cd $BASE_NAME - - if [[ "$FORMAT" == "deb" ]]; then - cat >ns-control < -Depends: python3-${BASE_NAME} -Description: metapackage to ease the upgrade to the Pyhon 3 version of ${BASE_NAME} - This package is a metapackage that will automatically install the new version of - ${BASE_NAME} which is Python 3 based and has a different name. -EOF - - /usr/bin/equivs-build ns-control - if [[ $? -ne 0 ]]; then - echo "Error running 'equivs-build ns-control', is the 'equivs' package installed?" - return 1 - fi - elif [[ "$FORMAT" == "rpm" ]]; then - cat >meta.spec < -- initial release -EOF - - /usr/bin/rpmbuild -ba meta.spec - if [[ $? -ne 0 ]]; then - echo "Error running 'rpmbuild -ba meta.spec', is the 'rpm-build' package installed?" - return 1 - else - mv /root/rpmbuild/RPMS/x86_64/python-${BASE_NAME}*.${FORMAT} . - if [[ $? -ne 0 ]]; then - echo "Error finding rpm file output of 'rpmbuild -ba meta.spec'" - return 1 - fi - fi - else - echo "Unknown format" - return 1 - fi - - if [[ $EXITCODE -ne 0 ]]; then - return 1 - else - echo `ls *$FORMAT` - mv *$FORMAT $WORKSPACE/packages/$TARGET/ - fi - - # clean up - cd - >$STDOUT_IF_DEBUG - if [[ -d "$BASE_NAME" ]]; then - rm -rf $BASE_NAME - fi -} - # Build packages for everything fpm_build() { # Source dir where fpm-info.sh (if any) will be found. diff --git a/doc/_includes/_hpc_max_gateway_tunnels.liquid b/doc/_includes/_hpc_max_gateway_tunnels.liquid new file mode 100644 index 0000000000..ba8769c653 --- /dev/null +++ b/doc/_includes/_hpc_max_gateway_tunnels.liquid @@ -0,0 +1,18 @@ +{% comment %} +Copyright (C) The Arvados Authors. All rights reserved. + +SPDX-License-Identifier: CC-BY-SA-3.0 +{% endcomment %} + +h3(#MaxGatewayTunnels). API.MaxGatewayTunnels + +Each Arvados container that runs on your HPC cluster will bring up a long-lived connection to the Arvados controller and keep it open for the entire duration of the container. This connection is used to access real-time container logs from Workbench, and to enable the "container shell":{{site.baseurl}}/install/container-shell-access.html feature. + +Set the @MaxGatewayTunnels@ config entry high enough to accommodate the maximum number of containers you expect to run concurrently on your HPC cluster, plus incoming container shell sessions. + + +
+    API:
+      MaxGatewayTunnels: 2000
+
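+
+As a rough sizing sketch (the numbers here are hypothetical, not recommendations): if the HPC cluster can run up to 1500 containers at once and you expect up to 50 interactive container shell sessions, the tunnel count works out to
+
+
+    MaxGatewayTunnels >= 1500 containers + 50 shell sessions = 1550
+
+
+so a value such as 2000 leaves headroom for bursts.
+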
+ +Also, configure Nginx (and any other HTTP proxies or load balancers running between the HPC and Arvados controller) to allow the expected number of connections, i.e., @MaxConcurrentRequests + MaxQueuedRequests + MaxGatewayTunnels@. diff --git a/doc/admin/upgrading.html.textile.liquid b/doc/admin/upgrading.html.textile.liquid index 81b32c1444..6d37009e91 100644 --- a/doc/admin/upgrading.html.textile.liquid +++ b/doc/admin/upgrading.html.textile.liquid @@ -32,6 +32,10 @@ h2(#main). development main "previous: Upgrading to 2.7.1":#v2_7_1 +h3. Check MaxGatewayTunnels config + +If you use the LSF or Slurm dispatcher, ensure the new @API.MaxGatewayTunnels@ config entry is high enough to support the size of your cluster. See "LSF docs":{{site.baseurl}}/install/crunch2-lsf/install-dispatch.html#MaxGatewayTunnels or "Slurm docs":{{site.baseurl}}/install/crunch2-slurm/install-dispatch.html#MaxGatewayTunnels for details. + h2(#2_7_1). v2.7.1 (2023-12-12) "previous: Upgrading to 2.7.0":#v2_7_0 diff --git a/doc/install/crunch2-lsf/install-dispatch.html.textile.liquid b/doc/install/crunch2-lsf/install-dispatch.html.textile.liquid index d4328d89a3..fc4393d0b6 100644 --- a/doc/install/crunch2-lsf/install-dispatch.html.textile.liquid +++ b/doc/install/crunch2-lsf/install-dispatch.html.textile.liquid @@ -40,6 +40,8 @@ Add a DispatchLSF entry to the Services section, using the hostname where @arvad Review the following configuration parameters and adjust as needed. +{% include 'hpc_max_gateway_tunnels' %} + h3(#BsubSudoUser). Containers.LSF.BsubSudoUser arvados-dispatch-lsf uses @sudo@ to execute @bsub@, for example @sudo -E -u crunch bsub [...]@. This means the @crunch@ account must exist on the hosts where LSF jobs run ("execution hosts"), as well as on the host where you are installing the Arvados LSF dispatcher (the "submission host"). To use a user account other than @crunch@, configure @BsubSudoUser@: diff --git a/doc/install/crunch2-slurm/install-dispatch.html.textile.liquid b/doc/install/crunch2-slurm/install-dispatch.html.textile.liquid index 554f53dd38..16af80d127 100644 --- a/doc/install/crunch2-slurm/install-dispatch.html.textile.liquid +++ b/doc/install/crunch2-slurm/install-dispatch.html.textile.liquid @@ -41,6 +41,8 @@ Add a DispatchSLURM entry to the Services section, using the hostname where @cru The following configuration parameters are optional. +{% include 'hpc_max_gateway_tunnels' %} + h3(#PollPeriod). Containers.PollInterval crunch-dispatch-slurm polls the API server periodically for new containers to run. The @PollInterval@ option controls how often this poll happens. Set this to a string of numbers suffixed with one of the time units @ns@, @us@, @ms@, @s@, @m@, or @h@. 
For example: diff --git a/docker/jobs/Dockerfile b/docker/jobs/Dockerfile index 1b75e13420..371b9cc984 100644 --- a/docker/jobs/Dockerfile +++ b/docker/jobs/Dockerfile @@ -3,31 +3,16 @@ # SPDX-License-Identifier: Apache-2.0 # Based on Debian -FROM debian:buster-slim +FROM debian:bullseye-slim MAINTAINER Arvados Package Maintainers -ENV DEBIAN_FRONTEND noninteractive - -RUN apt-get update -q -RUN apt-get install -yq --no-install-recommends gnupg - ARG repo_version -RUN echo repo_version $repo_version -ADD apt.arvados.org-$repo_version.list /etc/apt/sources.list.d/ - -ADD 1078ECD7.key /tmp/ -RUN cat /tmp/1078ECD7.key | apt-key add - - -ARG python_sdk_version ARG cwl_runner_version -RUN echo cwl_runner_version $cwl_runner_version python_sdk_version $python_sdk_version +ADD apt.arvados.org-$repo_version.list /etc/apt/sources.list.d/ +ADD 1078ECD7.key /etc/apt/trusted.gpg.d/arvados.asc RUN apt-get update -q -RUN apt-get install -yq --no-install-recommends python3-arvados-cwl-runner=$cwl_runner_version - -# use the Python executable from the python-arvados-cwl-runner package -RUN PYTHON=`ls /usr/share/python3*/dist/python3-arvados-cwl-runner/bin/python|head -n1` && rm -f /usr/bin/python && ln -s $PYTHON /usr/bin/python -RUN PYTHON3=`ls /usr/share/python3*/dist/python3-arvados-cwl-runner/bin/python3|head -n1` && rm -f /usr/bin/python3 && ln -s $PYTHON3 /usr/bin/python3 +RUN DEBIAN_FRONTEND=noninteractive apt-get install -yq --no-install-recommends python3-arvados-cwl-runner=$cwl_runner_version # Install dependencies and set up system. RUN /usr/sbin/adduser --disabled-password \ @@ -35,3 +20,4 @@ RUN /usr/sbin/adduser --disabled-password \ /usr/bin/install --directory --owner=crunch --group=crunch --mode=0700 /keep /tmp/crunch-src /tmp/crunch-job USER crunch +ENV PATH=/usr/share/python3.9/dist/python3-arvados-cwl-runner/bin:/usr/local/bin:/usr/bin:/bin diff --git a/docker/jobs/apt.arvados.org-dev.list b/docker/jobs/apt.arvados.org-dev.list index 210f5d5511..155244ba9f 100644 --- a/docker/jobs/apt.arvados.org-dev.list +++ b/docker/jobs/apt.arvados.org-dev.list @@ -1,2 +1,2 @@ # apt.arvados.org -deb http://apt.arvados.org/buster buster-dev main +deb http://apt.arvados.org/bullseye bullseye-dev main diff --git a/docker/jobs/apt.arvados.org-stable.list b/docker/jobs/apt.arvados.org-stable.list index 153e729805..5a4b8c91c8 100644 --- a/docker/jobs/apt.arvados.org-stable.list +++ b/docker/jobs/apt.arvados.org-stable.list @@ -1,2 +1,2 @@ # apt.arvados.org -deb http://apt.arvados.org/buster buster main +deb http://apt.arvados.org/bullseye bullseye main diff --git a/docker/jobs/apt.arvados.org-testing.list b/docker/jobs/apt.arvados.org-testing.list index d5f4581685..302862ca64 100644 --- a/docker/jobs/apt.arvados.org-testing.list +++ b/docker/jobs/apt.arvados.org-testing.list @@ -1,2 +1,2 @@ # apt.arvados.org -deb http://apt.arvados.org/buster buster-testing main +deb http://apt.arvados.org/bullseye bullseye-testing main diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml index 05bc1309cd..ddf7a01e90 100644 --- a/lib/config/config.default.yml +++ b/lib/config/config.default.yml @@ -231,6 +231,10 @@ Clusters: # also effectively limited by MaxConcurrentRailsRequests (see # below) because most controller requests proxy through to the # RailsAPI service. + # + # HTTP proxies and load balancers downstream of arvados services + # should be configured to allow at least {MaxConcurrentRequest + + # MaxQueuedRequests + MaxGatewayTunnels} concurrent requests. 
MaxConcurrentRequests: 64 # Maximum number of concurrent requests to process concurrently @@ -250,6 +254,13 @@ Clusters: # the incoming request queue before returning 503. MaxQueueTimeForLockRequests: 2s + # Maximum number of active gateway tunnel connections. One slot + # is consumed by each "container shell" connection. If using an + # HPC dispatcher (LSF or Slurm), one slot is consumed by each + # running container. These do not count toward + # MaxConcurrentRequests. + MaxGatewayTunnels: 1000 + # Fraction of MaxConcurrentRequests that can be "log create" # messages at any given time. This is to prevent logging # updates from crowding out more important requests. diff --git a/lib/config/export.go b/lib/config/export.go index e51e6fc32c..4b6c142ff2 100644 --- a/lib/config/export.go +++ b/lib/config/export.go @@ -70,6 +70,7 @@ var whitelist = map[string]bool{ "API.LogCreateRequestFraction": false, "API.MaxConcurrentRailsRequests": false, "API.MaxConcurrentRequests": false, + "API.MaxGatewayTunnels": false, "API.MaxIndexDatabaseRead": false, "API.MaxItemsPerResponse": true, "API.MaxKeepBlobBuffers": false, diff --git a/lib/controller/rpc/conn.go b/lib/controller/rpc/conn.go index a8ecc57bba..9f518d9c7a 100644 --- a/lib/controller/rpc/conn.go +++ b/lib/controller/rpc/conn.go @@ -482,11 +482,11 @@ func (conn *Conn) socket(ctx context.Context, u *url.URL, upgradeHeader string, } else { message = fmt.Sprintf("%q", body) } - return connresp, fmt.Errorf("server did not provide a tunnel: %s: %s", resp.Status, message) + return connresp, httpserver.ErrorWithStatus(fmt.Errorf("server did not provide a tunnel: %s: %s", resp.Status, message), resp.StatusCode) } if strings.ToLower(resp.Header.Get("Upgrade")) != upgradeHeader || strings.ToLower(resp.Header.Get("Connection")) != "upgrade" { - return connresp, fmt.Errorf("bad response from server: Upgrade %q Connection %q", resp.Header.Get("Upgrade"), resp.Header.Get("Connection")) + return connresp, httpserver.ErrorWithStatus(fmt.Errorf("bad response from server: Upgrade %q Connection %q", resp.Header.Get("Upgrade"), resp.Header.Get("Connection")), http.StatusBadGateway) } connresp.Conn = netconn connresp.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw} diff --git a/lib/crunchrun/container_gateway.go b/lib/crunchrun/container_gateway.go index 30f8957a2d..5b68e2c50e 100644 --- a/lib/crunchrun/container_gateway.go +++ b/lib/crunchrun/container_gateway.go @@ -220,7 +220,7 @@ func (gw *Gateway) runTunnel(addr string) error { AuthSecret: gw.AuthSecret, }) if err != nil { - return fmt.Errorf("error creating gateway tunnel: %s", err) + return fmt.Errorf("error creating gateway tunnel: %w", err) } mux, err := yamux.Client(tun.Conn, nil) if err != nil { diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go index 51c2c3d6a3..20185554b8 100644 --- a/lib/dispatchcloud/dispatcher_test.go +++ b/lib/dispatchcloud/dispatcher_test.go @@ -207,6 +207,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { finishContainer(ctr) return int(rand.Uint32() & 0x3) } + var type4BrokenUntil time.Time var countCapacityErrors int64 vmCount := int32(0) s.stubDriver.Queue = queue @@ -224,6 +225,17 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { stubvm.CrashRunningContainer = finishContainer stubvm.ExtraCrunchRunArgs = "'--runtime-engine=stub' '--foo' '--extra='\\''args'\\'''" switch { + case stubvm.Instance().ProviderType() == test.InstanceType(4).ProviderType && + (type4BrokenUntil.IsZero() || 
time.Now().Before(type4BrokenUntil)): + // Initially (at least 2*TimeoutBooting), all + // instances of this type are completely + // broken. This ensures the + // boot_outcomes{outcome="failure"} metric is + // not zero. + stubvm.Broken = time.Now() + if type4BrokenUntil.IsZero() { + type4BrokenUntil = time.Now().Add(2 * s.cluster.Containers.CloudVMs.TimeoutBooting.Duration()) + } case n%7 == 0: // some instances start out OK but then stop // running any commands @@ -235,11 +247,6 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { // some instances start out OK but then start // reporting themselves as broken stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond) - case n == 3: - // 1 instance is completely broken, ensuring - // the boot_outcomes{outcome="failure"} metric - // is not zero - stubvm.Broken = time.Now() default: stubvm.CrunchRunCrashRate = 0.1 stubvm.ArvMountDeadlockRate = 0.1 diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go index 2f1f175890..03fa592777 100644 --- a/lib/dispatchcloud/scheduler/run_queue.go +++ b/lib/dispatchcloud/scheduler/run_queue.go @@ -239,7 +239,7 @@ tryrun: // so mark it as allocated, and try to // start the container. unalloc[unallocType]-- - logger = logger.WithField("InstanceType", unallocType) + logger = logger.WithField("InstanceType", unallocType.Name) if dontstart[unallocType] { // We already tried & failed to start // a higher-priority container on the @@ -282,7 +282,7 @@ tryrun: logger.Trace("all eligible types at capacity") continue } - logger = logger.WithField("InstanceType", availableType) + logger = logger.WithField("InstanceType", availableType.Name) if !sch.pool.Create(availableType) { // Failed despite not being at quota, // e.g., cloud ops throttled. diff --git a/lib/service/cmd.go b/lib/service/cmd.go index 725f86f3bd..82e95fe0b4 100644 --- a/lib/service/cmd.go +++ b/lib/service/cmd.go @@ -16,6 +16,7 @@ import ( _ "net/http/pprof" "net/url" "os" + "regexp" "strings" "time" @@ -148,32 +149,13 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout return 1 } - maxReqs := cluster.API.MaxConcurrentRequests - if maxRails := cluster.API.MaxConcurrentRailsRequests; maxRails > 0 && - (maxRails < maxReqs || maxReqs == 0) && - strings.HasSuffix(prog, "controller") { - // Ideally, we would accept up to - // MaxConcurrentRequests, and apply the - // MaxConcurrentRailsRequests limit only for requests - // that require calling upstream to RailsAPI. But for - // now we make the simplifying assumption that every - // controller request causes an upstream RailsAPI - // request. 
- maxReqs = maxRails - } instrumented := httpserver.Instrument(reg, log, httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(), httpserver.AddRequestIDs( httpserver.Inspect(reg, cluster.ManagementToken, httpserver.LogRequests( interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth, - &httpserver.RequestLimiter{ - Handler: handler, - MaxConcurrent: maxReqs, - MaxQueue: cluster.API.MaxQueuedRequests, - MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(), - Priority: c.requestPriority, - Registry: reg})))))) + c.requestLimiter(handler, cluster, reg))))))) srv := &httpserver.Server{ Server: http.Server{ Handler: ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)), @@ -212,7 +194,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout <-handler.Done() srv.Close() }() - go c.requestQueueDumpCheck(cluster, maxReqs, prog, reg, &srv.Server, logger) + go c.requestQueueDumpCheck(cluster, prog, reg, &srv.Server, logger) err = srv.Wait() if err != nil { return 1 @@ -221,12 +203,13 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout } // If SystemLogs.RequestQueueDumpDirectory is set, monitor the -// server's incoming HTTP request queue size. When it exceeds 90% of -// API.MaxConcurrentRequests, write the /_inspect/requests data to a -// JSON file in the specified directory. -func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, maxReqs int, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) { +// server's incoming HTTP request limiters. When the number of +// concurrent requests in any queue ("api" or "tunnel") exceeds 90% of +// its maximum slots, write the /_inspect/requests data to a JSON file +// in the specified directory. 
+func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) { outdir := cluster.SystemLogs.RequestQueueDumpDirectory - if outdir == "" || cluster.ManagementToken == "" || maxReqs < 1 { + if outdir == "" || cluster.ManagementToken == "" { return } logger = logger.WithField("worker", "RequestQueueDump") @@ -237,16 +220,29 @@ func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, maxReqs int, p logger.WithError(err).Warn("error getting metrics") continue } - dump := false + cur := map[string]int{} // queue label => current + max := map[string]int{} // queue label => max for _, mf := range mfs { - if mf.Name != nil && *mf.Name == "arvados_concurrent_requests" && len(mf.Metric) == 1 { - n := int(mf.Metric[0].GetGauge().GetValue()) - if n > 0 && n >= maxReqs*9/10 { - dump = true - break + for _, m := range mf.GetMetric() { + for _, ml := range m.GetLabel() { + if ml.GetName() == "queue" { + n := int(m.GetGauge().GetValue()) + if name := mf.GetName(); name == "arvados_concurrent_requests" { + cur[*ml.Value] = n + } else if name == "arvados_max_concurrent_requests" { + max[*ml.Value] = n + } + } } } } + dump := false + for queue, n := range cur { + if n > 0 && max[queue] > 0 && n >= max[queue]*9/10 { + dump = true + break + } + } if dump { req, err := http.NewRequest("GET", "/_inspect/requests", nil) if err != nil { @@ -269,6 +265,67 @@ func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, maxReqs int, p } } +// Set up a httpserver.RequestLimiter with separate queues/streams for +// API requests (obeying MaxConcurrentRequests etc) and gateway tunnel +// requests (obeying MaxGatewayTunnels). +func (c *command) requestLimiter(handler http.Handler, cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler { + maxReqs := cluster.API.MaxConcurrentRequests + if maxRails := cluster.API.MaxConcurrentRailsRequests; maxRails > 0 && + (maxRails < maxReqs || maxReqs == 0) && + c.svcName == arvados.ServiceNameController { + // Ideally, we would accept up to + // MaxConcurrentRequests, and apply the + // MaxConcurrentRailsRequests limit only for requests + // that require calling upstream to RailsAPI. But for + // now we make the simplifying assumption that every + // controller request causes an upstream RailsAPI + // request. + maxReqs = maxRails + } + rqAPI := &httpserver.RequestQueue{ + Label: "api", + MaxConcurrent: maxReqs, + MaxQueue: cluster.API.MaxQueuedRequests, + MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(), + } + rqTunnel := &httpserver.RequestQueue{ + Label: "tunnel", + MaxConcurrent: cluster.API.MaxGatewayTunnels, + MaxQueue: 0, + } + return &httpserver.RequestLimiter{ + Handler: handler, + Priority: c.requestPriority, + Registry: reg, + Queue: func(req *http.Request) *httpserver.RequestQueue { + if req.Method == http.MethodPost && reTunnelPath.MatchString(req.URL.Path) { + return rqTunnel + } else { + return rqAPI + } + }, + } +} + +// reTunnelPath matches paths of API endpoints that go in the "tunnel" +// queue. 
+var reTunnelPath = regexp.MustCompile(func() string { + rePathVar := regexp.MustCompile(`{.*?}`) + out := "" + for _, endpoint := range []arvados.APIEndpoint{ + arvados.EndpointContainerGatewayTunnel, + arvados.EndpointContainerGatewayTunnelCompat, + arvados.EndpointContainerSSH, + arvados.EndpointContainerSSHCompat, + } { + if out != "" { + out += "|" + } + out += `\Q/` + rePathVar.ReplaceAllString(endpoint.Path, `\E[^/]*\Q`) + `\E` + } + return "^(" + out + ")$" +}()) + func (c *command) requestPriority(req *http.Request, queued time.Time) int64 { switch { case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/containers/") && strings.HasSuffix(req.URL.Path, "/lock"): diff --git a/lib/service/cmd_test.go b/lib/service/cmd_test.go index 08b3a239dc..9ead90019e 100644 --- a/lib/service/cmd_test.go +++ b/lib/service/cmd_test.go @@ -17,6 +17,8 @@ import ( "net/url" "os" "strings" + "sync" + "sync/atomic" "testing" "time" @@ -198,15 +200,24 @@ func (*Suite) TestCommand(c *check.C) { c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`) } -func (s *Suite) TestDumpRequestsKeepweb(c *check.C) { - s.testDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests") +func (s *Suite) TestTunnelPathRegexp(c *check.C) { + c.Check(reTunnelPath.MatchString(`/arvados/v1/connect/zzzzz-dz642-aaaaaaaaaaaaaaa/gateway_tunnel`), check.Equals, true) + c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/gateway_tunnel`), check.Equals, true) + c.Check(reTunnelPath.MatchString(`/arvados/v1/connect/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, true) + c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, true) + c.Check(reTunnelPath.MatchString(`/blah/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, false) + c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa`), check.Equals, false) } -func (s *Suite) TestDumpRequestsController(c *check.C) { - s.testDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests") +func (s *Suite) TestRequestLimitsAndDumpRequests_Keepweb(c *check.C) { + s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests") } -func (*Suite) testDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) { +func (s *Suite) TestRequestLimitsAndDumpRequests_Controller(c *check.C) { + s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests") +} + +func (*Suite) testRequestLimitAndDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) { defer func(orig time.Duration) { requestQueueDumpCheckInterval = orig }(requestQueueDumpCheckInterval) requestQueueDumpCheckInterval = time.Second / 10 @@ -218,6 +229,7 @@ func (*Suite) testDumpRequests(c *check.C, serviceName arvados.ServiceName, maxR defer cf.Close() max := 24 + maxTunnels := 30 fmt.Fprintf(cf, ` Clusters: zzzzz: @@ -225,7 +237,8 @@ Clusters: ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb API: `+maxReqsConfigKey+`: %d - MaxQueuedRequests: 0 + MaxQueuedRequests: 1 + MaxGatewayTunnels: %d SystemLogs: {RequestQueueDumpDirectory: %q} Services: Controller: @@ -234,14 +247,18 @@ Clusters: WebDAV: ExternalURL: "http://localhost:`+port+`" InternalURLs: {"http://localhost:`+port+`": {}} -`, max, tmpdir) +`, max, maxTunnels, tmpdir) cf.Close() started := make(chan bool, max+1) hold := make(chan bool) handler := 
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - started <- true - <-hold + if strings.Contains(r.URL.Path, "/ssh") || strings.Contains(r.URL.Path, "/gateway_tunnel") { + <-hold + } else { + started <- true + <-hold + } }) healthCheck := make(chan bool, 1) ctx, cancel := context.WithCancel(context.Background()) @@ -267,15 +284,59 @@ Clusters: } client := http.Client{} deadline := time.Now().Add(time.Second * 2) + var activeReqs sync.WaitGroup + + // Start some API reqs + var apiResp200, apiResp503 int64 for i := 0; i < max+1; i++ { + activeReqs.Add(1) + go func() { + defer activeReqs.Done() + target := "http://localhost:" + port + "/testpath" + resp, err := client.Get(target) + for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) { + time.Sleep(time.Second / 100) + resp, err = client.Get(target) + } + if c.Check(err, check.IsNil) { + if resp.StatusCode == http.StatusOK { + atomic.AddInt64(&apiResp200, 1) + } else if resp.StatusCode == http.StatusServiceUnavailable { + atomic.AddInt64(&apiResp503, 1) + } + } + }() + } + + // Start some gateway tunnel reqs that don't count toward our + // API req limit + extraTunnelReqs := 20 + var tunnelResp200, tunnelResp503 int64 + var paths = []string{ + "/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1), + "/" + strings.Replace(arvados.EndpointContainerSSHCompat.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1), + "/" + strings.Replace(arvados.EndpointContainerGatewayTunnel.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1), + "/" + strings.Replace(arvados.EndpointContainerGatewayTunnelCompat.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1), + } + for i := 0; i < maxTunnels+extraTunnelReqs; i++ { + i := i + activeReqs.Add(1) go func() { - resp, err := client.Get("http://localhost:" + port + "/testpath") + defer activeReqs.Done() + target := "http://localhost:" + port + paths[i%len(paths)] + resp, err := client.Post(target, "application/octet-stream", nil) for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) { time.Sleep(time.Second / 100) - resp, err = client.Get("http://localhost:" + port + "/testpath") + resp, err = client.Post(target, "application/octet-stream", nil) } if c.Check(err, check.IsNil) { - c.Logf("resp StatusCode %d", resp.StatusCode) + if resp.StatusCode == http.StatusOK { + atomic.AddInt64(&tunnelResp200, 1) + } else if resp.StatusCode == http.StatusServiceUnavailable { + atomic.AddInt64(&tunnelResp503, 1) + } else { + c.Errorf("tunnel response code %d", resp.StatusCode) + } } }() } @@ -284,6 +345,10 @@ Clusters: case <-started: case <-time.After(time.Second): c.Logf("%s", stderr.String()) + c.Logf("apiResp200 %d", apiResp200) + c.Logf("apiResp503 %d", apiResp503) + c.Logf("tunnelResp200 %d", tunnelResp200) + c.Logf("tunnelResp503 %d", tunnelResp503) c.Fatal("timed out") } } @@ -300,6 +365,20 @@ Clusters: var loaded []struct{ URL string } err = json.Unmarshal(j, &loaded) c.Check(err, check.IsNil) + + for i := 0; i < len(loaded); i++ { + if strings.Contains(loaded[i].URL, "/ssh") || strings.Contains(loaded[i].URL, "/gateway_tunnel") { + // Filter out a gateway tunnel req + // that doesn't count toward our API + // req limit + if i < len(loaded)-1 { + copy(loaded[i:], loaded[i+1:]) + i-- + } + loaded = loaded[:len(loaded)-1] + } + } + if len(loaded) < max { // Dumped when #requests was >90% but <100% of // limit. 
If we stop now, we won't be able to @@ -309,7 +388,7 @@ Clusters: c.Logf("loaded dumped requests, but len %d < max %d -- still waiting", len(loaded), max) continue } - c.Check(loaded, check.HasLen, max) + c.Check(loaded, check.HasLen, max+1) c.Check(loaded[0].URL, check.Equals, "/testpath") break } @@ -328,7 +407,8 @@ Clusters: c.Check(err, check.IsNil) switch path { case "/metrics": - c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests `+fmt.Sprintf("%d", max)+`\n.*`) + c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests{queue="api"} `+fmt.Sprintf("%d", max)+`\n.*`) + c.Check(string(buf), check.Matches, `(?ms).*arvados_queued_requests{priority="normal",queue="api"} 1\n.*`) case "/_inspect/requests": c.Check(string(buf), check.Matches, `(?ms).*"URL":"/testpath".*`) default: @@ -336,6 +416,11 @@ Clusters: } } close(hold) + activeReqs.Wait() + c.Check(int(apiResp200), check.Equals, max+1) + c.Check(int(apiResp503), check.Equals, 0) + c.Check(int(tunnelResp200), check.Equals, maxTunnels) + c.Check(int(tunnelResp503), check.Equals, extraTunnelReqs) cancel() } diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index fd3b7a5d16..9fc00c0017 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -10,11 +10,12 @@ from future.utils import viewitems from builtins import str import argparse +import importlib.metadata +import importlib.resources import logging import os import sys import re -import pkg_resources # part of setuptools from schema_salad.sourceline import SourceLine import schema_salad.validate as validate @@ -57,15 +58,12 @@ arvados.log_handler.setFormatter(logging.Formatter( def versionstring(): """Print version string of key packages for provenance and debugging.""" - - arvcwlpkg = pkg_resources.require("arvados-cwl-runner") - arvpkg = pkg_resources.require("arvados-python-client") - cwlpkg = pkg_resources.require("cwltool") - - return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version, - "arvados-python-client", arvpkg[0].version, - "cwltool", cwlpkg[0].version) - + return "{} {}, arvados-python-client {}, cwltool {}".format( + sys.argv[0], + importlib.metadata.version('arvados-cwl-runner'), + importlib.metadata.version('arvados-python-client'), + importlib.metadata.version('cwltool'), + ) def arg_parser(): # type: () -> argparse.ArgumentParser parser = argparse.ArgumentParser( @@ -270,10 +268,8 @@ def add_arv_hints(): cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE supported_versions = ["v1.0", "v1.1", "v1.2"] for s in supported_versions: - res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-%s.yml' % s) - customschema = res.read().decode('utf-8') + customschema = importlib.resources.read_text(__name__, f'arv-cwl-schema-{s}.yml', 'utf-8') use_custom_schema(s, "http://arvados.org/cwl", customschema) - res.close() cwltool.process.supportedProcessRequirements.extend([ "http://arvados.org/cwl#RunInSingleContainer", "http://arvados.org/cwl#OutputDirType", diff --git a/sdk/dev-jobs.dockerfile b/sdk/dev-jobs.dockerfile index 95b039eba9..0169b94706 100644 --- a/sdk/dev-jobs.dockerfile +++ b/sdk/dev-jobs.dockerfile @@ -9,47 +9,23 @@ # version. # # Use arvados/build/build-dev-docker-jobs-image.sh to build. -# -# (This dockerfile file must be located in the arvados/sdk/ directory because -# of the docker build root.) 
-FROM debian:buster-slim +FROM debian:bullseye-slim MAINTAINER Arvados Package Maintainers -ENV DEBIAN_FRONTEND noninteractive - -ARG pythoncmd=python3 -ARG pipcmd=pip3 - -RUN apt-get update -q && apt-get install -qy --no-install-recommends \ - git ${pythoncmd}-pip ${pythoncmd}-virtualenv ${pythoncmd}-dev libcurl4-gnutls-dev \ - libgnutls28-dev nodejs ${pythoncmd}-pyasn1-modules build-essential ${pythoncmd}-setuptools - -ARG sdk -ARG runner -ARG salad -ARG cwlutils -ARG cwltool - -ADD python/dist/$sdk /tmp/ -ADD cwl/salad_dist/$salad /tmp/ -ADD cwl/cwltool_dist/$cwltool /tmp/ -ADD cwl/cwlutils_dist/$cwlutils /tmp/ -ADD cwl/dist/$runner /tmp/ +RUN DEBIAN_FRONTEND=noninteractive apt-get update -q && apt-get install -qy --no-install-recommends \ + git python3-dev python3-venv libcurl4-gnutls-dev libgnutls28-dev nodejs build-essential -RUN $pipcmd install wheel -RUN cd /tmp/arvados-python-client-* && $pipcmd install . -RUN if test -d /tmp/schema-salad-* ; then cd /tmp/schema-salad-* && $pipcmd install . ; fi -RUN if test -d /tmp/cwl-utils-* ; then cd /tmp/cwl-utils-* && $pipcmd install . ; fi -RUN if test -d /tmp/cwltool-* ; then cd /tmp/cwltool-* && $pipcmd install . ; fi -RUN cd /tmp/arvados-cwl-runner-* && $pipcmd install . +RUN python3 -m venv /opt/arvados-py +ENV PATH=/opt/arvados-py/bin:/usr/local/bin:/usr/bin:/bin +RUN python3 -m pip install --no-cache-dir setuptools wheel -# Sometimes Python dependencies install successfully but don't -# actually work. So run arvados-cwl-runner here to catch fun -# dependency errors like pkg_resources.DistributionNotFound. -RUN arvados-cwl-runner --version +# The build script sets up our build context with all the Python source +# packages to install. +COPY . /usr/local/src/ +# Run a-c-r afterward to check for a successful install. +RUN python3 -m pip install --no-cache-dir /usr/local/src/* && arvados-cwl-runner --version -# Install dependencies and set up system. RUN /usr/sbin/adduser --disabled-password \ --gecos 'Crunch execution user' crunch && \ /usr/bin/install --directory --owner=crunch --group=crunch --mode=0700 /keep /tmp/crunch-src /tmp/crunch-job diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go index 6301ed047a..16d789daf5 100644 --- a/sdk/go/arvados/config.go +++ b/sdk/go/arvados/config.go @@ -102,6 +102,7 @@ type Cluster struct { MaxConcurrentRailsRequests int MaxConcurrentRequests int MaxQueuedRequests int + MaxGatewayTunnels int MaxQueueTimeForLockRequests Duration LogCreateRequestFraction float64 MaxKeepBlobBuffers int diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go index 9d501ab0eb..1e3316ed48 100644 --- a/sdk/go/httpserver/request_limiter.go +++ b/sdk/go/httpserver/request_limiter.go @@ -34,13 +34,8 @@ const metricsUpdateInterval = time.Second type RequestLimiter struct { Handler http.Handler - // Maximum number of requests being handled at once. Beyond - // this limit, requests will be queued. - MaxConcurrent int - - // Maximum number of requests in the queue. Beyond this limit, - // the lowest priority requests will return 503. - MaxQueue int + // Queue determines which queue a request is assigned to. + Queue func(req *http.Request) *RequestQueue // Priority determines queue ordering. Requests with higher // priority are handled first. Requests with equal priority @@ -48,11 +43,6 @@ type RequestLimiter struct { // handled FIFO. 
Priority func(req *http.Request, queued time.Time) int64 - // Return 503 for any request for which Priority() returns - // MinPriority if it spends longer than this in the queue - // before starting processing. - MaxQueueTimeForMinPriority time.Duration - // "concurrent_requests", "max_concurrent_requests", // "queued_requests", and "max_queued_requests" metrics are // registered with Registry, if it is not nil. @@ -63,11 +53,32 @@ type RequestLimiter struct { mQueueTimeout *prometheus.SummaryVec mQueueUsage *prometheus.GaugeVec mtx sync.Mutex - handling int - queue queue + rqs map[*RequestQueue]bool // all RequestQueues in use +} + +type RequestQueue struct { + // Label for metrics. No two queues should have the same label. + Label string + + // Maximum number of requests being handled at once. Beyond + // this limit, requests will be queued. + MaxConcurrent int + + // Maximum number of requests in the queue. Beyond this limit, + // the lowest priority requests will return 503. + MaxQueue int + + // Return 503 for any request for which Priority() returns + // MinPriority if it spends longer than this in the queue + // before starting processing. + MaxQueueTimeForMinPriority time.Duration + + queue queue + handling int } type qent struct { + rq *RequestQueue queued time.Time priority int64 heappos int @@ -121,101 +132,96 @@ func (h *queue) remove(i int) { func (rl *RequestLimiter) setup() { if rl.Registry != nil { - rl.Registry.MustRegister(prometheus.NewGaugeFunc( - prometheus.GaugeOpts{ - Namespace: "arvados", - Name: "concurrent_requests", - Help: "Number of requests in progress", - }, - func() float64 { - rl.mtx.Lock() - defer rl.mtx.Unlock() - return float64(rl.handling) - }, - )) - rl.Registry.MustRegister(prometheus.NewGaugeFunc( - prometheus.GaugeOpts{ - Namespace: "arvados", - Name: "max_concurrent_requests", - Help: "Maximum number of concurrent requests", - }, - func() float64 { return float64(rl.MaxConcurrent) }, - )) + mCurrentReqs := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "arvados", + Name: "concurrent_requests", + Help: "Number of requests in progress", + }, []string{"queue"}) + rl.Registry.MustRegister(mCurrentReqs) + mMaxReqs := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "arvados", + Name: "max_concurrent_requests", + Help: "Maximum number of concurrent requests", + }, []string{"queue"}) + rl.Registry.MustRegister(mMaxReqs) + mMaxQueue := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "arvados", + Name: "max_queued_requests", + Help: "Maximum number of queued requests", + }, []string{"queue"}) + rl.Registry.MustRegister(mMaxQueue) rl.mQueueUsage = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "arvados", Name: "queued_requests", Help: "Number of requests in queue", - }, []string{"priority"}) + }, []string{"queue", "priority"}) rl.Registry.MustRegister(rl.mQueueUsage) - rl.Registry.MustRegister(prometheus.NewGaugeFunc( - prometheus.GaugeOpts{ - Namespace: "arvados", - Name: "max_queued_requests", - Help: "Maximum number of queued requests", - }, - func() float64 { return float64(rl.MaxQueue) }, - )) rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: "arvados", Name: "queue_delay_seconds", Help: "Time spent in the incoming request queue before start of processing", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, - }, []string{"priority"}) + }, []string{"queue", "priority"}) rl.Registry.MustRegister(rl.mQueueDelay) rl.mQueueTimeout = 
prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: "arvados", Name: "queue_timeout_seconds", Help: "Time spent in the incoming request queue before client timed out or disconnected", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, - }, []string{"priority"}) + }, []string{"queue", "priority"}) rl.Registry.MustRegister(rl.mQueueTimeout) go func() { for range time.NewTicker(metricsUpdateInterval).C { - var low, normal, high int rl.mtx.Lock() - for _, ent := range rl.queue { - switch { - case ent.priority < 0: - low++ - case ent.priority > 0: - high++ - default: - normal++ + for rq := range rl.rqs { + var low, normal, high int + for _, ent := range rq.queue { + switch { + case ent.priority < 0: + low++ + case ent.priority > 0: + high++ + default: + normal++ + } } + mCurrentReqs.WithLabelValues(rq.Label).Set(float64(rq.handling)) + mMaxReqs.WithLabelValues(rq.Label).Set(float64(rq.MaxConcurrent)) + mMaxQueue.WithLabelValues(rq.Label).Set(float64(rq.MaxQueue)) + rl.mQueueUsage.WithLabelValues(rq.Label, "low").Set(float64(low)) + rl.mQueueUsage.WithLabelValues(rq.Label, "normal").Set(float64(normal)) + rl.mQueueUsage.WithLabelValues(rq.Label, "high").Set(float64(high)) } rl.mtx.Unlock() - rl.mQueueUsage.WithLabelValues("low").Set(float64(low)) - rl.mQueueUsage.WithLabelValues("normal").Set(float64(normal)) - rl.mQueueUsage.WithLabelValues("high").Set(float64(high)) } }() } } // caller must have lock -func (rl *RequestLimiter) runqueue() { +func (rq *RequestQueue) runqueue() { // Handle entries from the queue as capacity permits - for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) { - rl.handling++ - ent := rl.queue.removeMax() + for len(rq.queue) > 0 && (rq.MaxConcurrent == 0 || rq.handling < rq.MaxConcurrent) { + rq.handling++ + ent := rq.queue.removeMax() ent.ready <- true } } // If the queue is too full, fail and remove the lowest-priority // entry. Caller must have lock. Queue must not be empty. 
-func (rl *RequestLimiter) trimqueue() { - if len(rl.queue) <= rl.MaxQueue { +func (rq *RequestQueue) trimqueue() { + if len(rq.queue) <= rq.MaxQueue { return } min := 0 - for i := range rl.queue { - if i == 0 || rl.queue.Less(min, i) { + for i := range rq.queue { + if i == 0 || rq.queue.Less(min, i) { min = i } } - rl.queue[min].ready <- false - rl.queue.remove(min) + rq.queue[min].ready <- false + rq.queue.remove(min) } func (rl *RequestLimiter) enqueue(req *http.Request) *qent { @@ -227,19 +233,24 @@ func (rl *RequestLimiter) enqueue(req *http.Request) *qent { priority = rl.Priority(req, qtime) } ent := &qent{ + rq: rl.Queue(req), queued: qtime, priority: priority, ready: make(chan bool, 1), heappos: -1, } - if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling { + if rl.rqs == nil { + rl.rqs = map[*RequestQueue]bool{} + } + rl.rqs[ent.rq] = true + if ent.rq.MaxConcurrent == 0 || ent.rq.MaxConcurrent > ent.rq.handling { // fast path, skip the queue - rl.handling++ + ent.rq.handling++ ent.ready <- true return ent } - rl.queue.add(ent) - rl.trimqueue() + ent.rq.queue.add(ent) + ent.rq.trimqueue() return ent } @@ -247,7 +258,7 @@ func (rl *RequestLimiter) remove(ent *qent) { rl.mtx.Lock() defer rl.mtx.Unlock() if ent.heappos >= 0 { - rl.queue.remove(ent.heappos) + ent.rq.queue.remove(ent.heappos) ent.ready <- false } } @@ -255,14 +266,14 @@ func (rl *RequestLimiter) remove(ent *qent) { func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) { rl.setupOnce.Do(rl.setup) ent := rl.enqueue(req) - SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority}) + SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority, "queue": ent.rq.Label}) if ent.priority == MinPriority { // Note that MaxQueueTime==0 does not cancel a req // that skips the queue, because in that case // rl.enqueue() has already fired ready<-true and // rl.remove() is a no-op. go func() { - time.Sleep(rl.MaxQueueTimeForMinPriority) + time.Sleep(ent.rq.MaxQueueTimeForMinPriority) rl.remove(ent) }() } @@ -273,7 +284,7 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) // we still need to wait for ent.ready, because // sometimes runqueue() will have already decided to // send true before our rl.remove() call, and in that - // case we'll need to decrement rl.handling below. + // case we'll need to decrement ent.rq.handling below. 
         ok = <-ent.ready
     case ok = <-ent.ready:
     }
@@ -298,7 +309,7 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
         default:
             qlabel = "normal"
         }
-        series.WithLabelValues(qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
+        series.WithLabelValues(ent.rq.Label, qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
     }

     if !ok {
@@ -308,9 +319,9 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
     defer func() {
         rl.mtx.Lock()
         defer rl.mtx.Unlock()
-        rl.handling--
+        ent.rq.handling--
         // unblock the next waiting request
-        rl.runqueue()
+        ent.rq.runqueue()
     }()
     rl.Handler.ServeHTTP(resp, req)
 }
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
index 55f13b4625..7366e1426b 100644
--- a/sdk/go/httpserver/request_limiter_test.go
+++ b/sdk/go/httpserver/request_limiter_test.go
@@ -34,7 +34,11 @@ func newTestHandler() *testHandler {
 func (s *Suite) TestRequestLimiter1(c *check.C) {
     h := newTestHandler()
-    l := RequestLimiter{MaxConcurrent: 1, Handler: h}
+    rq := &RequestQueue{
+        MaxConcurrent: 1}
+    l := RequestLimiter{
+        Queue:   func(*http.Request) *RequestQueue { return rq },
+        Handler: h}
     var wg sync.WaitGroup
     resps := make([]*httptest.ResponseRecorder, 10)
     for i := 0; i < 10; i++ {
@@ -94,7 +98,11 @@ func (s *Suite) TestRequestLimiter1(c *check.C) {
 func (*Suite) TestRequestLimiter10(c *check.C) {
     h := newTestHandler()
-    l := RequestLimiter{MaxConcurrent: 10, Handler: h}
+    rq := &RequestQueue{
+        MaxConcurrent: 10}
+    l := RequestLimiter{
+        Queue:   func(*http.Request) *RequestQueue { return rq },
+        Handler: h}
     var wg sync.WaitGroup
     for i := 0; i < 10; i++ {
         wg.Add(1)
@@ -114,29 +122,32 @@ func (*Suite) TestRequestLimiter10(c *check.C) {
 func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
     h := newTestHandler()
-    rl := RequestLimiter{
+    rq := &RequestQueue{
         MaxConcurrent: 1000,
         MaxQueue:      200,
-        Handler:       h,
+    }
+    rl := RequestLimiter{
+        Handler: h,
+        Queue:   func(*http.Request) *RequestQueue { return rq },
         Priority: func(r *http.Request, _ time.Time) int64 {
             p, _ := strconv.ParseInt(r.Header.Get("Priority"), 10, 64)
             return p
         }}
     c.Logf("starting initial requests")
-    for i := 0; i < rl.MaxConcurrent; i++ {
+    for i := 0; i < rq.MaxConcurrent; i++ {
         go func() {
             rl.ServeHTTP(httptest.NewRecorder(), &http.Request{Header: http.Header{"No-Priority": {"x"}}})
         }()
     }
     c.Logf("waiting for initial requests to consume all MaxConcurrent slots")
-    for i := 0; i < rl.MaxConcurrent; i++ {
+    for i := 0; i < rq.MaxConcurrent; i++ {
         <-h.inHandler
     }
-    c.Logf("starting %d priority=MinPriority requests (should respond 503 immediately)", rl.MaxQueue)
+    c.Logf("starting %d priority=MinPriority requests (should respond 503 immediately)", rq.MaxQueue)
     var wgX sync.WaitGroup
-    for i := 0; i < rl.MaxQueue; i++ {
+    for i := 0; i < rq.MaxQueue; i++ {
         wgX.Add(1)
         go func() {
             defer wgX.Done()
@@ -147,13 +158,13 @@ func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
     }
     wgX.Wait()
-    c.Logf("starting %d priority=MinPriority requests (should respond 503 after 100 ms)", rl.MaxQueue)
+    c.Logf("starting %d priority=MinPriority requests (should respond 503 after 100 ms)", rq.MaxQueue)
     // Usage docs say the caller isn't allowed to change fields
     // after first use, but we secretly know it's OK to change
     // this field on the fly as long as no requests are arriving
     // concurrently.
-    rl.MaxQueueTimeForMinPriority = time.Millisecond * 100
-    for i := 0; i < rl.MaxQueue; i++ {
+    rq.MaxQueueTimeForMinPriority = time.Millisecond * 100
+    for i := 0; i < rq.MaxQueue; i++ {
         wgX.Add(1)
         go func() {
             defer wgX.Done()
@@ -162,17 +173,17 @@ func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
             rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
             c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
             elapsed := time.Since(t0)
-            c.Check(elapsed > rl.MaxQueueTimeForMinPriority, check.Equals, true)
-            c.Check(elapsed < rl.MaxQueueTimeForMinPriority*10, check.Equals, true)
+            c.Check(elapsed > rq.MaxQueueTimeForMinPriority, check.Equals, true)
+            c.Check(elapsed < rq.MaxQueueTimeForMinPriority*10, check.Equals, true)
         }()
     }
     wgX.Wait()
-    c.Logf("starting %d priority=1 and %d priority=1 requests", rl.MaxQueue, rl.MaxQueue)
+    c.Logf("starting %d priority=1 and %d priority=1 requests", rq.MaxQueue, rq.MaxQueue)
     var wg1, wg2 sync.WaitGroup
-    wg1.Add(rl.MaxQueue)
-    wg2.Add(rl.MaxQueue)
-    for i := 0; i < rl.MaxQueue*2; i++ {
+    wg1.Add(rq.MaxQueue)
+    wg2.Add(rq.MaxQueue)
+    for i := 0; i < rq.MaxQueue*2; i++ {
         i := i
         go func() {
             pri := (i & 1) + 1
@@ -192,12 +203,12 @@ func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
     wg1.Wait()
     c.Logf("allowing initial requests to proceed")
-    for i := 0; i < rl.MaxConcurrent; i++ {
+    for i := 0; i < rq.MaxConcurrent; i++ {
         h.okToProceed <- struct{}{}
     }
     c.Logf("allowing queued priority=2 requests to proceed")
-    for i := 0; i < rl.MaxQueue; i++ {
+    for i := 0; i < rq.MaxQueue; i++ {
         <-h.inHandler
         h.okToProceed <- struct{}{}
     }
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index f66194e2a2..81e4c7b867 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -556,6 +556,10 @@ func (s *runSuite) TestDryRun(c *check.C) {
     c.Check(bal.stats.trashesDeferred, check.Not(check.Equals), 0)
     c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
     c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
+
+    metrics := arvadostest.GatherMetricsAsString(srv.Metrics.reg)
+    c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_trash_entries_deferred_count [1-9].*`)
+    c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_pull_entries_deferred_count [1-9].*`)
 }

 func (s *runSuite) TestCommit(c *check.C) {
@@ -593,6 +597,36 @@ func (s *runSuite) TestCommit(c *check.C) {
     c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
     c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
     c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
+
+    for _, cat := range []string{
+        "dedup_byte_ratio", "dedup_block_ratio", "collection_bytes",
+        "referenced_bytes", "referenced_blocks", "reference_count",
+        "pull_entries_sent_count",
+        "trash_entries_sent_count",
+    } {
+        c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_`+cat+` [1-9].*`)
+    }
+
+    for _, cat := range []string{
+        "pull_entries_deferred_count",
+        "trash_entries_deferred_count",
+    } {
+        c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_`+cat+` 0\n.*`)
+    }
+
+    c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_replicated_block_count{replicas="0"} [1-9].*`)
+    c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_replicated_block_count{replicas="1"} [1-9].*`)
+    c.Check(metrics, check.Matches,
+        `(?ms).*\narvados_keep_replicated_block_count{replicas="9"} 0\n.*`)
+
+    for _, sub := range []string{"replicas", "blocks", "bytes"} {
+        for _, cat := range []string{"needed", "unneeded", "unachievable", "pulling"} {
+            c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_usage_`+sub+`{status="`+cat+`",storage_class="default"} [1-9].*`)
+        }
+        for _, cat := range []string{"total", "garbage", "transient", "overreplicated", "underreplicated", "unachievable", "balanced", "desired", "lost"} {
+            c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_`+cat+`_`+sub+` [0-9].*`)
+        }
+    }
+    c.Logf("%s", metrics)
 }

 func (s *runSuite) TestChunkPrefix(c *check.C) {
diff --git a/services/keep-balance/metrics.go b/services/keep-balance/metrics.go
index 4683b67b98..02cee3955f 100644
--- a/services/keep-balance/metrics.go
+++ b/services/keep-balance/metrics.go
@@ -7,6 +7,7 @@ package keepbalance
 import (
     "fmt"
     "net/http"
+    "strconv"
     "sync"

     "github.com/prometheus/client_golang/prometheus"
 )
@@ -17,18 +18,20 @@ type observer interface{ Observe(float64) }
 type setter interface{ Set(float64) }

 type metrics struct {
-    reg         *prometheus.Registry
-    statsGauges map[string]setter
-    observers   map[string]observer
-    setupOnce   sync.Once
-    mtx         sync.Mutex
+    reg            *prometheus.Registry
+    statsGauges    map[string]setter
+    statsGaugeVecs map[string]*prometheus.GaugeVec
+    observers      map[string]observer
+    setupOnce      sync.Once
+    mtx            sync.Mutex
 }

 func newMetrics(registry *prometheus.Registry) *metrics {
     return &metrics{
-        reg:         registry,
-        statsGauges: map[string]setter{},
-        observers:   map[string]observer{},
+        reg:            registry,
+        statsGauges:    map[string]setter{},
+        statsGaugeVecs: map[string]*prometheus.GaugeVec{},
+        observers:      map[string]observer{},
     }
 }
@@ -63,9 +66,24 @@ func (m *metrics) UpdateStats(s balancerStats) {
     "transient":       {s.unref, "transient (unreferenced, new)"},
     "overreplicated":  {s.overrep, "overreplicated"},
     "underreplicated": {s.underrep, "underreplicated"},
+    "unachievable":    {s.unachievable, "unachievable"},
+    "balanced":        {s.justright, "optimally balanced"},
+    "desired":         {s.desired, "desired"},
     "lost":            {s.lost, "lost"},
     "dedup_byte_ratio":  {s.dedupByteRatio(), "deduplication ratio, bytes referenced / bytes stored"},
     "dedup_block_ratio": {s.dedupBlockRatio(), "deduplication ratio, blocks referenced / blocks stored"},
+    "collection_bytes":  {s.collectionBytes, "total apparent size of all collections"},
+    "referenced_bytes":  {s.collectionBlockBytes, "total size of unique referenced blocks"},
+    "reference_count":   {s.collectionBlockRefs, "block references in all collections"},
+    "referenced_blocks": {s.collectionBlocks, "blocks referenced by any collection"},
+
+    "pull_entries_sent_count":      {s.pulls, "total entries sent in pull lists"},
+    "pull_entries_deferred_count":  {s.pullsDeferred, "total entries deferred (not sent) in pull lists"},
+    "trash_entries_sent_count":     {s.trashes, "total entries sent in trash lists"},
+    "trash_entries_deferred_count": {s.trashesDeferred, "total entries deferred (not sent) in trash lists"},
+
+    "replicated_block_count": {s.replHistogram, "blocks with indicated number of replicas at last count"},
+    "usage":                  {s.classStats, "stored in indicated storage class"},
 }
 m.setupOnce.Do(func() {
     // Register gauge(s) for each balancerStats field.
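The hunks above and below export the new keep-balance statistics through prometheus.GaugeVec values rather than plain gauges, with one labelled series per replica count (or per storage class/status pair). As an illustrative aside, not part of this patch, here is a self-contained sketch of the same pattern using only the prometheus/client_golang API; the namespace, metric name, and sample counts are invented:

    package main

    import (
        "fmt"
        "strconv"

        "github.com/prometheus/client_golang/prometheus"
    )

    func main() {
        reg := prometheus.NewRegistry()
        // One gauge per "replicas" label value, mirroring how a []int
        // histogram of block counts can be exported.
        replicated := prometheus.NewGaugeVec(prometheus.GaugeOpts{
            Namespace: "example",
            Subsystem: "keep",
            Name:      "replicated_block_count",
            Help:      "blocks with indicated number of replicas (illustrative)",
        }, []string{"replicas"})
        reg.MustRegister(replicated)

        counts := []int{0, 12, 340, 7} // counts[r] = number of blocks with r replicas
        for r, n := range counts {
            replicated.WithLabelValues(strconv.Itoa(r)).Set(float64(n))
        }
        // Zero a few buckets above the current maximum so a series published
        // on an earlier run cannot linger with a stale value.
        for r := len(counts); r < len(counts)+4; r++ {
            replicated.WithLabelValues(strconv.Itoa(r)).Set(0)
        }

        mfs, _ := reg.Gather()
        for _, mf := range mfs {
            fmt.Printf("%s: %d series\n", mf.GetName(), len(mf.GetMetric()))
        }
    }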
@@ -87,6 +105,29 @@ func (m *metrics) UpdateStats(s balancerStats) {
         }
     case int, int64, float64:
         addGauge(name, gauge.Help)
+    case []int:
+        // replHistogram
+        gv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+            Namespace: "arvados",
+            Name:      name,
+            Subsystem: "keep",
+            Help:      gauge.Help,
+        }, []string{"replicas"})
+        m.reg.MustRegister(gv)
+        m.statsGaugeVecs[name] = gv
+    case map[string]replicationStats:
+        // classStats
+        for _, sub := range []string{"blocks", "bytes", "replicas"} {
+            name := name + "_" + sub
+            gv := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+                Namespace: "arvados",
+                Name:      name,
+                Subsystem: "keep",
+                Help:      gauge.Help,
+            }, []string{"storage_class", "status"})
+            m.reg.MustRegister(gv)
+            m.statsGaugeVecs[name] = gv
+        }
     default:
         panic(fmt.Sprintf("bad gauge type %T", gauge.Value))
     }
@@ -105,6 +146,38 @@ func (m *metrics) UpdateStats(s balancerStats) {
         m.statsGauges[name].Set(float64(val))
     case float64:
         m.statsGauges[name].Set(float64(val))
+    case []int:
+        // replHistogram
+        for r, n := range val {
+            m.statsGaugeVecs[name].WithLabelValues(strconv.Itoa(r)).Set(float64(n))
+        }
+        // Record zero for higher-than-max-replication
+        // metrics, so we don't incorrectly continue
+        // to report stale metrics.
+        //
+        // For example, if we previously reported n=1
+        // for repl=6, but have since restarted
+        // keep-balance and the most replicated block
+        // now has repl=5, then the repl=6 gauge will
+        // still say n=1 until we clear it explicitly
+        // here.
+        for r := len(val); r < len(val)+4 || r < len(val)*2; r++ {
+            m.statsGaugeVecs[name].WithLabelValues(strconv.Itoa(r)).Set(0)
+        }
+    case map[string]replicationStats:
+        // classStats
+        for class, cs := range val {
+            for label, val := range map[string]blocksNBytes{
+                "needed":       cs.needed,
+                "unneeded":     cs.unneeded,
+                "pulling":      cs.pulling,
+                "unachievable": cs.unachievable,
+            } {
+                m.statsGaugeVecs[name+"_blocks"].WithLabelValues(class, label).Set(float64(val.blocks))
+                m.statsGaugeVecs[name+"_bytes"].WithLabelValues(class, label).Set(float64(val.bytes))
+                m.statsGaugeVecs[name+"_replicas"].WithLabelValues(class, label).Set(float64(val.replicas))
+            }
+        }
     default:
         panic(fmt.Sprintf("bad gauge type %T", gauge.Value))
     }
diff --git a/services/workbench2/src/store/details-panel/details-panel-action.ts b/services/workbench2/src/store/details-panel/details-panel-action.ts
index b708ad622c..e14c70ace7 100644
--- a/services/workbench2/src/store/details-panel/details-panel-action.ts
+++ b/services/workbench2/src/store/details-panel/details-panel-action.ts
@@ -19,7 +19,9 @@ export const SLIDE_TIMEOUT = 500;
 export const detailsPanelActions = unionize({
     TOGGLE_DETAILS_PANEL: ofType<{}>(),
     OPEN_DETAILS_PANEL: ofType(),
-    LOAD_DETAILS_PANEL: ofType()
+    LOAD_DETAILS_PANEL: ofType(),
+    START_TRANSITION: ofType<{}>(),
+    END_TRANSITION: ofType<{}>(),
 });

 export type DetailsPanelAction = UnionOf;
@@ -41,6 +43,7 @@ export const loadDetailsPanel = (uuid: string) =>
 export const openDetailsPanel = (uuid?: string, tabNr: number = 0) =>
     (dispatch: Dispatch) => {
+        startDetailsPanelTransition(dispatch)
         dispatch(detailsPanelActions.OPEN_DETAILS_PANEL(tabNr));
         if (uuid !== undefined) {
             dispatch(loadDetailsPanel(uuid));
@@ -69,8 +72,16 @@ export const toggleDetailsPanel = () => (dispatch: Dispatch, getState: () => Roo
         setTimeout(() => {
             window.dispatchEvent(new Event('resize'));
         }, SLIDE_TIMEOUT);
+        startDetailsPanelTransition(dispatch)
         dispatch(detailsPanelActions.TOGGLE_DETAILS_PANEL());
         if (getState().detailsPanel.isOpened) {
             dispatch(loadDetailsPanel(getState().detailsPanel.resourceUuid));
         }
     };
+
+const startDetailsPanelTransition = (dispatch) => {
+    dispatch(detailsPanelActions.START_TRANSITION())
+    setTimeout(() => {
+        dispatch(detailsPanelActions.END_TRANSITION())
+    }, SLIDE_TIMEOUT);
+}
\ No newline at end of file
diff --git a/services/workbench2/src/store/details-panel/details-panel-reducer.ts b/services/workbench2/src/store/details-panel/details-panel-reducer.ts
index 6c32551cbf..8a0e1d5cd3 100644
--- a/services/workbench2/src/store/details-panel/details-panel-reducer.ts
+++ b/services/workbench2/src/store/details-panel/details-panel-reducer.ts
@@ -8,12 +8,14 @@ export interface DetailsPanelState {
     resourceUuid: string;
     isOpened: boolean;
     tabNr: number;
+    isTransitioning: boolean;
 }

 const initialState = {
     resourceUuid: '',
     isOpened: false,
-    tabNr: 0
+    tabNr: 0,
+    isTransitioning: false
 };

 export const detailsPanelReducer = (state: DetailsPanelState = initialState, action: DetailsPanelAction) =>
@@ -22,4 +24,6 @@ export const detailsPanelReducer = (state: DetailsPanelState = initialState, act
     LOAD_DETAILS_PANEL: resourceUuid => ({ ...state, resourceUuid }),
     OPEN_DETAILS_PANEL: tabNr => ({ ...state, isOpened: true, tabNr }),
     TOGGLE_DETAILS_PANEL: () => ({ ...state, isOpened: !state.isOpened }),
+    START_TRANSITION: () => ({...state, isTransitioning: true}),
+    END_TRANSITION: () => ({...state, isTransitioning: false})
 });
diff --git a/services/workbench2/src/store/side-panel/side-panel-action.ts b/services/workbench2/src/store/side-panel/side-panel-action.ts
index e4f53ceaa6..644f76cd4a 100644
--- a/services/workbench2/src/store/side-panel/side-panel-action.ts
+++ b/services/workbench2/src/store/side-panel/side-panel-action.ts
@@ -6,7 +6,8 @@ import { Dispatch } from 'redux';
 import { navigateTo } from 'store/navigation/navigation-action';

 export const sidePanelActions = {
-    TOGGLE_COLLAPSE: 'TOGGLE_COLLAPSE'
+    TOGGLE_COLLAPSE: 'TOGGLE_COLLAPSE',
+    SET_CURRENT_WIDTH: 'SET_CURRENT_WIDTH'
 }

 export const navigateFromSidePanel = (id: string) =>
@@ -18,4 +19,10 @@ export const toggleSidePanel = (collapsedState: boolean) => {
     return (dispatch) => {
         dispatch({type: sidePanelActions.TOGGLE_COLLAPSE, payload: !collapsedState})
     }
+}
+
+export const setCurrentSideWidth = (width: number) => {
+    return (dispatch) => {
+        dispatch({type: sidePanelActions.SET_CURRENT_WIDTH, payload: width})
+    }
 }
\ No newline at end of file
diff --git a/services/workbench2/src/store/side-panel/side-panel-reducer.tsx b/services/workbench2/src/store/side-panel/side-panel-reducer.tsx
index a6ed03b6ab..0d9b1ad3df 100644
--- a/services/workbench2/src/store/side-panel/side-panel-reducer.tsx
+++ b/services/workbench2/src/store/side-panel/side-panel-reducer.tsx
@@ -5,14 +5,17 @@ import { sidePanelActions } from "./side-panel-action"

 interface SidePanelState {
-    collapsedState: boolean
+    collapsedState: boolean,
+    currentSideWidth: number
 }

 const sidePanelInitialState = {
-    collapsedState: false
+    collapsedState: false,
+    currentSideWidth: 0
 }

 export const sidePanelReducer = (state: SidePanelState = sidePanelInitialState, action)=>{
     if(action.type === sidePanelActions.TOGGLE_COLLAPSE) return {...state, collapsedState: action.payload}
+    if(action.type === sidePanelActions.SET_CURRENT_WIDTH) return {...state, currentSideWidth: action.payload}
     return state
 }
\ No newline at end of file
diff --git a/services/workbench2/src/views-components/side-panel-button/side-panel-button.tsx b/services/workbench2/src/views-components/side-panel-button/side-panel-button.tsx
index 6acbb16118..47b2316831 100644
--- a/services/workbench2/src/views-components/side-panel-button/side-panel-button.tsx
+++ b/services/workbench2/src/views-components/side-panel-button/side-panel-button.tsx
@@ -121,7 +121,7 @@ export const SidePanelButton = withStyles(styles)(
             menuItems = React.createElement(React.Fragment, null,
                 pluginConfig.newButtonMenuList.reduce(reduceItemsFn, React.Children.toArray(menuItems.props.children)));
-        return
+        return
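Returning to the sdk/go/httpserver changes earlier in this patch: concurrency and backlog limits now live on RequestQueue values, and RequestLimiter picks a queue per request through its Queue callback. Below is a rough usage sketch based only on the fields visible in this diff (Handler, Queue, Priority on RequestLimiter; Label, MaxConcurrent, MaxQueue on RequestQueue); the path-based routing rule and listen address are hypothetical, so treat this as an illustration rather than documented API:

    package main

    import (
        "net/http"
        "strings"

        "git.arvados.org/arvados.git/sdk/go/httpserver"
    )

    func main() {
        // Two queues with independent concurrency/backlog limits; the labels
        // appear in the new per-queue metrics and response log fields.
        apiQueue := &httpserver.RequestQueue{Label: "api", MaxConcurrent: 64, MaxQueue: 256}
        slowQueue := &httpserver.RequestQueue{Label: "slow", MaxConcurrent: 4, MaxQueue: 16}

        limiter := &httpserver.RequestLimiter{
            Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                w.WriteHeader(http.StatusOK)
            }),
            // Route each request to one of the queues (hypothetical rule).
            Queue: func(r *http.Request) *httpserver.RequestQueue {
                if strings.HasPrefix(r.URL.Path, "/slow/") {
                    return slowQueue
                }
                return apiQueue
            },
        }
        http.ListenAndServe(":8080", limiter)
    }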