--- /dev/null
+#!/usr/bin/env python3
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+"""pypkg_info.py - Introspect installed Python packages
+
+This tool can read metadata about any Python package installed in the current
+environment and report it out in various formats. We use this mainly to pass
+information through when building distribution packages.
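+
+For example, `pypkg_info.py --format=fpm metadata PKG License Summary` prints
+fpm arguments like `--license=...` and `--description=...` for package PKG.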
+"""
+
+import argparse
+import enum
+import importlib.metadata
+import os
+import sys
+
+from pathlib import PurePath
+
+class RawFormat:
+ def format_metadata(self, key, value):
+ return value
+
+ def format_path(self, path):
+ return str(path)
+
+
+class FPMFormat(RawFormat):
+ PYTHON_METADATA_MAP = {
+ 'summary': 'description',
+ }
+
+ def format_metadata(self, key, value):
+ key = key.lower()
+ key = self.PYTHON_METADATA_MAP.get(key, key)
+ return f'--{key}={value}'
+
+
+class Formats(enum.Enum):
+ RAW = RawFormat
+ FPM = FPMFormat
+
+ @classmethod
+ def from_arg(cls, arg):
+ try:
+ return cls[arg.upper()]
+ except KeyError:
+ raise ValueError(f"unknown format {arg!r}") from None
+
+
+def report_binfiles(args):
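+    # A package's installed scripts appear in its file list as paths
+    # relative to site-packages, e.g. '../../../bin/<script>', so match on
+    # the trailing '../bin/<name>' components.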
+ bin_names = [
+ PurePath('bin', path.name)
+ for pkg_name in args.package_names
+ for path in importlib.metadata.distribution(pkg_name).files
+ if path.parts[-3:-1] == ('..', 'bin')
+ ]
+ fmt = args.format.value().format_path
+ return (fmt(path) for path in bin_names)
+
+def report_metadata(args):
+ dist = importlib.metadata.distribution(args.package_name)
+ fmt = args.format.value().format_metadata
+ for key in args.metadata_key:
+ yield fmt(key, dist.metadata.get(key, ''))
+
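+# Interpret backslash escapes (e.g. '\n', '\t') in a command-line argument.
+# Single quotes are escaped first so the eval cannot break out of the
+# triple-quoted string.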
+def unescape_str(arg):
+ arg = arg.replace('\'', '\\\'')
+ return eval(f"'''{arg}'''", {})
+
+def parse_arguments(arglist=None):
+ parser = argparse.ArgumentParser()
+ parser.set_defaults(action=None)
+ format_names = ', '.join(fmt.name.lower() for fmt in Formats)
+ parser.add_argument(
+ '--format', '-f',
+ choices=list(Formats),
+ default=Formats.RAW,
+ type=Formats.from_arg,
+ help=f"Output format. Choices are: {format_names}",
+ )
+ parser.add_argument(
+ '--delimiter', '-d',
+ default='\n',
+ type=unescape_str,
+ help="Line ending. Python backslash escapes are supported. Default newline.",
+ )
+ subparsers = parser.add_subparsers()
+
+ binfiles = subparsers.add_parser('binfiles')
+ binfiles.set_defaults(action=report_binfiles)
+ binfiles.add_argument(
+ 'package_names',
+ nargs=argparse.ONE_OR_MORE,
+ )
+
+ metadata = subparsers.add_parser('metadata')
+ metadata.set_defaults(action=report_metadata)
+ metadata.add_argument(
+ 'package_name',
+ )
+ metadata.add_argument(
+ 'metadata_key',
+ nargs=argparse.ONE_OR_MORE,
+ )
+
+    args = parser.parse_args(arglist)
+ if args.action is None:
+ parser.error("subcommand is required")
+ return args
+
+def main(arglist=None):
+ args = parse_arguments(arglist)
+ try:
+ for line in args.action(args):
+ print(line, end=args.delimiter)
+ except importlib.metadata.PackageNotFoundError as error:
+ print(f"error: package not found: {error.args[0]}", file=sys.stderr)
+        # os.EX_NOTFOUND is not defined on Linux, so use the closest
+        # sysexits.h code that is.
+        return os.EX_NOINPUT
+ else:
+ return os.EX_OK
+
+if __name__ == '__main__':
+    sys.exit(main())
# Python packages
debug_echo -e "\nPython packages\n"
-# The Python SDK - Python3 package
+# Before a Python package can be built, its dependencies must already be built.
+# This list is ordered accordingly.
+setup_build_virtualenv
+fpm_build_virtualenv cwltest "==2.3.20230108193615" "$FORMAT" "$ARCH"
fpm_build_virtualenv "arvados-python-client" "sdk/python" "$FORMAT" "$ARCH"
-
-# Arvados cwl runner - Python3 package
-fpm_build_virtualenv "arvados-cwl-runner" "sdk/cwl" "$FORMAT" "$ARCH"
-
-# The FUSE driver - Python3 package
-fpm_build_virtualenv "arvados-fuse" "services/fuse" "$FORMAT" "$ARCH"
-
-# The Arvados crunchstat-summary tool
fpm_build_virtualenv "crunchstat-summary" "tools/crunchstat-summary" "$FORMAT" "$ARCH"
-
-# The Docker image cleaner
+fpm_build_virtualenv "arvados-cwl-runner" "sdk/cwl" "$FORMAT" "$ARCH"
fpm_build_virtualenv "arvados-docker-cleaner" "services/dockercleaner" "$FORMAT" "$ARCH"
-
-# The Arvados user activity tool
+fpm_build_virtualenv "arvados-fuse" "services/fuse" "$FORMAT" "$ARCH"
fpm_build_virtualenv "arvados-user-activity" "tools/user-activity" "$FORMAT" "$ARCH"
-# The cwltest package, which lives out of tree
-handle_cwltest "$FORMAT" "$ARCH"
-
# Workbench2
package_workbench2
local src=services/workbench2
local dst=/var/www/arvados-workbench2/workbench2
local description="Arvados Workbench 2"
- local version="$(version_from_git)"
cd "$WORKSPACE/$src"
+ local version="$(version_from_git)"
rm -rf ./build
NODE_ENV=production yarn install
VERSION="$version" BUILD_NUMBER="$(default_iteration "$pkgname" "$version" yarn)" GIT_COMMIT="$(git rev-parse HEAD | head -c9)" yarn build
fi
}
-# Usage: handle_cwltest [deb|rpm] [amd64|arm64]
-handle_cwltest () {
- local package_format="$1"; shift
- local target_arch="${1:-amd64}"; shift
-
- if [[ -n "$ONLY_BUILD" ]] && [[ "$ONLY_BUILD" != "python3-cwltest" ]] ; then
- debug_echo -e "Skipping build of cwltest package."
- return 0
- fi
- cd "$WORKSPACE"
- if [[ -e "$WORKSPACE/cwltest" ]]; then
- rm -rf "$WORKSPACE/cwltest"
- fi
- git clone https://github.com/common-workflow-language/cwltest.git
-
- # The subsequent release of cwltest confirms that files exist on disk, since
- # our files are in Keep, all the tests fail.
- # We should add [optional] Arvados support to cwltest so it can access
- # Keep but for the time being just package the last working version.
- (cd cwltest && git checkout 2.3.20230108193615)
-
- # signal to our build script that we want a cwltest executable installed in /usr/bin/
- mkdir cwltest/bin && touch cwltest/bin/cwltest
- fpm_build_virtualenv "cwltest" "cwltest" "$package_format" "$target_arch"
- cd "$WORKSPACE"
- rm -rf "$WORKSPACE/cwltest"
-}
-
# Usage: handle_arvados_src
handle_arvados_src () {
if [[ -n "$ONLY_BUILD" ]] && [[ "$ONLY_BUILD" != "arvados-src" ]] ; then
)
}
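+# Usage: setup_build_virtualenv
+# Create a scratch virtualenv and wheelhouse under $PYTHON_BUILDROOT, used by
+# fpm_build_virtualenv to build wheels for all the Python packages below.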
+setup_build_virtualenv() {
+ PYTHON_BUILDROOT="$(mktemp --directory --tmpdir pybuild.XXXXXXXX)"
+ "$PYTHON3_EXECUTABLE" -m venv "$PYTHON_BUILDROOT/venv"
+ "$PYTHON_BUILDROOT/venv/bin/pip" install --upgrade build piprepo setuptools wheel
+ mkdir "$PYTHON_BUILDROOT/wheelhouse"
+}
+
# Build python packages with a virtualenv built-in
# Usage: fpm_build_virtualenv arvados-python-client sdk/python [deb|rpm] [amd64|arm64]
fpm_build_virtualenv () {
local target_arch="${1:-amd64}"; shift
native_arch=$(get_native_arch)
-
- if [[ "$pkg" != "arvados-docker-cleaner" ]]; then
- PYTHON_PKG=$PYTHON3_PKG_PREFIX-$pkg
- else
- # Exception to our package naming convention
- PYTHON_PKG=$pkg
- fi
-
- if [[ -n "$ONLY_BUILD" ]] && [[ "$PYTHON_PKG" != "$ONLY_BUILD" ]]; then
- # arvados-python-client sdist should always be built if we are building a
- # python package.
- if [[ "$ONLY_BUILD" != "python3-arvados-cwl-runner" ]] &&
- [[ "$ONLY_BUILD" != "python3-arvados-fuse" ]] &&
- [[ "$ONLY_BUILD" != "python3-crunchstat-summary" ]] &&
- [[ "$ONLY_BUILD" != "arvados-docker-cleaner" ]] &&
- [[ "$ONLY_BUILD" != "python3-arvados-user-activity" ]]; then
- debug_echo -e "Skipping build of $pkg package."
- return 0
- fi
- fi
-
if [[ -n "$target_arch" ]] && [[ "$native_arch" == "$target_arch" ]]; then
fpm_build_virtualenv_worker "$pkg" "$pkg_dir" "$package_format" "$native_arch" "$target_arch"
elif [[ -z "$target_arch" ]]; then
PYTHON_PKG=$PKG
fi
- cd $WORKSPACE/$PKG_DIR
+ # We must always add a wheel to our repository, even if we're not building
+ # this distro package, because it might be a dependency for a later
+ # package we do build.
+ if [[ "$PKG_DIR" =~ ^.=[0-9]+\. ]]; then
+ # Not source to build, but a version to download.
+ # The rest of the function expects a filesystem path, so set one afterwards.
+ "$PYTHON_BUILDROOT/venv/bin/pip" download --dest="$PYTHON_BUILDROOT/wheelhouse" "$PKG$PKG_DIR" \
+ && PKG_DIR="$PYTHON_BUILDROOT/nonexistent"
+ else
+ # Make PKG_DIR absolute.
+ PKG_DIR="$(env -C "$WORKSPACE" readlink -e "$PKG_DIR")"
+ if [[ -e "$PKG_DIR/pyproject.toml" ]]; then
+ "$PYTHON_BUILDROOT/venv/bin/python" -m build --outdir="$PYTHON_BUILDROOT/wheelhouse" "$PKG_DIR"
+ else
+ env -C "$PKG_DIR" "$PYTHON_BUILDROOT/venv/bin/python" setup.py bdist_wheel --dist-dir="$PYTHON_BUILDROOT/wheelhouse"
+ fi
+ fi
+ if [[ $? -ne 0 ]]; then
+    printf "Error, unable to download/build wheel for %s @ %s\n" "$PKG" "$PKG_DIR"
+ exit 1
+ elif ! "$PYTHON_BUILDROOT/venv/bin/piprepo" build "$PYTHON_BUILDROOT/wheelhouse"; then
+    printf "Error, unable to update local wheel repository\n"
+ exit 1
+ fi
+
+ if [[ -n "$ONLY_BUILD" ]] && [[ "$PYTHON_PKG" != "$ONLY_BUILD" ]] && [[ "$PKG" != "$ONLY_BUILD" ]]; then
+ return 0
+ fi
- rm -rf dist/*
- local venv_dir="dist/build/usr/lib/$PYTHON_PKG"
+ local venv_dir="$PYTHON_BUILDROOT/$PYTHON_PKG"
echo "Creating virtualenv..."
if ! "$PYTHON3_EXECUTABLE" -m venv "$venv_dir"; then
printf "Error, unable to run\n %s -m venv %s\n" "$PYTHON3_EXECUTABLE" "$venv_dir"
exit 1
- fi
-
- local venv_py="$venv_dir/bin/python$PYTHON3_VERSION"
- if ! "$venv_py" -m pip install --upgrade $DASHQ_UNLESS_DEBUG $CACHE_FLAG pip setuptools wheel; then
- printf "Error, unable to upgrade pip, setuptools, and wheel with
- %s -m pip install --upgrade $DASHQ_UNLESS_DEBUG $CACHE_FLAG pip setuptools wheel
-" "$venv_py"
+ # We must have the dependency resolver introduced in late 2020 for the rest
+ # of our install process to work.
+ # <https://blog.python.org/2020/11/pip-20-3-release-new-resolver.html>
+ elif ! "$venv_dir/bin/pip" install "pip>=20.3"; then
+ printf "Error, unable to run\n %s/bin/pip install 'pip>=20.3'\n" "$venv_dir"
exit 1
fi
- # filter a useless warning (when building the cwltest package) from the stderr output
- if ! "$venv_py" setup.py $DASHQ_UNLESS_DEBUG sdist 2> >(grep -v 'warning: no previously-included files matching'); then
- echo "Error, unable to run $venv_py setup.py sdist for $PKG"
+ local pip_wheel="$(ls --sort=time --reverse "$PYTHON_BUILDROOT/wheelhouse/$(echo "$PKG" | sed s/-/_/g)-"*.whl | tail -n1)"
+ if [[ -z "$pip_wheel" ]]; then
+    printf "Error, unable to find built wheel for %s\n" "$PKG"
+ exit 1
+ elif ! "$venv_dir/bin/pip" install $DASHQ_UNLESS_DEBUG $CACHE_FLAG --extra-index-url="file://$PYTHON_BUILDROOT/wheelhouse/simple" "$pip_wheel"; then
+ printf "Error, unable to run
+ %s/bin/pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG --extra-index-url=file://%s %s
+" "$venv_dir" "$PYTHON_BUILDROOT/wheelhouse/simple" "$pip_wheel"
exit 1
fi
- if [[ "arvados-python-client" == "$PKG" ]]; then
- PYSDK_PATH="-f $(pwd)/dist/"
- fi
-
- if [[ -n "$ONLY_BUILD" ]] && [[ "$PYTHON_PKG" != "$ONLY_BUILD" ]] && [[ "$PKG" != "$ONLY_BUILD" ]]; then
- return 0
- fi
-
- # Determine the package version from the generated sdist archive
- if [[ -n "$ARVADOS_BUILDING_VERSION" ]] ; then
- UNFILTERED_PYTHON_VERSION=$ARVADOS_BUILDING_VERSION
- PYTHON_VERSION=$(echo -n $ARVADOS_BUILDING_VERSION | sed s/~dev/.dev/g | sed s/~rc/rc/g)
- else
- PYTHON_VERSION=$(awk '($1 == "Version:"){print $2}' *.egg-info/PKG-INFO)
- UNFILTERED_PYTHON_VERSION=$(echo -n $PYTHON_VERSION | sed s/\.dev/~dev/g |sed 's/\([0-9]\)rc/\1~rc/g')
- fi
+ # Determine the package version from the wheel
+ PYTHON_VERSION="$("$venv_dir/bin/python" "$WORKSPACE/build/pypkg_info.py" metadata "$PKG" Version)"
+ UNFILTERED_PYTHON_VERSION="$(echo "$PYTHON_VERSION" | sed 's/\.dev/~dev/; s/\([0-9]\)rc/\1~rc/')"
# See if we actually need to build this package; does it exist already?
# We can't do this earlier than here, because we need PYTHON_VERSION.
if ! test_package_presence "$PYTHON_PKG" "$UNFILTERED_PYTHON_VERSION" python3 "$ARVADOS_BUILDING_ITERATION" "$target_arch"; then
return 0
fi
-
echo "Building $package_format ($target_arch) package for $PKG from $PKG_DIR"
- local sdist_path="$(ls dist/*.tar.gz)"
- if ! "$venv_py" -m pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PYSDK_PATH "$sdist_path"; then
- printf "Error, unable to run
- %s -m pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG %s %s
-" "$venv_py" "$PYSDK_PATH" "$sdist_path"
- exit 1
- fi
-
- pushd "$venv_dir" >$STDOUT_IF_DEBUG
-
# Replace the shebang lines in all python scripts, and handle the activate
# scripts too. This is a functional replacement of the 237 line
# virtualenv_tools.py script that doesn't work in python3 without serious
# patching, minus the parts we don't need (modifying pyc files, etc).
- local sys_venv_dir="${venv_dir#dist/build/}"
+ local sys_venv_dir="/usr/lib/$PYTHON_PKG"
local sys_venv_py="$sys_venv_dir/bin/python$PYTHON3_VERSION"
- for binfile in `ls bin/`; do
- if file --mime "bin/$binfile" | grep -q binary; then
+ find "$venv_dir/bin" -type f | while read binfile; do
+ if file --mime "$binfile" | grep -q binary; then
: # Nothing to do for binary files
- elif [[ "$binfile" =~ ^activate(.csh|.fish|)$ ]]; then
- sed -ri "s@VIRTUAL_ENV(=| )\".*\"@VIRTUAL_ENV\\1\"/$sys_venv_dir\"@" "bin/$binfile"
+ elif [[ "$binfile" =~ /activate(.csh|.fish|)$ ]]; then
+ sed -ri "s@VIRTUAL_ENV(=| )\".*\"@VIRTUAL_ENV\\1\"$sys_venv_dir\"@" "$binfile"
else
# Replace shebang line
- sed -ri "1 s@^#\![^[:space:]]+/bin/python[0-9.]*@#\!/$sys_venv_py@" "bin/$binfile"
+ sed -ri "1 s@^#\![^[:space:]]+/bin/python[0-9.]*@#\!$sys_venv_py@" "$binfile"
fi
done
- popd >$STDOUT_IF_DEBUG
- cd dist
-
- find build -iname '*.py[co]' -delete
-
- # Finally, generate the package
- echo "Creating package..."
-
- declare -a COMMAND_ARR=("fpm" "-s" "dir" "-t" "$package_format")
+ # Using `env -C` sets the directory where the package is built.
+ # Using `fpm --chdir` sets the root directory for source arguments.
+ declare -a COMMAND_ARR=(
+ env -C "$PYTHON_BUILDROOT" fpm
+ --chdir="$venv_dir"
+ --name="$PYTHON_PKG"
+ --version="$UNFILTERED_PYTHON_VERSION"
+ --input-type=dir
+ --output-type="$package_format"
+ --depends="$PYTHON3_PACKAGE"
+ --iteration="$ARVADOS_BUILDING_ITERATION"
+ --replaces="python-$PKG"
+ --url="https://arvados.org"
+ )
+ # Append fpm flags corresponding to Python package metadata.
+ readarray -d "" -O "${#COMMAND_ARR[@]}" -t COMMAND_ARR < \
+ <("$venv_dir/bin/python3" "$WORKSPACE/build/pypkg_info.py" \
+ --delimiter=\\0 --format=fpm \
+ metadata "$PKG" License Summary)
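+  # (pypkg_info emits NUL-delimited arguments such as --license=... and
+  # --description=..., so the values may safely contain whitespace.)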
if [[ -n "$target_arch" ]] && [[ "$target_arch" != "amd64" ]]; then
COMMAND_ARR+=("-a$target_arch")
COMMAND_ARR+=('--vendor' "$VENDOR")
fi
- COMMAND_ARR+=('--url' 'https://arvados.org')
-
- # Get description
- DESCRIPTION=`grep '\sdescription' $WORKSPACE/$PKG_DIR/setup.py|cut -f2 -d=|sed -e "s/[',\\"]//g"`
- COMMAND_ARR+=('--description' "$DESCRIPTION")
-
- # Get license string
- LICENSE_STRING=`grep license $WORKSPACE/$PKG_DIR/setup.py|cut -f2 -d=|sed -e "s/[',\\"]//g"`
- COMMAND_ARR+=('--license' "$LICENSE_STRING")
-
if [[ "$DEBUG" != "0" ]]; then
COMMAND_ARR+=('--verbose' '--log' 'info')
fi
- COMMAND_ARR+=('-v' $(echo -n "$PYTHON_VERSION" | sed s/.dev/~dev/g | sed s/rc/~rc/g))
- COMMAND_ARR+=('--iteration' "$ARVADOS_BUILDING_ITERATION")
- COMMAND_ARR+=('-n' "$PYTHON_PKG")
- COMMAND_ARR+=('-C' "build")
-
- systemd_unit="$WORKSPACE/$PKG_DIR/$PKG.service"
+ systemd_unit="$PKG_DIR/$PKG.service"
if [[ -e "${systemd_unit}" ]]; then
COMMAND_ARR+=('--after-install' "${WORKSPACE}/build/go-python-package-scripts/postinst")
COMMAND_ARR+=('--before-remove' "${WORKSPACE}/build/go-python-package-scripts/prerm")
fi
- COMMAND_ARR+=('--depends' "$PYTHON3_PACKAGE")
case "$package_format" in
deb)
COMMAND_ARR+=(
declare -a fpm_args=()
declare -a fpm_depends=()
- fpminfo="$WORKSPACE/$PKG_DIR/fpm-info.sh"
+ fpminfo="$PKG_DIR/fpm-info.sh"
if [[ -e "$fpminfo" ]]; then
echo "Loading fpm overrides from $fpminfo"
if ! source "$fpminfo"; then
COMMAND_ARR+=('--depends' "$i")
done
- for i in "${fpm_depends[@]}"; do
- COMMAND_ARR+=('--replaces' "python-$PKG")
- done
-
# make sure the systemd service file ends up in the right place
# used by arvados-docker-cleaner
if [[ -e "${systemd_unit}" ]]; then
- COMMAND_ARR+=("$sys_venv_dir/share/doc/$PKG/$PKG.service=/lib/systemd/system/$PKG.service")
+ COMMAND_ARR+=("share/doc/$PKG/$PKG.service=/lib/systemd/system/$PKG.service")
fi
COMMAND_ARR+=("${fpm_args[@]}")
- # Make sure to install all our package binaries in /usr/bin. We have to
- # walk $WORKSPACE/$PKG_DIR/bin rather than $venv_dir/bin to get the list
- # because the latter also includes scripts installed by all the
- # dependencies in the virtualenv, which may conflict with other
- # packages. We have to take the copies of our binaries from the latter
- # directory, though, because those are the ones we rewrote the shebang
- # line of, above.
- if [[ -e "$WORKSPACE/$PKG_DIR/bin" ]]; then
- for binary in `ls $WORKSPACE/$PKG_DIR/bin`; do
- COMMAND_ARR+=("$sys_venv_dir/bin/$binary=/usr/bin/")
- done
- fi
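+  # Install this package's own entry point scripts in /usr/bin. pypkg_info
+  # lists only the scripts that belong to $PKG, not ones installed by its
+  # dependencies elsewhere in the virtualenv.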
+ while read -d "" binpath; do
+ COMMAND_ARR+=("$binpath=/usr/$binpath")
+ done < <("$venv_dir/bin/python3" "$WORKSPACE/build/pypkg_info.py" --delimiter=\\0 binfiles "$PKG")
# the python3-arvados-cwl-runner package comes with cwltool, expose that version
- if [[ -e "$WORKSPACE/$PKG_DIR/$venv_dir/bin/cwltool" ]]; then
- COMMAND_ARR+=("$sys_venv_dir/bin/cwltool=/usr/bin/")
+ if [[ "$PKG" == arvados-cwl-runner ]]; then
+ COMMAND_ARR+=("bin/cwltool=/usr/bin/cwltool")
fi
- COMMAND_ARR+=(".")
+ COMMAND_ARR+=(".=$sys_venv_dir")
debug_echo -e "\n${COMMAND_ARR[@]}\n"
echo
echo -e "\n${COMMAND_ARR[@]}\n"
else
- echo `ls *$package_format`
- mv $WORKSPACE/$PKG_DIR/dist/*$package_format $WORKSPACE/packages/$TARGET/
+ ls "$PYTHON_BUILDROOT"/*."$package_format"
+ mv "$PYTHON_BUILDROOT"/*."$package_format" "$WORKSPACE/packages/$TARGET/"
fi
echo
}
setup_virtualenv "$VENV3DIR"
. "$VENV3DIR/bin/activate"
+ # wheel modernizes the venv (as of early 2024) and makes it more closely
+ # match our package build environment.
# PyYAML is a test requirement used by run_test_server.py and needed for
# other, non-Python tests.
# pdoc is needed to build PySDK documentation.
# We run `setup.py build` first to generate _version.py.
- env -C "$WORKSPACE/sdk/python" python3 setup.py build \
- && python3 -m pip install "$WORKSPACE/sdk/python" \
- && python3 -m pip install PyYAML pdoc \
+ pip install PyYAML pdoc wheel \
+ && env -C "$WORKSPACE/sdk/python" python3 setup.py build \
+ && pip install "$WORKSPACE/sdk/python" \
|| fatal "installing Python SDK and related dependencies failed"
}
check_arvados_config "$1"
;;
gofmt \
+ | arvados_version.py \
| cmd/arvados-package \
| doc \
| lib/boot \
go vet -composites=false ./...
}
+test_arvados_version.py() {
+ local orig_fn=""
+ local fail_count=0
+ while read -d "" fn; do
+ if [[ -z "$orig_fn" ]]; then
+ orig_fn="$fn"
+ elif ! cmp "$orig_fn" "$fn"; then
+ fail_count=$(( $fail_count + 1 ))
+ printf "FAIL: %s and %s are not identical\n" "$orig_fn" "$fn"
+ fi
+ done < <(git -C "$WORKSPACE" ls-files -z | grep -z '/arvados_version\.py$')
+ case "$orig_fn" in
+ "") return 66 ;; # EX_NOINPUT
+ *) return "$fail_count" ;;
+ esac
+}
+
test_services/api() {
rm -f "$WORKSPACE/services/api/git-commit.version"
cd "$WORKSPACE/services/api" \
# Install parts needed by test suites
do_install env
do_install cmd/arvados-server go
- do_install sdk/python pip "${VENV3DIR}/bin/"
do_install tools/crunchstat-summary pip "${VENV3DIR}/bin/"
do_install sdk/ruby-google-api-client
do_install sdk/ruby
fi
do_test gofmt
+ do_test arvados_version.py
do_test doc
do_test sdk/ruby-google-api-client
do_test sdk/ruby
"git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
check "gopkg.in/check.v1"
)
c.Assert(coll.StorageClassesDesired, check.DeepEquals, kc.DefaultStorageClasses)
}
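+// createTestCollectionManifest writes content to a file named test.txt in a
+// new collection filesystem and returns the resulting manifest text, without
+// saving the collection anywhere.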
+func (s *IntegrationSuite) createTestCollectionManifest(c *check.C, ac *arvados.Client, kc *keepclient.KeepClient, content string) string {
+ fs, err := (&arvados.Collection{}).FileSystem(ac, kc)
+ c.Assert(err, check.IsNil)
+ f, err := fs.OpenFile("test.txt", os.O_CREATE|os.O_RDWR, 0777)
+ c.Assert(err, check.IsNil)
+ _, err = io.WriteString(f, content)
+ c.Assert(err, check.IsNil)
+ err = f.Close()
+ c.Assert(err, check.IsNil)
+ mtxt, err := fs.MarshalManifest(".")
+ c.Assert(err, check.IsNil)
+ return mtxt
+}
+
func (s *IntegrationSuite) TestGetCollectionByPDH(c *check.C) {
conn1 := s.super.Conn("z1111")
rootctx1, _, _ := s.super.RootClients("z1111")
// Create the collection to find its PDH (but don't save it
// anywhere yet)
- var coll1 arvados.Collection
- fs1, err := coll1.FileSystem(ac1, kc1)
- c.Assert(err, check.IsNil)
- f, err := fs1.OpenFile("test.txt", os.O_CREATE|os.O_RDWR, 0777)
- c.Assert(err, check.IsNil)
- _, err = io.WriteString(f, "IntegrationSuite.TestGetCollectionByPDH")
- c.Assert(err, check.IsNil)
- err = f.Close()
- c.Assert(err, check.IsNil)
- mtxt, err := fs1.MarshalManifest(".")
- c.Assert(err, check.IsNil)
+ mtxt := s.createTestCollectionManifest(c, ac1, kc1, c.TestName())
pdh := arvados.PortableDataHash(mtxt)
// Looking up the PDH before saving returns 404 if cycle
// detection is working.
- _, err = conn1.CollectionGet(userctx1, arvados.GetOptions{UUID: pdh})
+ _, err := conn1.CollectionGet(userctx1, arvados.GetOptions{UUID: pdh})
c.Assert(err, check.ErrorMatches, `.*404 Not Found.*`)
// Save the collection on cluster z1111.
- coll1, err = conn1.CollectionCreate(userctx1, arvados.CreateOptions{Attrs: map[string]interface{}{
+ _, err = conn1.CollectionCreate(userctx1, arvados.CreateOptions{Attrs: map[string]interface{}{
"manifest_text": mtxt,
}})
c.Assert(err, check.IsNil)
// Retrieve the collection from cluster z3333.
- coll, err := conn3.CollectionGet(userctx1, arvados.GetOptions{UUID: pdh})
+ coll2, err := conn3.CollectionGet(userctx1, arvados.GetOptions{UUID: pdh})
c.Check(err, check.IsNil)
- c.Check(coll.PortableDataHash, check.Equals, pdh)
+ c.Check(coll2.PortableDataHash, check.Equals, pdh)
+}
+
+func (s *IntegrationSuite) TestFederation_Write1Read2(c *check.C) {
+ s.testFederationCollectionAccess(c, "z1111", "z2222")
+}
+
+func (s *IntegrationSuite) TestFederation_Write2Read1(c *check.C) {
+ s.testFederationCollectionAccess(c, "z2222", "z1111")
+}
+
+func (s *IntegrationSuite) TestFederation_Write2Read3(c *check.C) {
+ s.testFederationCollectionAccess(c, "z2222", "z3333")
+}
+
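+// testFederationCollectionAccess creates a collection on writeCluster using
+// a token issued by z1111, then reads it back through readCluster to check
+// federated collection access.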
+func (s *IntegrationSuite) testFederationCollectionAccess(c *check.C, writeCluster, readCluster string) {
+ conn1 := s.super.Conn("z1111")
+ rootctx1, _, _ := s.super.RootClients("z1111")
+ _, ac1, _, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+
+ connW := s.super.Conn(writeCluster)
+ userctxW, acW, kcW := s.super.ClientsWithToken(writeCluster, ac1.AuthToken)
+ kcW.DiskCacheSize = keepclient.DiskCacheDisabled
+ connR := s.super.Conn(readCluster)
+ userctxR, acR, kcR := s.super.ClientsWithToken(readCluster, ac1.AuthToken)
+ kcR.DiskCacheSize = keepclient.DiskCacheDisabled
+
+ filedata := fmt.Sprintf("%s: write to %s, read from %s", c.TestName(), writeCluster, readCluster)
+ mtxt := s.createTestCollectionManifest(c, acW, kcW, filedata)
+ collW, err := connW.CollectionCreate(userctxW, arvados.CreateOptions{Attrs: map[string]interface{}{
+ "manifest_text": mtxt,
+ }})
+ c.Assert(err, check.IsNil)
+
+ collR, err := connR.CollectionGet(userctxR, arvados.GetOptions{UUID: collW.UUID})
+ if !c.Check(err, check.IsNil) {
+ return
+ }
+ fsR, err := collR.FileSystem(acR, kcR)
+ if !c.Check(err, check.IsNil) {
+ return
+ }
+ buf, err := fs.ReadFile(arvados.FS(fsR), "test.txt")
+ if !c.Check(err, check.IsNil) {
+ return
+ }
+ c.Check(string(buf), check.Equals, filedata)
}
// Tests bug #18004
}
// Rather than have an alternate way to tell keepstore how
- // many buffers to use when starting it this way, we just
- // modify the cluster configuration that we feed it on stdin.
- configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
+ // many buffers to use, etc., when starting it this way, we
+ // just modify the cluster configuration that we feed it on
+ // stdin.
+ ccfg := *configData.Cluster
+ ccfg.API.MaxKeepBlobBuffers = configData.KeepBuffers
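+	// Don't run trash/delete workers in this dispatch-local keepstore.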
+ ccfg.Collections.BlobTrash = false
+ ccfg.Collections.BlobTrashConcurrency = 0
+ ccfg.Collections.BlobDeleteConcurrency = 0
localaddr := localKeepstoreAddr()
ln, err := net.Listen("tcp", net.JoinHostPort(localaddr, "0"))
var confJSON bytes.Buffer
err = json.NewEncoder(&confJSON).Encode(arvados.Config{
Clusters: map[string]arvados.Cluster{
- configData.Cluster.ClusterID: *configData.Cluster,
+ ccfg.ClusterID: ccfg,
},
})
if err != nil {
if trial.logConfig == "none" {
c.Check(logExists, Equals, false)
} else {
+ c.Check(log, Matches, `(?ms).*not running trash worker.*`)
+ c.Check(log, Matches, `(?ms).*not running trash emptier.*`)
c.Check(log, trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
c.Check(log, trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
}
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
+#
+# This file runs in one of three modes:
+#
+# 1. If the ARVADOS_BUILDING_VERSION environment variable is set, it writes
+# _version.py and generates dependencies based on that value.
+# 2. If running from an arvados Git checkout, it writes _version.py
+# and generates dependencies from Git.
+# 3. Otherwise, we expect this is source previously generated from Git, and
+# it reads _version.py and generates dependencies from it.
-import subprocess
-import time
import os
import re
+import runpy
+import subprocess
import sys
-SETUP_DIR = os.path.dirname(os.path.abspath(__file__))
-VERSION_PATHS = {
- SETUP_DIR,
- os.path.abspath(os.path.join(SETUP_DIR, "../python")),
- os.path.abspath(os.path.join(SETUP_DIR, "../../tools/crunchstat-summary")),
- os.path.abspath(os.path.join(SETUP_DIR, "../../build/version-at-commit.sh"))
- }
+from pathlib import Path
+
+# These maps explain the relationships between different Python modules in
+# the arvados repository. We use these to help generate setup.py.
+PACKAGE_DEPENDENCY_MAP = {
+ 'arvados-cwl-runner': ['arvados-python-client', 'crunchstat_summary'],
+ 'arvados-user-activity': ['arvados-python-client'],
+ 'arvados_fuse': ['arvados-python-client'],
+ 'crunchstat_summary': ['arvados-python-client'],
+}
+PACKAGE_MODULE_MAP = {
+ 'arvados-cwl-runner': 'arvados_cwl',
+ 'arvados-docker-cleaner': 'arvados_docker',
+ 'arvados-python-client': 'arvados',
+ 'arvados-user-activity': 'arvados_user_activity',
+ 'arvados_fuse': 'arvados_fuse',
+ 'crunchstat_summary': 'crunchstat_summary',
+}
+PACKAGE_SRCPATH_MAP = {
+ 'arvados-cwl-runner': Path('sdk', 'cwl'),
+ 'arvados-docker-cleaner': Path('services', 'dockercleaner'),
+ 'arvados-python-client': Path('sdk', 'python'),
+ 'arvados-user-activity': Path('tools', 'user-activity'),
+ 'arvados_fuse': Path('services', 'fuse'),
+ 'crunchstat_summary': Path('tools', 'crunchstat-summary'),
+}
+
+ENV_VERSION = os.environ.get("ARVADOS_BUILDING_VERSION")
+SETUP_DIR = Path(__file__).absolute().parent
+try:
+ REPO_PATH = Path(subprocess.check_output(
+ ['git', '-C', str(SETUP_DIR), 'rev-parse', '--show-toplevel'],
+ stderr=subprocess.DEVNULL,
+ text=True,
+ ).rstrip('\n'))
+except (subprocess.CalledProcessError, OSError):
+ REPO_PATH = None
+else:
+ # Verify this is the arvados monorepo
+ if all((REPO_PATH / path).exists() for path in PACKAGE_SRCPATH_MAP.values()):
+ PACKAGE_NAME, = (
+ pkg_name for pkg_name, path in PACKAGE_SRCPATH_MAP.items()
+ if (REPO_PATH / path) == SETUP_DIR
+ )
+ MODULE_NAME = PACKAGE_MODULE_MAP[PACKAGE_NAME]
+ VERSION_SCRIPT_PATH = Path(REPO_PATH, 'build', 'version-at-commit.sh')
+ else:
+ REPO_PATH = None
+if REPO_PATH is None:
+ (PACKAGE_NAME, MODULE_NAME), = (
+ (pkg_name, mod_name)
+ for pkg_name, mod_name in PACKAGE_MODULE_MAP.items()
+ if (SETUP_DIR / mod_name).is_dir()
+ )
+
+def short_tests_only(arglist=sys.argv):
+ try:
+ arglist.remove('--short-tests-only')
+ except ValueError:
+ return False
+ else:
+ return True
+
+def git_log_output(path, *args):
+ return subprocess.check_output(
+ ['git', '-C', str(REPO_PATH),
+ 'log', '--first-parent', '--max-count=1',
+ *args, str(path)],
+ text=True,
+ ).rstrip('\n')
def choose_version_from():
- ts = {}
- for path in VERSION_PATHS:
- ts[subprocess.check_output(
- ['git', 'log', '--first-parent', '--max-count=1',
- '--format=format:%ct', path]).strip()] = path
-
- sorted_ts = sorted(ts.items())
- getver = sorted_ts[-1][1]
- print("Using "+getver+" for version number calculation of "+SETUP_DIR, file=sys.stderr)
+ ver_paths = [SETUP_DIR, VERSION_SCRIPT_PATH, *(
+ PACKAGE_SRCPATH_MAP[pkg]
+ for pkg in PACKAGE_DEPENDENCY_MAP.get(PACKAGE_NAME, ())
+ )]
+ getver = max(ver_paths, key=lambda path: git_log_output(path, '--format=format:%ct'))
+ print(f"Using {getver} for version number calculation of {SETUP_DIR}", file=sys.stderr)
return getver
def git_version_at_commit():
curdir = choose_version_from()
- myhash = subprocess.check_output(['git', 'log', '-n1', '--first-parent',
- '--format=%H', curdir]).strip()
- myversion = subprocess.check_output([SETUP_DIR+'/../../build/version-at-commit.sh', myhash]).strip().decode()
- return myversion
+ myhash = git_log_output(curdir, '--format=%H')
+ return subprocess.check_output(
+ [str(VERSION_SCRIPT_PATH), myhash],
+ text=True,
+ ).rstrip('\n')
def save_version(setup_dir, module, v):
- v = v.replace("~dev", ".dev").replace("~rc", "rc")
- with open(os.path.join(setup_dir, module, "_version.py"), 'wt') as fp:
- return fp.write("__version__ = '%s'\n" % v)
+ with Path(setup_dir, module, '_version.py').open('w') as fp:
+ print(f"__version__ = {v!r}", file=fp)
def read_version(setup_dir, module):
- with open(os.path.join(setup_dir, module, "_version.py"), 'rt') as fp:
- return re.match("__version__ = '(.*)'$", fp.read()).groups()[0]
-
-def get_version(setup_dir, module):
- env_version = os.environ.get("ARVADOS_BUILDING_VERSION")
+ file_vars = runpy.run_path(Path(setup_dir, module, '_version.py'))
+ return file_vars['__version__']
- if env_version:
- save_version(setup_dir, module, env_version)
+def get_version(setup_dir=SETUP_DIR, module=MODULE_NAME):
+ if ENV_VERSION:
+ version = ENV_VERSION
+ elif REPO_PATH is None:
+ return read_version(setup_dir, module)
else:
- try:
- save_version(setup_dir, module, git_version_at_commit())
- except (subprocess.CalledProcessError, OSError) as err:
- print("ERROR: {0}".format(err), file=sys.stderr)
- pass
+ version = git_version_at_commit()
+ version = version.replace("~dev", ".dev").replace("~rc", "rc")
+ save_version(setup_dir, module, version)
+ return version
- return read_version(setup_dir, module)
+def iter_dependencies(version=None):
+ if version is None:
+ version = get_version()
+ # A packaged development release should be installed with other
+ # development packages built from the same source, but those
+ # dependencies may have earlier "dev" versions (read: less recent
+ # Git commit timestamps). This compatible version dependency
+ # expresses that as closely as possible. Allowing versions
+ # compatible with .dev0 allows any development release.
+ # Regular expression borrowed partially from
+ # <https://packaging.python.org/en/latest/specifications/version-specifiers/#version-specifiers-regex>
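+    # For example, building arvados-cwl-runner at version 2.7.1.dev20230915
+    # yields dependencies like 'arvados-python-client~=2.7.1.dev0'.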
+ dep_ver, match_count = re.subn(r'\.dev(0|[1-9][0-9]*)$', '.dev0', version, 1)
+ dep_op = '~=' if match_count else '=='
+ for dep_pkg in PACKAGE_DEPENDENCY_MAP.get(PACKAGE_NAME, ()):
+ yield f'{dep_pkg}{dep_op}{dep_ver}'
# Called from calculate_python_sdk_cwl_package_versions() in run-library.sh
if __name__ == '__main__':
- print(get_version(SETUP_DIR, "arvados_cwl"))
+ print(get_version())
from setuptools import setup, find_packages
-SETUP_DIR = os.path.dirname(__file__) or '.'
-README = os.path.join(SETUP_DIR, 'README.rst')
-
import arvados_version
-version = arvados_version.get_version(SETUP_DIR, "arvados_cwl")
-if os.environ.get('ARVADOS_BUILDING_VERSION', False):
- pysdk_dep = "=={}".format(version)
-else:
- # On dev releases, arvados-python-client may have a different timestamp
- pysdk_dep = "<={}".format(version)
+version = arvados_version.get_version()
+README = os.path.join(arvados_version.SETUP_DIR, 'README.rst')
setup(name='arvados-cwl-runner',
version=version,
# file to determine what version of cwltool and schema-salad to
# build.
install_requires=[
+ *arvados_version.iter_dependencies(version),
'cwltool==3.1.20230601100705',
'schema-salad==8.4.20230601112322',
- 'arvados-python-client{}'.format(pysdk_dep),
- 'crunchstat-summary{}'.format(pysdk_dep),
'ciso8601 >= 2.0.0',
'networkx < 2.6',
'msgpack==1.0.3',
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
- prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32)
+ if self.parent._my_block_manager()._keep.num_prefetch_threads > 0:
+ prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32)
locs = set()
data = []
block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
if block:
blockview = memoryview(block)
- data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
+ data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
locs.add(lr.locator)
else:
break
- for lr in prefetch:
- if lr.locator not in locs:
- self.parent._my_block_manager().block_prefetch(lr.locator)
- locs.add(lr.locator)
+ if self.parent._my_block_manager()._keep.num_prefetch_threads > 0:
+ for lr in prefetch:
+ if lr.locator not in locs:
+ self.parent._my_block_manager().block_prefetch(lr.locator)
+ locs.add(lr.locator)
- return b''.join(data)
+ if len(data) == 1:
+ return data[0]
+ else:
+ return b''.join(data)
@must_be_writable
@synchronized
import errno
import logging
import weakref
+import collections
_logger = logging.getLogger('arvados.keep')
def get(self):
self.ready.wait()
+        # 'content' can be None, an empty byte string, or a nonempty mmap
+ # region. If it is an mmap region, we want to advise the
+ # kernel we're going to use it. This nudges the kernel to
+ # re-read most or all of the block if necessary (instead of
+ # just a few pages at a time), reducing the number of page
+ # faults and improving performance by 4x compared to not
+ # calling madvise.
+ if self.content:
+ self.content.madvise(mmap.MADV_WILLNEED)
return self.content
def set(self, value):
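+        # Return True if the value was newly stored in this slot (so the
+        # caller can count its size toward the cache total), False otherwise.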
if value is None:
self.content = None
self.ready.set()
- return
+ return False
if len(value) == 0:
# Can't mmap a 0 length file
self.content = b''
self.ready.set()
- return
+ return True
if self.content is not None:
# Has been set already
self.ready.set()
- return
+ return False
blockdir = os.path.join(self.cachedir, self.locator[0:3])
os.makedirs(blockdir, mode=0o700, exist_ok=True)
self.content = mmap.mmap(self.filehandle.fileno(), 0, access=mmap.ACCESS_READ)
# only set the event when mmap is successful
self.ready.set()
+ return True
finally:
if tmpfile is not None:
# If the tempfile hasn't been renamed on disk yet, try to delete it.
return len(self.content)
def evict(self):
- if self.content is not None and len(self.content) > 0:
- # The mmap region might be in use when we decided to evict
- # it. This can happen if the cache is too small.
- #
- # If we call close() now, it'll throw an error if
- # something tries to access it.
- #
- # However, we don't need to explicitly call mmap.close()
- #
- # I confirmed in mmapmodule.c that that both close
- # and deallocate do the same thing:
+ if not self.content:
+ return
+
+ # The mmap region might be in use when we decided to evict
+ # it. This can happen if the cache is too small.
+ #
+ # If we call close() now, it'll throw an error if
+ # something tries to access it.
+ #
+ # However, we don't need to explicitly call mmap.close()
+ #
+        # I confirmed in mmapmodule.c that both close
+ # and deallocate do the same thing:
+ #
+ # a) close the file descriptor
+ # b) unmap the memory range
+ #
+ # So we can forget it in the cache and delete the file on
+ # disk, and it will tear it down after any other
+ # lingering Python references to the mapped memory are
+ # gone.
+
+ blockdir = os.path.join(self.cachedir, self.locator[0:3])
+ final = os.path.join(blockdir, self.locator) + cacheblock_suffix
+ try:
+ fcntl.flock(self.filehandle, fcntl.LOCK_UN)
+
+ # try to get an exclusive lock, this ensures other
+ # processes are not using the block. It is
+ # nonblocking and will throw an exception if we
+ # can't get it, which is fine because that means
+ # we just won't try to delete it.
#
- # a) close the file descriptor
- # b) unmap the memory range
+ # I should note here, the file locking is not
+ # strictly necessary, we could just remove it and
+ # the kernel would ensure that the underlying
+ # inode remains available as long as other
+ # processes still have the file open. However, if
+ # you have multiple processes sharing the cache
+ # and deleting each other's files, you'll end up
+ # with a bunch of ghost files that don't show up
+ # in the file system but are still taking up
+ # space, which isn't particularly user friendly.
+ # The locking strategy ensures that cache blocks
+ # in use remain visible.
#
- # So we can forget it in the cache and delete the file on
- # disk, and it will tear it down after any other
- # lingering Python references to the mapped memory are
- # gone.
-
- blockdir = os.path.join(self.cachedir, self.locator[0:3])
- final = os.path.join(blockdir, self.locator) + cacheblock_suffix
- try:
- fcntl.flock(self.filehandle, fcntl.LOCK_UN)
-
- # try to get an exclusive lock, this ensures other
- # processes are not using the block. It is
- # nonblocking and will throw an exception if we
- # can't get it, which is fine because that means
- # we just won't try to delete it.
- #
- # I should note here, the file locking is not
- # strictly necessary, we could just remove it and
- # the kernel would ensure that the underlying
- # inode remains available as long as other
- # processes still have the file open. However, if
- # you have multiple processes sharing the cache
- # and deleting each other's files, you'll end up
- # with a bunch of ghost files that don't show up
- # in the file system but are still taking up
- # space, which isn't particularly user friendly.
- # The locking strategy ensures that cache blocks
- # in use remain visible.
- #
- fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)
-
- os.remove(final)
- return True
- except OSError:
- pass
- finally:
- self.filehandle = None
- self.linger = weakref.ref(self.content)
- self.content = None
- return False
+ fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)
- def gone(self):
- # Test if an evicted object is lingering
- return self.content is None and (self.linger is None or self.linger() is None)
+ os.remove(final)
+ return True
+ except OSError:
+ pass
+ finally:
+ self.filehandle = None
+ self.content = None
@staticmethod
def get_from_disk(locator, cachedir):
# Map in all the files we found, up to maxslots, if we exceed
# maxslots, start throwing things out.
- cachelist = []
+ cachelist: collections.OrderedDict = collections.OrderedDict()
for b in blocks:
got = DiskCacheSlot.get_from_disk(b[0], cachedir)
if got is None:
continue
if len(cachelist) < maxslots:
- cachelist.append(got)
+ cachelist[got.locator] = got
else:
# we found more blocks than maxslots, try to
# throw it out of the cache.
class KeepBlockCache(object):
def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
self.cache_max = cache_max
- self._cache = []
+ self._cache = collections.OrderedDict()
self._cache_lock = threading.Lock()
self._max_slots = max_slots
self._disk_cache = disk_cache
self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
+ self.cache_total = 0
if self._disk_cache:
self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
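+            # Count blocks already present in the disk cache toward the
+            # running total.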
+ for slot in self._cache.values():
+ self.cache_total += slot.size()
self.cap_cache()
-
class CacheSlot(object):
__slots__ = ("locator", "ready", "content")
return self.content
def set(self, value):
+ if self.content is not None:
+ return False
self.content = value
self.ready.set()
+ return True
def size(self):
if self.content is None:
def evict(self):
self.content = None
- return self.gone()
- def gone(self):
- return (self.content is None)
def _resize_cache(self, cache_max, max_slots):
# Try and make sure the contents of the cache do not exceed
# the supplied maximums.
- # Select all slots except those where ready.is_set() and content is
- # None (that means there was an error reading the block).
- self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
- sm = sum([slot.size() for slot in self._cache])
- while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
- for i in range(len(self._cache)-1, -1, -1):
- # start from the back, find a slot that is a candidate to evict
- if self._cache[i].ready.is_set():
- sz = self._cache[i].size()
-
- # If evict returns false it means the
- # underlying disk cache couldn't lock the file
- # for deletion because another process was using
- # it. Don't count it as reducing the amount
- # of data in the cache, find something else to
- # throw out.
- if self._cache[i].evict():
- sm -= sz
-
- # check to make sure the underlying data is gone
- if self._cache[i].gone():
- # either way we forget about it. either the
- # other process will delete it, or if we need
- # it again and it is still there, we'll find
- # it on disk.
- del self._cache[i]
- break
+ if self.cache_total <= cache_max and len(self._cache) <= max_slots:
+ return
+
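+        # self._cache is ordered least recently used first (_get moves hits
+        # to the end), so evict starting from the front of the deque.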
+ _evict_candidates = collections.deque(self._cache.values())
+ while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
+ slot = _evict_candidates.popleft()
+ if not slot.ready.is_set():
+ continue
+
+ sz = slot.size()
+ slot.evict()
+ self.cache_total -= sz
+ del self._cache[slot.locator]
def cap_cache(self):
def _get(self, locator):
# Test if the locator is already in the cache
- for i in range(0, len(self._cache)):
- if self._cache[i].locator == locator:
- n = self._cache[i]
- if i != 0:
- # move it to the front
- del self._cache[i]
- self._cache.insert(0, n)
- return n
+ if locator in self._cache:
+ n = self._cache[locator]
+ if n.ready.is_set() and n.content is None:
+ del self._cache[n.locator]
+ return None
+ self._cache.move_to_end(locator)
+ return n
if self._disk_cache:
# see if it exists on disk
n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
if n is not None:
- self._cache.insert(0, n)
+ self._cache[n.locator] = n
+ self.cache_total += n.size()
return n
return None
n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
else:
n = KeepBlockCache.CacheSlot(locator)
- self._cache.insert(0, n)
+ self._cache[n.locator] = n
return n, True
def set(self, slot, blob):
try:
- slot.set(blob)
+ if slot.set(blob):
+ self.cache_total += slot.size()
return
except OSError as e:
if e.errno == errno.ENOMEM:
elif e.errno == errno.ENOSPC:
# Reduce disk max space to current - 256 MiB, cap cache and retry
with self._cache_lock:
- sm = sum([st.size() for st in self._cache])
+ sm = sum(st.size() for st in self._cache.values())
self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
elif e.errno == errno.ENODEV:
_logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
# exception handler adjusts limits downward in some cases
# to free up resources, which would make the operation
# succeed.
- slot.set(blob)
+ if slot.set(blob):
+ self.cache_total += slot.size()
except Exception as e:
# It failed again. Give up.
slot.set(None)
self.misses_counter = Counter()
self._storage_classes_unsupported_warning = False
self._default_classes = []
- self.num_prefetch_threads = num_prefetch_threads or 2
+ if num_prefetch_threads is not None:
+ self.num_prefetch_threads = num_prefetch_threads
+ else:
+ self.num_prefetch_threads = 2
self._prefetch_queue = None
self._prefetch_threads = None
does not block.
"""
+ if self.block_cache.get(locator) is not None:
+ return
+
self._start_prefetch_threads()
self._prefetch_queue.put(locator)
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
+#
+# This file runs in one of three modes:
+#
+# 1. If the ARVADOS_BUILDING_VERSION environment variable is set, it writes
+# _version.py and generates dependencies based on that value.
+# 2. If running from an arvados Git checkout, it writes _version.py
+# and generates dependencies from Git.
+# 3. Otherwise, we expect this is source previously generated from Git, and
+# it reads _version.py and generates dependencies from it.
-import subprocess
-import time
import os
import re
+import runpy
+import subprocess
import sys
-SETUP_DIR = os.path.dirname(os.path.abspath(__file__))
-VERSION_PATHS = {
- SETUP_DIR,
- os.path.abspath(os.path.join(SETUP_DIR, "../../build/version-at-commit.sh"))
- }
+from pathlib import Path
+
+# These maps explain the relationships between different Python modules in
+# the arvados repository. We use these to help generate setup.py.
+PACKAGE_DEPENDENCY_MAP = {
+ 'arvados-cwl-runner': ['arvados-python-client', 'crunchstat_summary'],
+ 'arvados-user-activity': ['arvados-python-client'],
+ 'arvados_fuse': ['arvados-python-client'],
+ 'crunchstat_summary': ['arvados-python-client'],
+}
+PACKAGE_MODULE_MAP = {
+ 'arvados-cwl-runner': 'arvados_cwl',
+ 'arvados-docker-cleaner': 'arvados_docker',
+ 'arvados-python-client': 'arvados',
+ 'arvados-user-activity': 'arvados_user_activity',
+ 'arvados_fuse': 'arvados_fuse',
+ 'crunchstat_summary': 'crunchstat_summary',
+}
+PACKAGE_SRCPATH_MAP = {
+ 'arvados-cwl-runner': Path('sdk', 'cwl'),
+ 'arvados-docker-cleaner': Path('services', 'dockercleaner'),
+ 'arvados-python-client': Path('sdk', 'python'),
+ 'arvados-user-activity': Path('tools', 'user-activity'),
+ 'arvados_fuse': Path('services', 'fuse'),
+ 'crunchstat_summary': Path('tools', 'crunchstat-summary'),
+}
+
+ENV_VERSION = os.environ.get("ARVADOS_BUILDING_VERSION")
+SETUP_DIR = Path(__file__).absolute().parent
+try:
+ REPO_PATH = Path(subprocess.check_output(
+ ['git', '-C', str(SETUP_DIR), 'rev-parse', '--show-toplevel'],
+ stderr=subprocess.DEVNULL,
+ text=True,
+ ).rstrip('\n'))
+except (subprocess.CalledProcessError, OSError):
+ REPO_PATH = None
+else:
+ # Verify this is the arvados monorepo
+ if all((REPO_PATH / path).exists() for path in PACKAGE_SRCPATH_MAP.values()):
+ PACKAGE_NAME, = (
+ pkg_name for pkg_name, path in PACKAGE_SRCPATH_MAP.items()
+ if (REPO_PATH / path) == SETUP_DIR
+ )
+ MODULE_NAME = PACKAGE_MODULE_MAP[PACKAGE_NAME]
+ VERSION_SCRIPT_PATH = Path(REPO_PATH, 'build', 'version-at-commit.sh')
+ else:
+ REPO_PATH = None
+if REPO_PATH is None:
+ (PACKAGE_NAME, MODULE_NAME), = (
+ (pkg_name, mod_name)
+ for pkg_name, mod_name in PACKAGE_MODULE_MAP.items()
+ if (SETUP_DIR / mod_name).is_dir()
+ )
+
+def short_tests_only(arglist=sys.argv):
+ try:
+ arglist.remove('--short-tests-only')
+ except ValueError:
+ return False
+ else:
+ return True
+
+def git_log_output(path, *args):
+ return subprocess.check_output(
+ ['git', '-C', str(REPO_PATH),
+ 'log', '--first-parent', '--max-count=1',
+ *args, str(path)],
+ text=True,
+ ).rstrip('\n')
def choose_version_from():
- ts = {}
- for path in VERSION_PATHS:
- ts[subprocess.check_output(
- ['git', 'log', '--first-parent', '--max-count=1',
- '--format=format:%ct', path]).strip()] = path
-
- sorted_ts = sorted(ts.items())
- getver = sorted_ts[-1][1]
- print("Using "+getver+" for version number calculation of "+SETUP_DIR, file=sys.stderr)
+ ver_paths = [SETUP_DIR, VERSION_SCRIPT_PATH, *(
+ PACKAGE_SRCPATH_MAP[pkg]
+ for pkg in PACKAGE_DEPENDENCY_MAP.get(PACKAGE_NAME, ())
+ )]
+ getver = max(ver_paths, key=lambda path: git_log_output(path, '--format=format:%ct'))
+ print(f"Using {getver} for version number calculation of {SETUP_DIR}", file=sys.stderr)
return getver
def git_version_at_commit():
curdir = choose_version_from()
- myhash = subprocess.check_output(['git', 'log', '-n1', '--first-parent',
- '--format=%H', curdir]).strip()
- myversion = subprocess.check_output([SETUP_DIR+'/../../build/version-at-commit.sh', myhash]).strip().decode()
- return myversion
+ myhash = git_log_output(curdir, '--format=%H')
+ return subprocess.check_output(
+ [str(VERSION_SCRIPT_PATH), myhash],
+ text=True,
+ ).rstrip('\n')
def save_version(setup_dir, module, v):
- v = v.replace("~dev", ".dev").replace("~rc", "rc")
- with open(os.path.join(setup_dir, module, "_version.py"), 'wt') as fp:
- return fp.write("__version__ = '%s'\n" % v)
+ with Path(setup_dir, module, '_version.py').open('w') as fp:
+ print(f"__version__ = {v!r}", file=fp)
def read_version(setup_dir, module):
- with open(os.path.join(setup_dir, module, "_version.py"), 'rt') as fp:
- return re.match("__version__ = '(.*)'$", fp.read()).groups()[0]
-
-def get_version(setup_dir, module):
- env_version = os.environ.get("ARVADOS_BUILDING_VERSION")
+ file_vars = runpy.run_path(Path(setup_dir, module, '_version.py'))
+ return file_vars['__version__']
- if env_version:
- save_version(setup_dir, module, env_version)
+def get_version(setup_dir=SETUP_DIR, module=MODULE_NAME):
+ if ENV_VERSION:
+ version = ENV_VERSION
+ elif REPO_PATH is None:
+ return read_version(setup_dir, module)
else:
- try:
- save_version(setup_dir, module, git_version_at_commit())
- except (subprocess.CalledProcessError, OSError) as err:
- print("ERROR: {0}".format(err), file=sys.stderr)
- pass
+ version = git_version_at_commit()
+ version = version.replace("~dev", ".dev").replace("~rc", "rc")
+ save_version(setup_dir, module, version)
+ return version
- return read_version(setup_dir, module)
+def iter_dependencies(version=None):
+ if version is None:
+ version = get_version()
+ # A packaged development release should be installed with other
+ # development packages built from the same source, but those
+ # dependencies may have earlier "dev" versions (read: less recent
+ # Git commit timestamps). This compatible version dependency
+ # expresses that as closely as possible. Allowing versions
+ # compatible with .dev0 allows any development release.
+ # Regular expression borrowed partially from
+ # <https://packaging.python.org/en/latest/specifications/version-specifiers/#version-specifiers-regex>
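+    # For example, building arvados-cwl-runner at version 2.7.1.dev20230915
+    # yields dependencies like 'arvados-python-client~=2.7.1.dev0'.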
+ dep_ver, match_count = re.subn(r'\.dev(0|[1-9][0-9]*)$', '.dev0', version, 1)
+ dep_op = '~=' if match_count else '=='
+ for dep_pkg in PACKAGE_DEPENDENCY_MAP.get(PACKAGE_NAME, ()):
+ yield f'{dep_pkg}{dep_op}{dep_ver}'
# Called from calculate_python_sdk_cwl_package_versions() in run-library.sh
if __name__ == '__main__':
- print(get_version(SETUP_DIR, "arvados"))
+ print(get_version())
from setuptools import setup, find_packages
from setuptools.command import build_py
-SETUP_DIR = os.path.dirname(__file__) or '.'
-README = os.path.join(SETUP_DIR, 'README.rst')
-
import arvados_version
-version = arvados_version.get_version(SETUP_DIR, "arvados")
-
-short_tests_only = False
-if '--short-tests-only' in sys.argv:
- short_tests_only = True
- sys.argv.remove('--short-tests-only')
+version = arvados_version.get_version()
+short_tests_only = arvados_version.short_tests_only()
+README = os.path.join(arvados_version.SETUP_DIR, 'README.rst')
class BuildPython(build_py.build_py):
"""Extend setuptools `build_py` to generate API documentation
('share/doc/arvados-python-client', ['LICENSE-2.0.txt', 'README.rst']),
],
install_requires=[
+ *arvados_version.iter_dependencies(version),
'ciso8601 >=2.0.0',
'future',
'google-api-core <2.11.0', # 2.11.0rc1 is incompatible with google-auth<2
from builtins import object
import hashlib
import mock
+from mock import patch
import os
import errno
import pycurl
import time
import unittest
import urllib.parse
+import mmap
import parameterized
-@tutil.skip_sleep
-@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
-class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
- disk_cache = False
-
- def setUp(self):
- self.api_client = self.mock_keep_services(count=2)
- self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
- self.data = b'xyzzy'
- self.locator = '1271ed5ef305aadabc605b1609e24c52'
-
- def tearDown(self):
- DiskCacheBase.tearDown(self)
-
- def test_multiple_default_storage_classes_req_header(self):
- api_mock = self.api_client_mock()
- api_mock.config.return_value = {
- 'StorageClasses': {
- 'foo': { 'Default': True },
- 'bar': { 'Default': True },
- 'baz': { 'Default': False }
- }
- }
- api_client = self.mock_keep_services(api_mock=api_mock, count=2)
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
- resp_hdr = {
- 'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
- 'x-keep-replicas-stored': 1
- }
- with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as mock:
- keep_client.put(self.data, copies=1)
- req_hdr = mock.responses[0]
- self.assertIn(
- 'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))
-
- def test_storage_classes_req_header(self):
- self.assertEqual(
- self.api_client.config()['StorageClasses'],
- {'default': {'Default': True}})
- cases = [
- # requested, expected
- [['foo'], 'X-Keep-Storage-Classes: foo'],
- [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
- [[], 'X-Keep-Storage-Classes: default'],
- [None, 'X-Keep-Storage-Classes: default'],
- ]
- for req_classes, expected_header in cases:
- headers = {'x-keep-replicas-stored': 1}
- if req_classes is None or len(req_classes) == 0:
- confirmed_hdr = 'default=1'
- elif len(req_classes) > 0:
- confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
- headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
- with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
- self.keep_client.put(self.data, copies=1, classes=req_classes)
- req_hdr = mock.responses[0]
- self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))
-
- def test_partial_storage_classes_put(self):
- headers = {
- 'x-keep-replicas-stored': 1,
- 'x-keep-storage-classes-confirmed': 'foo=1'}
- with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
- with self.assertRaises(arvados.errors.KeepWriteError):
- self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'], num_retries=0)
- # 1st request, both classes pending
- req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
- self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
- # 2nd try, 'foo' class already satisfied
- req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
- self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
-
- def test_successful_storage_classes_put_requests(self):
- cases = [
- # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
- [ 1, ['foo'], 1, 'foo=1', 1],
- [ 1, ['foo'], 2, 'foo=2', 1],
- [ 2, ['foo'], 2, 'foo=2', 1],
- [ 2, ['foo'], 1, 'foo=1', 2],
- [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
- [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
- [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
- [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
- [ 1, ['foo', 'bar'], 1, None, 1],
- [ 1, ['foo'], 1, None, 1],
- [ 2, ['foo'], 2, None, 1],
- [ 2, ['foo'], 1, None, 2],
- ]
- for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
- headers = {'x-keep-replicas-stored': c_copies}
- if c_classes is not None:
- headers.update({'x-keep-storage-classes-confirmed': c_classes})
- with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
- case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
- self.assertEqual(self.locator,
- self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
- case_desc)
- self.assertEqual(e_reqs, mock.call_count, case_desc)
-
- def test_failed_storage_classes_put_requests(self):
- cases = [
- # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
- [ 1, ['foo'], 1, 'bar=1', 200],
- [ 1, ['foo'], 1, None, 503],
- [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
- [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
- [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
- ]
- for w_copies, w_classes, c_copies, c_classes, return_code in cases:
- headers = {'x-keep-replicas-stored': c_copies}
- if c_classes is not None:
- headers.update({'x-keep-storage-classes-confirmed': c_classes})
- with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
- case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
- with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
- self.keep_client.put(self.data, copies=w_copies, classes=w_classes, num_retries=0)
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
keep_client.get(self.locator)
- @mock.patch('mmap.mmap')
- def test_disk_cache_retry_write_error(self, mockmmap):
+ def test_disk_cache_retry_write_error(self):
block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
disk_cache_dir=self.disk_cache_dir)
keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
- mockmmap.side_effect = (OSError(errno.ENOSPC, "no space"), self.data)
+ called = False
+ realmmap = mmap.mmap
+ def sideeffect_mmap(*args, **kwargs):
+ nonlocal called
+ if not called:
+ called = True
+ raise OSError(errno.ENOSPC, "no space")
+ else:
+ return realmmap(*args, **kwargs)
- cache_max_before = block_cache.cache_max
+ with patch('mmap.mmap') as mockmmap:
+ mockmmap.side_effect = sideeffect_mmap
- with tutil.mock_keep_responses(self.data, 200) as mock:
- self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+ cache_max_before = block_cache.cache_max
- self.assertIsNotNone(keep_client.get_from_cache(self.locator))
+ with tutil.mock_keep_responses(self.data, 200) as mock:
+ self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+ self.assertIsNotNone(keep_client.get_from_cache(self.locator))
with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
self.assertTrue(tutil.binary_compare(f.read(), self.data))
self.assertTrue(cache_max_before > block_cache.cache_max)
- @mock.patch('mmap.mmap')
- def test_disk_cache_retry_write_error2(self, mockmmap):
+ def test_disk_cache_retry_write_error2(self):
block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
disk_cache_dir=self.disk_cache_dir)
keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
- mockmmap.side_effect = (OSError(errno.ENOMEM, "no memory"), self.data)
+ called = False
+ realmmap = mmap.mmap
+ def sideeffect_mmap(*args, **kwargs):
+ nonlocal called
+ if not called:
+ called = True
+ raise OSError(errno.ENOMEM, "no memory")
+ else:
+ return realmmap(*args, **kwargs)
- slots_before = block_cache._max_slots
+ with patch('mmap.mmap') as mockmmap:
+ mockmmap.side_effect = sideeffect_mmap
- with tutil.mock_keep_responses(self.data, 200) as mock:
- self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+ slots_before = block_cache._max_slots
- self.assertIsNotNone(keep_client.get_from_cache(self.locator))
+ with tutil.mock_keep_responses(self.data, 200) as mock:
+ self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+ self.assertIsNotNone(keep_client.get_from_cache(self.locator))
with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
self.assertTrue(tutil.binary_compare(f.read(), self.data))
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import arvados
+import pycurl
+
+import unittest
+import parameterized
+from . import arvados_testutil as tutil
+from .arvados_testutil import DiskCacheBase
+
+@tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
+class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
+ disk_cache = False
+
+ def setUp(self):
+ self.api_client = self.mock_keep_services(count=2)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
+ self.data = b'xyzzy'
+ self.locator = '1271ed5ef305aadabc605b1609e24c52'
+
+ def tearDown(self):
+ DiskCacheBase.tearDown(self)
+
+ def test_multiple_default_storage_classes_req_header(self):
+ api_mock = self.api_client_mock()
+ api_mock.config.return_value = {
+ 'StorageClasses': {
+ 'foo': { 'Default': True },
+ 'bar': { 'Default': True },
+ 'baz': { 'Default': False }
+ }
+ }
+ api_client = self.mock_keep_services(api_mock=api_mock, count=2)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
+ resp_hdr = {
+ 'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
+ 'x-keep-replicas-stored': 1
+ }
+ with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as mock:
+ keep_client.put(self.data, copies=1)
+ req_hdr = mock.responses[0]
+ self.assertIn(
+ 'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))
+
+ def test_storage_classes_req_header(self):
+ self.assertEqual(
+ self.api_client.config()['StorageClasses'],
+ {'default': {'Default': True}})
+ cases = [
+ # requested, expected
+ [['foo'], 'X-Keep-Storage-Classes: foo'],
+ [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
+ [[], 'X-Keep-Storage-Classes: default'],
+ [None, 'X-Keep-Storage-Classes: default'],
+ ]
+ for req_classes, expected_header in cases:
+ headers = {'x-keep-replicas-stored': 1}
+ if req_classes is None or len(req_classes) == 0:
+ confirmed_hdr = 'default=1'
+ elif len(req_classes) > 0:
+ confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
+ headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
+ with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
+ self.keep_client.put(self.data, copies=1, classes=req_classes)
+ req_hdr = mock.responses[0]
+ self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))
+
+ def test_partial_storage_classes_put(self):
+ headers = {
+ 'x-keep-replicas-stored': 1,
+ 'x-keep-storage-classes-confirmed': 'foo=1'}
+ with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'], num_retries=0)
+ # 1st request, both classes pending
+ req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
+ self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
+ # 2nd try, 'foo' class already satisfied
+ req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
+ self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
+
+ def test_successful_storage_classes_put_requests(self):
+ cases = [
+ # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
+ [ 1, ['foo'], 1, 'foo=1', 1],
+ [ 1, ['foo'], 2, 'foo=2', 1],
+ [ 2, ['foo'], 2, 'foo=2', 1],
+ [ 2, ['foo'], 1, 'foo=1', 2],
+ [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
+ [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
+ [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
+ [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
+ [ 1, ['foo', 'bar'], 1, None, 1],
+ [ 1, ['foo'], 1, None, 1],
+ [ 2, ['foo'], 2, None, 1],
+ [ 2, ['foo'], 1, None, 2],
+ ]
+ for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
+ headers = {'x-keep-replicas-stored': c_copies}
+ if c_classes is not None:
+ headers.update({'x-keep-storage-classes-confirmed': c_classes})
+ with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
+ case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
+ self.assertEqual(self.locator,
+ self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
+ case_desc)
+ self.assertEqual(e_reqs, mock.call_count, case_desc)
+
+ def test_failed_storage_classes_put_requests(self):
+ cases = [
+ # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
+ [ 1, ['foo'], 1, 'bar=1', 200],
+ [ 1, ['foo'], 1, None, 503],
+ [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
+ [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
+ [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
+ ]
+ for w_copies, w_classes, c_copies, c_classes, return_code in cases:
+ headers = {'x-keep-replicas-stored': c_copies}
+ if c_classes is not None:
+ headers.update({'x-keep-storage-classes-confirmed': c_classes})
+ with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
+ case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
+ with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
+ self.keep_client.put(self.data, copies=w_copies, classes=w_classes, num_retries=0)
gem 'mini_portile2', '~> 2.8', '>= 2.8.1'
+# If we're running on Ruby 2.x, we'll go down the `else` branch below.
+plugin 'bundler-override' if RUBY_VERSION >= "3.0"
+if bundler_override_paths = Bundler::Plugin.index.load_paths("bundler-override")
+ require File.join(bundler_override_paths[0], "bundler-override")
+ # Ruby 3.4 drops base64 as a default gem. Because of this, various other gems
+ # are starting to declare base64 as a dependency. However, locking one
+ # specific version of base64 makes it more difficult to support older Rubies
+ # that still have it as a default. See <https://dev.arvados.org/issues/21583>.
+ # Because we are focused on supporting distros with those older Rubies, we
+ # drop base64 dependencies here. These overrides can go away once we shift to
+ # supporting Ruby 3.4+.
+ override 'faraday', drop: ['base64']
+else
+ # The plugin is not available, either because Ruby is too old or Bundler
+ # is installing it this run. That's fine as long as Bundler isn't updating
+ # Gemfile.lock. Unfortunately we can't know that for sure here, because
+ # bundler needs to read Gemfile in order to figure out whether it's going
+ # to update Gemfile.lock. Flagging the situation for the user is the best
+ # we can do.
+ Bundler.ui.warn("bundler-override plugin not available - do NOT commit any changes to Gemfile.lock")
+end
+
# Install any plugin gems
Dir.glob(File.join(File.dirname(__FILE__), 'lib', '**', "Gemfile")) do |f|
eval(IO.read(f), binding)
addressable (>= 2.3.1)
extlib (>= 0.9.15)
multi_json (>= 1.0.0)
- base64 (0.2.0)
builder (3.2.4)
byebug (11.1.3)
concurrent-ruby (1.2.3)
factory_bot (~> 6.2.0)
railties (>= 5.0.0)
faraday (2.8.1)
- base64
faraday-net_http (>= 2.0, < 3.1)
ruby2_keywords (>= 0.0.4)
faraday-gzip (2.0.1)
raise "remote cluster #{upstream_cluster_id} returned invalid token uuid #{token_uuid.inspect}"
end
rescue HTTPClient::BadResponseError => e
+ if e.res.status_code >= 400 && e.res.status_code < 500
+ # Remote cluster does not accept this token.
+ return nil
+ end
# CurrentApiToken#call and ApplicationController#render_error will
# propagate the status code from the #http_status method, so define
# that here.
end
rescue ActiveRecord::RecordNotUnique
Rails.logger.debug("cached remote token #{token_uuid} already exists, retrying...")
- # Some other request won the race: retry just once before erroring out
- if (retries += 1) <= 1
+ # Another request won the race (trying to find_or_create the
+ # same token UUID) ...and/or... there is an expired entry with
+ # the same secret but a different UUID (e.g., the token is an
+ # OIDC access token and [a] our database has an expired cached
+ # row that was not used above, and [b] the remote cluster had
+ # deleted its expired cached row so it assigned a new UUID).
+ #
+      # Delete the conflicting row, if any. Retry twice (in case we
+ # hit both of those situations at once), then give up.
+ if (retries += 1) <= 2
+ ApiClientAuthorization.where('api_token=? and uuid<>?', stored_secret, token_uuid).delete_all
retry
else
Rails.logger.warn("cannot find or create cached remote token #{token_uuid}")
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+require 'test_helper'
+
+class GemfileLockTest < ActionDispatch::IntegrationTest
+ # Like the assertion message says, refer to Gemfile for this test's
+ # rationale. This test can go away once we start supporting Ruby 3.4+.
+ test "base64 gem is not locked to a specific version" do
+ gemfile_lock_path = Rails.root.join("Gemfile.lock")
+ File.open(gemfile_lock_path) do |f|
+ assert_equal(
+ f.each_line.any?(/^\s*base64\s+\(/),
+ false,
+ "Gemfile.lock includes a specific version of base64 - revert and refer to the comments in Gemfile",
+ )
+ end
+ end
+end
res.status = @stub_token_status
if res.status == 200
body = {
- uuid: api_client_authorizations(:active).uuid.sub('zzzzz', clusterid),
+ uuid: @stub_token_uuid || api_client_authorizations(:active).uuid.sub('zzzzz', clusterid),
owner_uuid: "#{clusterid}-tpzed-00000000000000z",
scopes: @stub_token_scopes,
}
}
@stub_token_status = 200
@stub_token_scopes = ["all"]
+ @stub_token_uuid = nil
ActionMailer::Base.deliveries = []
end
assert_equal 'foo', json_response['username']
end
+ test 'authenticate with remote token with secret part identical to previously cached token' do
+ get '/arvados/v1/users/current',
+ params: {format: 'json'},
+ headers: auth(remote: 'zbbbb')
+ assert_response :success
+ get '/arvados/v1/api_client_authorizations/current',
+ params: {format: 'json'},
+ headers: auth(remote: 'zbbbb')
+ assert_response :success
+
+ # Expire the cached token.
+ @cached_token_uuid = json_response['uuid']
+ act_as_system_user do
+ ApiClientAuthorization.where(uuid: @cached_token_uuid).update_all(expires_at: db_current_time() - 1.day)
+ end
+
+ # Now use the same bare token, but set up the remote cluster to
+ # return a different UUID this time.
+ @stub_token_uuid = 'zbbbb-gj3su-123451234512345'
+ get '/arvados/v1/users/current',
+ params: {format: 'json'},
+ headers: auth(remote: 'zbbbb')
+ assert_response :success
+
+ # Confirm that we actually retrieved the new UUID from the stub
+ # cluster -- otherwise we didn't really test the conflicting-UUID
+ # case.
+ get '/arvados/v1/api_client_authorizations/current',
+ params: {format: 'json'},
+ headers: auth(remote: 'zbbbb')
+ assert_response :success
+ assert_equal @stub_token_uuid, json_response['uuid']
+ end
+
test 'authenticate with remote token from misbehaving remote cluster' do
get '/arvados/v1/users/current',
params: {format: 'json'},
assert_equal 'zzzzz-tpzed-anonymouspublic', json_response['uuid']
end
- [401, 403, 422, 500, 502, 503].each do |status|
- test "propagate #{status} response from getting remote token" do
+ [400, 401, 403, 422, 500, 502, 503].each do |status|
+ test "handle #{status} response when checking remote-provided v2 token" do
@stub_token_status = status
get "/arvados/v1/users/#{@stub_content[:uuid]}",
params: {format: "json"},
headers: auth(remote: "zbbbb")
- assert_response status
+ assert_response(status < 500 ? 401 : status)
+ end
+
+ test "handle #{status} response when checking remote-provided v2 token at anonymously accessible endpoint" do
+ @stub_token_status = status
+ get "/arvados/v1/keep_services/accessible",
+ params: {format: "json"},
+ headers: auth(remote: "zbbbb")
+ assert_response(status < 500 ? :success : status)
+ end
+
+ test "handle #{status} response when checking token issued by login cluster" do
+ @stub_token_status = status
+ Rails.configuration.Login.LoginCluster = "zbbbb"
+ get "/arvados/v1/users/current",
+ params: {format: "json"},
+ headers: {'HTTP_AUTHORIZATION' => "Bearer badtoken"}
+ assert_response(status < 500 ? 401 : status)
end
+ test "handle #{status} response when checking token issued by login cluster at anonymously accessible endpoint" do
+ @stub_token_status = status
+ Rails.configuration.Login.LoginCluster = "zbbbb"
+ get "/arvados/v1/keep_services/accessible",
+ params: {format: "json"},
+ headers: {'HTTP_AUTHORIZATION' => "Bearer badtoken"}
+ assert_response(status < 500 ? :success : status)
+ end
+ end
+
+ [401, 403, 422, 500, 502, 503].each do |status|
test "propagate #{status} response from getting uncached user" do
@stub_status = status
get "/arvados/v1/users/#{@stub_content[:uuid]}",
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
+#
+# This file runs in one of three modes:
+#
+# 1. If the ARVADOS_BUILDING_VERSION environment variable is set, it writes
+# _version.py and generates dependencies based on that value.
+# 2. If running from an arvados Git checkout, it writes _version.py
+# and generates dependencies from Git.
+# 3. Otherwise, it assumes this is source previously generated from Git, and
+# it reads _version.py and generates dependencies from it.
-import subprocess
-import time
import os
import re
+import runpy
+import subprocess
import sys
-SETUP_DIR = os.path.dirname(os.path.abspath(__file__))
-VERSION_PATHS = {
- SETUP_DIR,
- os.path.abspath(os.path.join(SETUP_DIR, "../../build/version-at-commit.sh"))
- }
+from pathlib import Path
+
+# These maps explain the relationships between different Python modules in
+# the arvados repository. We use these to help generate setup.py.
+PACKAGE_DEPENDENCY_MAP = {
+ 'arvados-cwl-runner': ['arvados-python-client', 'crunchstat_summary'],
+ 'arvados-user-activity': ['arvados-python-client'],
+ 'arvados_fuse': ['arvados-python-client'],
+ 'crunchstat_summary': ['arvados-python-client'],
+}
+PACKAGE_MODULE_MAP = {
+ 'arvados-cwl-runner': 'arvados_cwl',
+ 'arvados-docker-cleaner': 'arvados_docker',
+ 'arvados-python-client': 'arvados',
+ 'arvados-user-activity': 'arvados_user_activity',
+ 'arvados_fuse': 'arvados_fuse',
+ 'crunchstat_summary': 'crunchstat_summary',
+}
+PACKAGE_SRCPATH_MAP = {
+ 'arvados-cwl-runner': Path('sdk', 'cwl'),
+ 'arvados-docker-cleaner': Path('services', 'dockercleaner'),
+ 'arvados-python-client': Path('sdk', 'python'),
+ 'arvados-user-activity': Path('tools', 'user-activity'),
+ 'arvados_fuse': Path('services', 'fuse'),
+ 'crunchstat_summary': Path('tools', 'crunchstat-summary'),
+}
+
+ENV_VERSION = os.environ.get("ARVADOS_BUILDING_VERSION")
+SETUP_DIR = Path(__file__).absolute().parent
+try:
+ REPO_PATH = Path(subprocess.check_output(
+ ['git', '-C', str(SETUP_DIR), 'rev-parse', '--show-toplevel'],
+ stderr=subprocess.DEVNULL,
+ text=True,
+ ).rstrip('\n'))
+except (subprocess.CalledProcessError, OSError):
+ REPO_PATH = None
+else:
+ # Verify this is the arvados monorepo
+ if all((REPO_PATH / path).exists() for path in PACKAGE_SRCPATH_MAP.values()):
+ PACKAGE_NAME, = (
+ pkg_name for pkg_name, path in PACKAGE_SRCPATH_MAP.items()
+ if (REPO_PATH / path) == SETUP_DIR
+ )
+ MODULE_NAME = PACKAGE_MODULE_MAP[PACKAGE_NAME]
+ VERSION_SCRIPT_PATH = Path(REPO_PATH, 'build', 'version-at-commit.sh')
+ else:
+ REPO_PATH = None
+if REPO_PATH is None:
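+    # Not running from an arvados source checkout: identify this package
+    # by finding which known module directory exists alongside this file.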
+ (PACKAGE_NAME, MODULE_NAME), = (
+ (pkg_name, mod_name)
+ for pkg_name, mod_name in PACKAGE_MODULE_MAP.items()
+ if (SETUP_DIR / mod_name).is_dir()
+ )
+
+def short_tests_only(arglist=sys.argv):
+ try:
+ arglist.remove('--short-tests-only')
+ except ValueError:
+ return False
+ else:
+ return True
+
+def git_log_output(path, *args):
+ return subprocess.check_output(
+ ['git', '-C', str(REPO_PATH),
+ 'log', '--first-parent', '--max-count=1',
+ *args, str(path)],
+ text=True,
+ ).rstrip('\n')
def choose_version_from():
- ts = {}
- for path in VERSION_PATHS:
- ts[subprocess.check_output(
- ['git', 'log', '--first-parent', '--max-count=1',
- '--format=format:%ct', path]).strip()] = path
-
- sorted_ts = sorted(ts.items())
- getver = sorted_ts[-1][1]
- print("Using "+getver+" for version number calculation of "+SETUP_DIR, file=sys.stderr)
+ ver_paths = [SETUP_DIR, VERSION_SCRIPT_PATH, *(
+ PACKAGE_SRCPATH_MAP[pkg]
+ for pkg in PACKAGE_DEPENDENCY_MAP.get(PACKAGE_NAME, ())
+ )]
+ getver = max(ver_paths, key=lambda path: git_log_output(path, '--format=format:%ct'))
+ print(f"Using {getver} for version number calculation of {SETUP_DIR}", file=sys.stderr)
return getver
def git_version_at_commit():
curdir = choose_version_from()
- myhash = subprocess.check_output(['git', 'log', '-n1', '--first-parent',
- '--format=%H', curdir]).strip()
- myversion = subprocess.check_output([SETUP_DIR+'/../../build/version-at-commit.sh', myhash]).strip().decode()
- return myversion
+ myhash = git_log_output(curdir, '--format=%H')
+ return subprocess.check_output(
+ [str(VERSION_SCRIPT_PATH), myhash],
+ text=True,
+ ).rstrip('\n')
def save_version(setup_dir, module, v):
- v = v.replace("~dev", ".dev").replace("~rc", "rc")
- with open(os.path.join(setup_dir, module, "_version.py"), 'wt') as fp:
- return fp.write("__version__ = '%s'\n" % v)
+ with Path(setup_dir, module, '_version.py').open('w') as fp:
+ print(f"__version__ = {v!r}", file=fp)
def read_version(setup_dir, module):
- with open(os.path.join(setup_dir, module, "_version.py"), 'rt') as fp:
- return re.match("__version__ = '(.*)'$", fp.read()).groups()[0]
-
-def get_version(setup_dir, module):
- env_version = os.environ.get("ARVADOS_BUILDING_VERSION")
+ file_vars = runpy.run_path(Path(setup_dir, module, '_version.py'))
+ return file_vars['__version__']
- if env_version:
- save_version(setup_dir, module, env_version)
+def get_version(setup_dir=SETUP_DIR, module=MODULE_NAME):
+ if ENV_VERSION:
+ version = ENV_VERSION
+ elif REPO_PATH is None:
+ return read_version(setup_dir, module)
else:
- try:
- save_version(setup_dir, module, git_version_at_commit())
- except (subprocess.CalledProcessError, OSError) as err:
- print("ERROR: {0}".format(err), file=sys.stderr)
- pass
+ version = git_version_at_commit()
+ version = version.replace("~dev", ".dev").replace("~rc", "rc")
+ save_version(setup_dir, module, version)
+ return version
+
+def iter_dependencies(version=None):
+ if version is None:
+ version = get_version()
+ # A packaged development release should be installed with other
+ # development packages built from the same source, but those
+ # dependencies may have earlier "dev" versions (read: less recent
+ # Git commit timestamps). This compatible version dependency
+ # expresses that as closely as possible. Allowing versions
+ # compatible with .dev0 allows any development release.
+ # Regular expression borrowed partially from
+ # <https://packaging.python.org/en/latest/specifications/version-specifiers/#version-specifiers-regex>
+ dep_ver, match_count = re.subn(r'\.dev(0|[1-9][0-9]*)$', '.dev0', version, 1)
+ dep_op = '~=' if match_count else '=='
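+    # For example (illustrative): a version like '2.7.1.dev20240101' becomes
+    # the constraint '~=2.7.1.dev0', so any development build of the sibling
+    # package satisfies it; a release version gets an exact '==' pin.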
+ for dep_pkg in PACKAGE_DEPENDENCY_MAP.get(PACKAGE_NAME, ()):
+ yield f'{dep_pkg}{dep_op}{dep_ver}'
- return read_version(setup_dir, module)
+# Called from calculate_python_sdk_cwl_package_versions() in run-library.sh
+if __name__ == '__main__':
+ print(get_version())
from setuptools import setup, find_packages
-SETUP_DIR = os.path.dirname(__file__) or '.'
-README = os.path.join(SETUP_DIR, 'README.rst')
-
import arvados_version
-version = arvados_version.get_version(SETUP_DIR, "arvados_docker")
-
-short_tests_only = False
-if '--short-tests-only' in sys.argv:
- short_tests_only = True
- sys.argv.remove('--short-tests-only')
+version = arvados_version.get_version()
+short_tests_only = arvados_version.short_tests_only()
+README = os.path.join(arvados_version.SETUP_DIR, 'README.rst')
setup(name="arvados-docker-cleaner",
version=version,
('share/doc/arvados-docker-cleaner', ['agpl-3.0.txt', 'arvados-docker-cleaner.service']),
],
install_requires=[
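+        # Version-pinned dependencies on sibling Arvados packages, if any
+        # (see arvados_version.iter_dependencies).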
+ *arvados_version.iter_dependencies(version),
'docker>=6.1.0',
'setuptools',
],
The FUSE driver supports the Arvados event bus. When an event is received for
an object that is live in the inode cache, that object is immediately updated.
+Implementation note: in the code, the terms 'object', 'entry' and
+'inode' are used somewhat interchangeably, but generally mean an
+arvados_fuse.File or arvados_fuse.Directory object that has a numeric
+inode assigned to it and appears in the Inodes._entries dictionary.
+
"""
from __future__ import absolute_import
from __future__ import division
-from future.utils import viewitems
-from future.utils import native
-from future.utils import listvalues
-from future.utils import listitems
-from future import standard_library
-standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import object
import arvados.keep
from prometheus_client import Summary
import queue
-
-# Default _notify_queue has a limit of 1000 items, but it really needs to be
-# unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
-# details.
-
-if hasattr(llfuse, 'capi'):
- # llfuse < 0.42
- llfuse.capi._notify_queue = queue.Queue()
-else:
- # llfuse >= 0.42
- llfuse._notify_queue = queue.Queue()
-
-LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
+from dataclasses import dataclass
+import typing
from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
-from .fusefile import StringFile, FuseArvadosFile
+from .fusefile import File, StringFile, FuseArvadosFile
_logger = logging.getLogger('arvados.arvados_fuse')
class DirectoryHandle(Handle):
"""Connects a numeric file handle to a Directory object that has
- been opened by the client."""
+ been opened by the client.
+
+ DirectoryHandle is used by opendir() and readdir() to get
+    directory listings. Entries returned by readdir() don't increment
+    the lookup count (kernel references), so we increment our internal
+    "use count" to keep entries from being removed mid-read.
+
+ """
def __init__(self, fh, dirobj, entries):
super(DirectoryHandle, self).__init__(fh, dirobj)
self.entries = entries
+ for ent in self.entries:
+ ent[1].inc_use()
+
+ def release(self):
+ for ent in self.entries:
+ ent[1].dec_use()
+ super(DirectoryHandle, self).release()
+
class InodeCache(object):
"""Records the memory footprint of objects and when they are last used.
- When the cache limit is exceeded, the least recently used objects are
- cleared. Clearing the object means discarding its contents to release
- memory. The next time the object is accessed, it must be re-fetched from
- the server. Note that the inode cache limit is a soft limit; the cache
- limit may be exceeded if necessary to load very large objects, it may also
- be exceeded if open file handles prevent objects from being cleared.
+ When the cache limit is exceeded, the least recently used objects
+ are cleared. Clearing the object means discarding its contents to
+ release memory. The next time the object is accessed, it must be
+    re-fetched from the server. Note that the inode cache limit is a
+    soft limit; it may be exceeded if necessary to load very large
+    projects or collections, and it may also be exceeded if an inode
+    can't be safely discarded based on kernel lookups (has_ref()) or
+    internal use count (in_use()).
"""
def __init__(self, cap, min_entries=4):
- self._entries = collections.OrderedDict()
- self._by_uuid = {}
+        # Standard dictionaries are ordered, but OrderedDict is still better
+        # here; in particular, we use move_to_end(), which plain dicts don't
+        # have. See
+        # https://docs.python.org/3.11/library/collections.html#ordereddict-objects
+ self._cache_entries = collections.OrderedDict()
self.cap = cap
self._total = 0
self.min_entries = min_entries
def total(self):
return self._total
- def _remove(self, obj, clear):
- if clear:
- # Kernel behavior seems to be that if a file is
- # referenced, its parents remain referenced too. This
- # means has_ref() exits early when a collection is not
- # candidate for eviction.
- #
- # By contrast, in_use() doesn't increment references on
- # parents, so it requires a full tree walk to determine if
- # a collection is a candidate for eviction. This takes
- # .07s for 240000 files, which becomes a major drag when
- # cap_cache is being called several times a second and
- # there are multiple non-evictable collections in the
- # cache.
- #
- # So it is important for performance that we do the
- # has_ref() check first.
-
- if obj.has_ref(True):
- _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
- return
+ def evict_candidates(self):
+ """Yield entries that are candidates to be evicted
+ and stop when the cache total has shrunk sufficiently.
- if obj.in_use():
- _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
- return
+        Implements an LRU cache: when an item is added or touch()ed, it
+        goes to the back of the OrderedDict, so items at the front are
+        the oldest. The Inodes._remove() function determines whether an
+        entry can actually be removed safely.
- obj.kernel_invalidate()
- _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
- obj.clear()
+ """
- # The llfuse lock is released in del_entry(), which is called by
- # Directory.clear(). While the llfuse lock is released, it can happen
- # that a reentrant call removes this entry before this call gets to it.
- # Ensure that the entry is still valid before trying to remove it.
- if obj.inode not in self._entries:
+ if self._total <= self.cap:
return
- self._total -= obj.cache_size
- del self._entries[obj.inode]
- if obj.cache_uuid:
- self._by_uuid[obj.cache_uuid].remove(obj)
- if not self._by_uuid[obj.cache_uuid]:
- del self._by_uuid[obj.cache_uuid]
- obj.cache_uuid = None
- if clear:
- _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
+ _logger.debug("InodeCache evict_candidates total %i cap %i entries %i", self._total, self.cap, len(self._cache_entries))
- def cap_cache(self):
- if self._total > self.cap:
- for ent in listvalues(self._entries):
- if self._total < self.cap or len(self._entries) < self.min_entries:
- break
- self._remove(ent, True)
-
- def manage(self, obj):
- if obj.persisted():
- obj.cache_size = obj.objsize()
- self._entries[obj.inode] = obj
- obj.cache_uuid = obj.uuid()
- if obj.cache_uuid:
- if obj.cache_uuid not in self._by_uuid:
- self._by_uuid[obj.cache_uuid] = [obj]
- else:
- if obj not in self._by_uuid[obj.cache_uuid]:
- self._by_uuid[obj.cache_uuid].append(obj)
- self._total += obj.objsize()
- _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
- obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
- self.cap_cache()
+ # Copy this into a deque for two reasons:
+ #
+        # 1. _cache_entries is modified by unmanage(), which is called
+        # by _remove(), so we can't safely iterate over it directly.
+        #
+        # 2. popping off the front means the reference goes away
+        # immediately instead of sticking around for the lifetime of
+        # "values".
+ values = collections.deque(self._cache_entries.values())
- def touch(self, obj):
- if obj.persisted():
- if obj.inode in self._entries:
- self._remove(obj, False)
- self.manage(obj)
+ while values:
+ if self._total < self.cap or len(self._cache_entries) < self.min_entries:
+ break
+ yield values.popleft()
- def unmanage(self, obj):
- if obj.persisted() and obj.inode in self._entries:
- self._remove(obj, True)
+ def unmanage(self, entry):
+ """Stop managing an object in the cache.
- def find_by_uuid(self, uuid):
- return self._by_uuid.get(uuid, [])
+ This happens when an object is being removed from the inode
+ entries table.
+
+ """
+
+ if entry.inode not in self._cache_entries:
+ return
+
+ # manage cache size running sum
+ self._total -= entry.cache_size
+ entry.cache_size = 0
+
+ # Now forget about it
+ del self._cache_entries[entry.inode]
+
+ def update_cache_size(self, obj):
+ """Update the cache total in response to the footprint of an
+ object changing (usually because it has been loaded or
+ cleared).
+
+ Adds or removes entries to the cache list based on the object
+ cache size.
+
+ """
+
+ if not obj.persisted():
+ return
+
+ if obj.inode in self._cache_entries:
+ self._total -= obj.cache_size
+
+ obj.cache_size = obj.objsize()
+
+ if obj.cache_size > 0 or obj.parent_inode is None:
+ self._total += obj.cache_size
+ self._cache_entries[obj.inode] = obj
+ elif obj.cache_size == 0 and obj.inode in self._cache_entries:
+ del self._cache_entries[obj.inode]
+
+ def touch(self, obj):
+ """Indicate an object was used recently, making it low
+ priority to be removed from the cache.
+
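+        Returns True if the object is in the cache, False otherwise.
+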
+ """
+ if obj.inode in self._cache_entries:
+ self._cache_entries.move_to_end(obj.inode)
+ return True
+ return False
def clear(self):
- self._entries.clear()
- self._by_uuid.clear()
+ self._cache_entries.clear()
self._total = 0
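+
+# The message classes below are placed on Inodes._inode_remove_queue and
+# processed by the background _inode_remove thread. Each implements
+# inode_op(), which performs its operation and returns True when done;
+# RemoveInode defers itself until the llfuse lock is held (via locked_ops),
+# and EvictCandidates simply wakes the thread so it re-checks cache size.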
+@dataclass
+class RemoveInode:
+ entry: typing.Union[Directory, File]
+ def inode_op(self, inodes, locked_ops):
+ if locked_ops is None:
+ inodes._remove(self.entry)
+ return True
+ else:
+ locked_ops.append(self)
+ return False
+
+@dataclass
+class InvalidateInode:
+ inode: int
+ def inode_op(self, inodes, locked_ops):
+ llfuse.invalidate_inode(self.inode)
+ return True
+
+@dataclass
+class InvalidateEntry:
+ inode: int
+ name: str
+ def inode_op(self, inodes, locked_ops):
+ llfuse.invalidate_entry(self.inode, self.name)
+ return True
+
+@dataclass
+class EvictCandidates:
+ def inode_op(self, inodes, locked_ops):
+ return True
+
+
class Inodes(object):
- """Manage the set of inodes. This is the mapping from a numeric id
- to a concrete File or Directory object"""
+ """Manage the set of inodes.
+
+ This is the mapping from a numeric id to a concrete File or
+ Directory object
- def __init__(self, inode_cache, encoding="utf-8"):
+ """
+
+ def __init__(self, inode_cache, encoding="utf-8", fsns=None, shutdown_started=None):
self._entries = {}
self._counter = itertools.count(llfuse.ROOT_INODE)
self.inode_cache = inode_cache
self.encoding = encoding
- self.deferred_invalidations = []
+ self._fsns = fsns
+ self._shutdown_started = shutdown_started or threading.Event()
+
+ self._inode_remove_queue = queue.Queue()
+ self._inode_remove_thread = threading.Thread(None, self._inode_remove)
+ self._inode_remove_thread.daemon = True
+ self._inode_remove_thread.start()
+
+ self.cap_cache_event = threading.Event()
+ self._by_uuid = collections.defaultdict(list)
def __getitem__(self, item):
return self._entries[item]
return iter(self._entries.keys())
def items(self):
- return viewitems(self._entries.items())
+ return self._entries.items()
def __contains__(self, k):
return k in self._entries
def touch(self, entry):
+ """Update the access time, adjust the cache position, and
+ notify the _inode_remove thread to recheck the cache.
+
+ """
+
entry._atime = time.time()
- self.inode_cache.touch(entry)
+ if self.inode_cache.touch(entry):
+ self.cap_cache()
+
+ def cap_cache(self):
+ """Notify the _inode_remove thread to recheck the cache."""
+ if not self.cap_cache_event.is_set():
+ self.cap_cache_event.set()
+ self._inode_remove_queue.put(EvictCandidates())
+
+ def update_uuid(self, entry):
+ """Update the Arvados uuid associated with an inode entry.
+
+ This is used to look up inodes that need to be invalidated
+ when a websocket event indicates the object has changed on the
+ API server.
+
+ """
+ if entry.cache_uuid and entry in self._by_uuid[entry.cache_uuid]:
+ self._by_uuid[entry.cache_uuid].remove(entry)
+
+ entry.cache_uuid = entry.uuid()
+ if entry.cache_uuid and entry not in self._by_uuid[entry.cache_uuid]:
+ self._by_uuid[entry.cache_uuid].append(entry)
+
+ if not self._by_uuid[entry.cache_uuid]:
+ del self._by_uuid[entry.cache_uuid]
def add_entry(self, entry):
+ """Assign a numeric inode to a new entry."""
+
entry.inode = next(self._counter)
if entry.inode == llfuse.ROOT_INODE:
entry.inc_ref()
self._entries[entry.inode] = entry
- self.inode_cache.manage(entry)
+
+ self.update_uuid(entry)
+ self.inode_cache.update_cache_size(entry)
+ self.cap_cache()
return entry
def del_entry(self, entry):
- if entry.ref_count == 0:
- self.inode_cache.unmanage(entry)
- del self._entries[entry.inode]
+ """Remove entry from the inode table.
+
+ Indicate this inode entry is pending deletion by setting
+ parent_inode to None. Notify the _inode_remove thread to try
+ and remove it.
+
+ """
+
+ entry.parent_inode = None
+ self._inode_remove_queue.put(RemoveInode(entry))
+ _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
+
+ def _inode_remove(self):
+ """Background thread to handle tasks related to invalidating
+ inodes in the kernel, and removing objects from the inodes
+ table entirely.
+
+ """
+
+ locked_ops = collections.deque()
+ while True:
+ blocking_get = True
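+            # Drain the queue: block for the first entry, then keep pulling
+            # entries without blocking until the queue is empty.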
+ while True:
+ try:
+ qentry = self._inode_remove_queue.get(blocking_get)
+ except queue.Empty:
+ break
+ blocking_get = False
+ if qentry is None:
+ return
+
+ if self._shutdown_started.is_set():
+ continue
+
+ # Process this entry
+ if qentry.inode_op(self, locked_ops):
+ self._inode_remove_queue.task_done()
+
+ # Give up the reference
+ qentry = None
+
+ with llfuse.lock:
+ while locked_ops:
+ if locked_ops.popleft().inode_op(self, None):
+ self._inode_remove_queue.task_done()
+ self.cap_cache_event.clear()
+ for entry in self.inode_cache.evict_candidates():
+ self._remove(entry)
+
+ def wait_remove_queue_empty(self):
+ # used by tests
+ self._inode_remove_queue.join()
+
+ def _remove(self, entry):
+ """Remove an inode entry if possible.
+
+ If the entry is still referenced or in use, don't do anything.
+ If this is not referenced but the parent is still referenced,
+ clear any data held by the object (which may include directory
+ entries under the object) but don't remove it from the inode
+ table.
+
+ """
+ try:
+ if entry.inode is None:
+ # Removed already
+ return
+
+ if entry.inode == llfuse.ROOT_INODE:
+ return
+
+ if entry.in_use():
+ # referenced internally, stay pinned
+ #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
+ return
+
+ # Tell the kernel it should forget about it
+ entry.kernel_invalidate()
+
+ if entry.has_ref():
+ # has kernel reference, could still be accessed.
+ # when the kernel forgets about it, we can delete it.
+ #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
+ return
+
+ # commit any pending changes
with llfuse.lock_released:
entry.finalize()
- entry.inode = None
- else:
- entry.dead = True
- _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
+
+ # Clear the contents
+ entry.clear()
+
+ if entry.parent_inode is None:
+ _logger.debug("InodeCache forgetting inode %i, object cache_size %i, cache total %i, forget_inode True, inode entries %i, type %s",
+ entry.inode, entry.cache_size, self.inode_cache.total(),
+ len(self._entries), type(entry))
+
+ if entry.cache_uuid:
+ self._by_uuid[entry.cache_uuid].remove(entry)
+ if not self._by_uuid[entry.cache_uuid]:
+ del self._by_uuid[entry.cache_uuid]
+ entry.cache_uuid = None
+
+ self.inode_cache.unmanage(entry)
+
+ del self._entries[entry.inode]
+ entry.inode = None
+
+ except Exception as e:
+ _logger.exception("failed remove")
def invalidate_inode(self, entry):
- if entry.has_ref(False):
+ if entry.has_ref():
# Only necessary if the kernel has previously done a lookup on this
# inode and hasn't yet forgotten about it.
- llfuse.invalidate_inode(entry.inode)
+ self._inode_remove_queue.put(InvalidateInode(entry.inode))
def invalidate_entry(self, entry, name):
- if entry.has_ref(False):
+ if entry.has_ref():
# Only necessary if the kernel has previously done a lookup on this
# inode and hasn't yet forgotten about it.
- llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
+ self._inode_remove_queue.put(InvalidateEntry(entry.inode, name.encode(self.encoding)))
+
+ def begin_shutdown(self):
+ self._inode_remove_queue.put(None)
+ if self._inode_remove_thread is not None:
+ self._inode_remove_thread.join()
+ self._inode_remove_thread = None
def clear(self):
+ with llfuse.lock_released:
+ self.begin_shutdown()
+
self.inode_cache.clear()
+ self._by_uuid.clear()
- for k,v in viewitems(self._entries):
+ for k,v in self._entries.items():
try:
v.finalize()
except Exception as e:
self._entries.clear()
+ def forward_slash_subst(self):
+ return self._fsns
+
+ def find_by_uuid(self, uuid):
+ """Return a list of zero or more inode entries corresponding
+ to this Arvados UUID."""
+ return self._by_uuid.get(uuid, [])
+
def catch_exceptions(orig_func):
"""Catch uncaught exceptions and log them consistently."""
rename_time = fuse_time.labels(op='rename')
flush_time = fuse_time.labels(op='flush')
- def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
+ def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False, fsns=None):
super(Operations, self).__init__()
self._api_client = api_client
if not inode_cache:
inode_cache = InodeCache(cap=256*1024*1024)
- self.inodes = Inodes(inode_cache, encoding=encoding)
+
+ if fsns is None:
+ try:
+ fsns = self._api_client.config()["Collections"]["ForwardSlashNameSubstitution"]
+ except KeyError:
+ # old API server with no FSNS config
+ fsns = '_'
+ else:
+ if fsns == '' or fsns == '/':
+ fsns = None
+
+ # If we get overlapping shutdown events (e.g., fusermount -u
+ # -z and operations.destroy()) llfuse calls forget() on inodes
+ # that have already been deleted. To avoid this, we make
+ # forget() a no-op if called after destroy().
+ self._shutdown_started = threading.Event()
+
+ self.inodes = Inodes(inode_cache, encoding=encoding, fsns=fsns,
+ shutdown_started=self._shutdown_started)
self.uid = uid
self.gid = gid
self.enable_write = enable_write
# is fully initialized should wait() on this event object.
self.initlock = threading.Event()
- # If we get overlapping shutdown events (e.g., fusermount -u
- # -z and operations.destroy()) llfuse calls forget() on inodes
- # that have already been deleted. To avoid this, we make
- # forget() a no-op if called after destroy().
- self._shutdown_started = threading.Event()
-
self.num_retries = num_retries
self.read_counter = arvados.keep.Counter()
def metric_count_func(self, opname):
return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
+ def begin_shutdown(self):
+ self._shutdown_started.set()
+ self.inodes.begin_shutdown()
+
@destroy_time.time()
@catch_exceptions
def destroy(self):
- self._shutdown_started.set()
+ _logger.debug("arv-mount destroy: start")
+
+ with llfuse.lock_released:
+ self.begin_shutdown()
+
if self.events:
self.events.close()
self.events = None
- # Different versions of llfuse require and forbid us to
- # acquire the lock here. See #8345#note-37, #10805#note-9.
- if LLFUSE_VERSION_0 and llfuse.lock.acquire():
- # llfuse < 0.42
- self.inodes.clear()
- llfuse.lock.release()
- else:
- # llfuse >= 0.42
- self.inodes.clear()
+ self.inodes.clear()
+
+ _logger.debug("arv-mount destroy: complete")
+
def access(self, inode, mode, ctx):
return True
old_attrs = properties.get("old_attributes") or {}
new_attrs = properties.get("new_attributes") or {}
- for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
+ for item in self.inodes.find_by_uuid(ev["object_uuid"]):
item.invalidate()
oldowner = old_attrs.get("owner_uuid")
newowner = ev.get("object_owner_uuid")
for parent in (
- self.inodes.inode_cache.find_by_uuid(oldowner) +
- self.inodes.inode_cache.find_by_uuid(newowner)):
+ self.inodes.find_by_uuid(oldowner) +
+ self.inodes.find_by_uuid(newowner)):
parent.invalidate()
@getattr_time.time()
@catch_exceptions
def getattr(self, inode, ctx=None):
if inode not in self.inodes:
+ _logger.debug("arv-mount getattr: inode %i missing", inode)
raise llfuse.FUSEError(errno.ENOENT)
e = self.inodes[inode]
+ self.inodes.touch(e)
+ parent = None
+ if e.parent_inode:
+ parent = self.inodes[e.parent_inode]
+ self.inodes.touch(parent)
entry = llfuse.EntryAttributes()
entry.st_ino = inode
entry.generation = 0
- entry.entry_timeout = 0
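+        # Let the kernel cache the name->inode mapping only as long as the
+        # parent directory's contents are considered fresh (i.e., until its
+        # next scheduled poll).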
+ entry.entry_timeout = parent.time_to_next_poll() if parent is not None else 0
entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
if name == '.':
inode = parent_inode
- else:
- if parent_inode in self.inodes:
- p = self.inodes[parent_inode]
- self.inodes.touch(p)
- if name == '..':
- inode = p.parent_inode
- elif isinstance(p, Directory) and name in p:
- inode = p[name].inode
+ elif parent_inode in self.inodes:
+ p = self.inodes[parent_inode]
+ self.inodes.touch(p)
+ if name == '..':
+ inode = p.parent_inode
+ elif isinstance(p, Directory) and name in p:
+ if p[name].inode is None:
+ _logger.debug("arv-mount lookup: parent_inode %i name '%s' found but inode was None",
+ parent_inode, name)
+ raise llfuse.FUSEError(errno.ENOENT)
+
+ inode = p[name].inode
if inode != None:
_logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
parent_inode, name, inode)
+ self.inodes.touch(self.inodes[inode])
self.inodes[inode].inc_ref()
return self.getattr(inode)
else:
for inode, nlookup in inodes:
ent = self.inodes[inode]
_logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
- if ent.dec_ref(nlookup) == 0 and ent.dead:
+ if ent.dec_ref(nlookup) == 0 and ent.parent_inode is None:
self.inodes.del_entry(ent)
@open_time.time()
if inode in self.inodes:
p = self.inodes[inode]
else:
+ _logger.debug("arv-mount open: inode %i missing", inode)
raise llfuse.FUSEError(errno.ENOENT)
if isinstance(p, Directory):
finally:
self._filehandles[fh].release()
del self._filehandles[fh]
- self.inodes.inode_cache.cap_cache()
+ self.inodes.cap_cache()
def releasedir(self, fh):
self.release(fh)
if inode in self.inodes:
p = self.inodes[inode]
else:
+ _logger.debug("arv-mount opendir: called with unknown or removed inode %i", inode)
raise llfuse.FUSEError(errno.ENOENT)
if not isinstance(p, Directory):
if p.parent_inode in self.inodes:
parent = self.inodes[p.parent_inode]
else:
+ _logger.warning("arv-mount opendir: parent inode %i of %i is missing", p.parent_inode, inode)
raise llfuse.FUSEError(errno.EIO)
+ _logger.debug("arv-mount opendir: inode %i fh %i ", inode, fh)
+
# update atime
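+        # Take a use reference on the directory while building the listing;
+        # DirectoryHandle itself takes use references on each entry.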
+ p.inc_use()
+ self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + p.items())
+ p.dec_use()
self.inodes.touch(p)
- self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
return fh
@readdir_time.time()
e = off
while e < len(handle.entries):
- if handle.entries[e][1].inode in self.inodes:
- yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
+ ent = handle.entries[e]
+ if ent[1].inode in self.inodes:
+ yield (ent[0].encode(self.inodes.encoding), self.getattr(ent[1].inode), e+1)
e += 1
@statfs_time.time()
metavar='CLASSES',
help="Comma-separated list of storage classes to request for new collections",
)
-
+ # This is a hidden argument used by tests. Normally this
+ # value will be extracted from the cluster config, but mocking
+    # the cluster config in the presence of multiple threads
+ # and processes turned out to be too complicated and brittle.
+ plumbing.add_argument(
+ '--fsns',
+ type=str,
+ default=None,
+ help=argparse.SUPPRESS)
class Mount(object):
def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
disk_cache=self.args.disk_cache,
disk_cache_dir=self.args.disk_cache_dir)
- # If there's too many prefetch threads and you
- # max out the CPU, delivering data to the FUSE
- # layer actually ends up being slower.
- # Experimentally, capping 7 threads seems to
- # be a sweet spot.
- prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
+            # Profiling indicates that prefetching hurts the read()
+            # fast path (by requiring it to do more work and take
+            # additional locks) more than it helps.
+ # Also, the kernel does some readahead itself, which has a
+ # similar effect.
+ prefetch_threads = 0
self.api = arvados.safeapi.ThreadSafeApiCache(
apiconfig=arvados.config.settings(),
api_client=self.api,
encoding=self.args.encoding,
inode_cache=InodeCache(cap=self.args.directory_cache),
- enable_write=self.args.enable_write)
+ enable_write=self.args.enable_write,
+ fsns=self.args.fsns)
if self.args.crunchstat_interval:
statsthread = threading.Thread(
e = self.operations.inodes.add_entry(Directory(
llfuse.ROOT_INODE,
self.operations.inodes,
- self.api.config,
self.args.enable_write,
self.args.filters,
))
def _llfuse_main(self):
try:
- llfuse.main()
+ llfuse.main(workers=10)
except:
llfuse.close(unmount=False)
raise
+ self.operations.begin_shutdown()
llfuse.close()
"""
__slots__ = ("_stale", "_poll", "_last_update", "_atime", "_poll_time", "use_count",
- "ref_count", "dead", "cache_size", "cache_uuid", "allow_attr_cache")
+ "ref_count", "cache_size", "cache_uuid", "allow_attr_cache")
def __init__(self):
self._stale = True
self._poll_time = 60
self.use_count = 0
self.ref_count = 0
- self.dead = False
self.cache_size = 0
self.cache_uuid = None
self.ref_count -= n
return self.ref_count
- def has_ref(self, only_children):
+ def has_ref(self):
"""Determine if there are any kernel references to this
- object or its children.
-
- If only_children is True, ignore refcount of self and only consider
- children.
+ object.
"""
- if only_children:
- return False
- else:
- return self.ref_count > 0
+ return self.ref_count > 0
def objsize(self):
return 0
and the value referencing a File or Directory object.
"""
- def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters):
+ __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")
+
+ def __init__(self, parent_inode, inodes, enable_write, filters):
"""parent_inode is the integer inode number"""
super(Directory, self).__init__()
raise Exception("parent_inode should be an int")
self.parent_inode = parent_inode
self.inodes = inodes
- self.apiconfig = apiconfig
self._entries = {}
self._mtime = time.time()
self._enable_write = enable_write
else:
yield [f_name, *f[1:]]
- def forward_slash_subst(self):
- if not hasattr(self, '_fsns'):
- self._fsns = None
- config = self.apiconfig()
- try:
- self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
- except KeyError:
- # old API server with no FSNS config
- self._fsns = '_'
- else:
- if self._fsns == '' or self._fsns == '/':
- self._fsns = None
- return self._fsns
-
def unsanitize_filename(self, incoming):
"""Replace ForwardSlashNameSubstitution value with /"""
- fsns = self.forward_slash_subst()
+ fsns = self.inodes.forward_slash_subst()
if isinstance(fsns, str):
return incoming.replace(fsns, '/')
else:
elif dirty == '..':
return '__'
else:
- fsns = self.forward_slash_subst()
+ fsns = self.inodes.forward_slash_subst()
if isinstance(fsns, str):
dirty = dirty.replace('/', fsns)
return _disallowed_filename_characters.sub('_', dirty)
self.inodes.touch(self)
super(Directory, self).fresh()
+ def objsize(self):
+ # Rough estimate of memory footprint based on using pympler
+ return len(self._entries) * 1024
+
def merge(self, items, fn, same, new_entry):
"""Helper method for updating the contents of the directory.
entries that are the same in both the old and new lists, create new
entries, and delete old entries missing from the new list.
- :items: iterable with new directory contents
+ Arguments:
+ * items: Iterable --- New directory contents
- :fn: function to take an entry in 'items' and return the desired file or
+ * fn: Callable --- Takes an entry in 'items' and return the desired file or
directory name, or None if this entry should be skipped
- :same: function to compare an existing entry (a File or Directory
+ * same: Callable --- Compare an existing entry (a File or Directory
object) with an entry in the items list to determine whether to keep
the existing entry.
- :new_entry: function to create a new directory entry (File or Directory
+ * new_entry: Callable --- Create a new directory entry (File or Directory
object) from an entry in the items list.
"""
changed = False
for i in items:
name = self.sanitize_filename(fn(i))
- if name:
- if name in oldentries and same(oldentries[name], i):
+ if not name:
+ continue
+ if name in oldentries:
+ ent = oldentries[name]
+ if same(ent, i) and ent.parent_inode == self.inode:
# move existing directory entry over
- self._entries[name] = oldentries[name]
+ self._entries[name] = ent
del oldentries[name]
- else:
- _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
- # create new directory entry
- ent = new_entry(i)
- if ent is not None:
- self._entries[name] = self.inodes.add_entry(ent)
- changed = True
+ self.inodes.inode_cache.touch(ent)
+
+ for i in items:
+ name = self.sanitize_filename(fn(i))
+ if not name:
+ continue
+ if name not in self._entries:
+ # create new directory entry
+ ent = new_entry(i)
+ if ent is not None:
+ self._entries[name] = self.inodes.add_entry(ent)
+                    # Need to invalidate this entry in case there was a
+                    # previous entry that couldn't be moved over, or a
+                    # lookup that returned "file not found" and cached
+                    # a negative result.
+ self.inodes.invalidate_entry(self, name)
+ changed = True
+ _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
# delete any other directory entries that were not in found in 'items'
- for i in oldentries:
- _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
- self.inodes.invalidate_entry(self, i)
- self.inodes.del_entry(oldentries[i])
+ for name, ent in oldentries.items():
+ _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
+ self.inodes.invalidate_entry(self, name)
+ self.inodes.del_entry(ent)
changed = True
if changed:
- self.inodes.invalidate_inode(self)
self._mtime = time.time()
+ self.inodes.inode_cache.update_cache_size(self)
self.fresh()
return True
return False
- def has_ref(self, only_children):
- if super(Directory, self).has_ref(only_children):
- return True
- for v in self._entries.values():
- if v.has_ref(False):
- return True
- return False
-
def clear(self):
"""Delete all entries"""
+ if not self._entries:
+ return
oldentries = self._entries
self._entries = {}
- for n in oldentries:
- oldentries[n].clear()
- self.inodes.del_entry(oldentries[n])
self.invalidate()
+ for name, ent in oldentries.items():
+ ent.clear()
+ self.inodes.invalidate_entry(self, name)
+ self.inodes.del_entry(ent)
+ self.inodes.inode_cache.update_cache_size(self)
def kernel_invalidate(self):
# Invalidating the dentry on the parent implies invalidating all paths
# below it as well.
- parent = self.inodes[self.parent_inode]
+ if self.parent_inode in self.inodes:
+ parent = self.inodes[self.parent_inode]
+ else:
+ # parent was removed already.
+ return
# Find self on the parent in order to invalidate this path.
# Calling the public items() method might trigger a refresh,
"""
- def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters, collection, collection_root):
- super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write, filters)
- self.apiconfig = apiconfig
+ __slots__ = ("collection", "collection_root", "collection_record_file")
+
+ def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
+ super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
self.collection = collection
self.collection_root = collection_root
self.collection_record_file = None
def new_entry(self, name, item, mtime):
name = self.sanitize_filename(name)
if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
- if item.fuse_entry.dead is not True:
- raise Exception("Can only reparent dead inode entry")
+ if item.fuse_entry.parent_inode is not None:
+ raise Exception("Can only reparent unparented inode entry")
if item.fuse_entry.inode is None:
raise Exception("Reparented entry must still have valid inode")
- item.fuse_entry.dead = False
+ item.fuse_entry.parent_inode = self.inode
self._entries[name] = item.fuse_entry
elif isinstance(item, arvados.collection.RichCollectionBase):
self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
self.inode,
self.inodes,
- self.apiconfig,
self._enable_write,
self._filters,
item,
def clear(self):
super(CollectionDirectoryBase, self).clear()
+ if self.collection is not None:
+ self.collection.unsubscribe()
self.collection = None
+ def objsize(self):
+ # objsize for the whole collection is represented at the root,
+ # don't double-count it
+ return 0
class CollectionDirectory(CollectionDirectoryBase):
"""Represents the root of a directory tree representing a collection."""
+ __slots__ = ("api", "num_retries", "collection_locator",
+ "_manifest_size", "_writable", "_updating_lock")
+
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
- super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters, None, self)
+ super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
self.api = api
self.num_retries = num_retries
self._poll = True
if self.collection_record_file is not None:
self.collection_record_file.invalidate()
self.inodes.invalidate_inode(self.collection_record_file)
- _logger.debug("%s invalidated collection record file", self)
+ _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode,
+ self.collection_record_file.inode)
+ self.inodes.update_uuid(self)
+ self.inodes.inode_cache.update_cache_size(self)
self.fresh()
def uuid(self):
return False
@use_counter
+ @check_update
def collection_record(self):
self.flush()
return self.collection.api_response()
return (self.collection_locator is not None)
def objsize(self):
- # This is an empirically-derived heuristic to estimate the memory used
- # to store this collection's metadata. Calculating the memory
- # footprint directly would be more accurate, but also more complicated.
- return self._manifest_size * 128
+        # This is a rough guess of the amount of overhead involved for
+        # a collection; the assumption is that each file averages 128 bytes
+        # in the manifest but consumes 1024 bytes of Python data structures,
+        # so 1024/128=8 means we estimate the RAM footprint at 8 times the
+        # size of the bare manifest text.
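+        # For example, under this assumption a collection whose manifest
+        # text is 1 MiB would be charged roughly 8 MiB against the inode
+        # cache (illustrative figures only).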
+ return self._manifest_size * 8
def finalize(self):
- if self.collection is not None:
- if self.writable():
+ if self.collection is None:
+ return
+
+ if self.writable():
+ try:
self.collection.save()
- self.collection.stop_threads()
+            except Exception:
+ _logger.exception("Failed to save collection %s", self.collection_locator)
+ self.collection.stop_threads()
def clear(self):
if self.collection is not None:
self.collection.stop_threads()
- super(CollectionDirectory, self).clear()
self._manifest_size = 0
+ super(CollectionDirectory, self).clear()
+ if self.collection_record_file is not None:
+ self.inodes.del_entry(self.collection_record_file)
+ self.collection_record_file = None
class TmpCollectionDirectory(CollectionDirectoryBase):
# This is always enable_write=True because it never tries to
# save to the backend
super(TmpCollectionDirectory, self).__init__(
- parent_inode, inodes, api_client.config, True, filters, collection, self)
+ parent_inode, inodes, True, filters, collection, self)
self.populate(self.mtime())
def on_event(self, *args, **kwargs):
with self.collection.lock:
self.collection_record_file.invalidate()
self.inodes.invalidate_inode(self.collection_record_file)
- _logger.debug("%s invalidated collection record", self)
+ _logger.debug("%s invalidated collection record", self.inode)
finally:
while lockcount > 0:
self.collection.lock.acquire()
""".lstrip()
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
- super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+ super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
self.api = api
self.num_retries = num_retries
self.pdh_only = pdh_only
"""A special directory that contains as subdirectories all tags visible to the user."""
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
- super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+ super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
self.api = api
self.num_retries = num_retries
self._poll = True
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
poll=False, poll_time=60):
- super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+ super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
self.api = api
self.num_retries = num_retries
self.tag = tag
class ProjectDirectory(Directory):
"""A special directory that contains the contents of a project."""
+ __slots__ = ("api", "num_retries", "project_object", "project_object_file",
+ "project_uuid", "_updating_lock",
+ "_current_user", "_full_listing", "storage_classes", "recursively_contained")
+
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
project_object, poll=True, poll_time=3, storage_classes=None):
- super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+ super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
self.api = api
self.num_retries = num_retries
self.project_object = project_object
self._current_user = None
self._full_listing = False
self.storage_classes = storage_classes
+ self.recursively_contained = False
+
+ # Filter groups can contain themselves, which causes tools
+ # that walk the filesystem to get stuck in an infinite loop,
+ # so suppress returning a listing in that case.
+ if self.project_object.get("group_class") == "filter":
+ iter_parent_inode = parent_inode
+ while iter_parent_inode != llfuse.ROOT_INODE:
+ parent_dir = self.inodes[iter_parent_inode]
+ if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
+ self.recursively_contained = True
+ break
+ iter_parent_inode = parent_dir.parent_inode
def want_event_subscribe(self):
return True
self.project_object_file = ObjectFile(self.inode, self.project_object)
self.inodes.add_entry(self.project_object_file)
- if not self._full_listing:
+ if self.recursively_contained or not self._full_listing:
return True
def samefn(a, i):
*self._filters_for('collections', qualified=True),
],
) if obj['current_version_uuid'] == obj['uuid'])
-
# end with llfuse.lock_released, re-acquire lock
self.merge(contents,
def persisted(self):
return True
+ def clear(self):
+ super(ProjectDirectory, self).clear()
+ if self.project_object_file is not None:
+ self.inodes.del_entry(self.project_object_file)
+ self.project_object_file = None
+
@use_counter
@check_update
def mkdir(self, name):
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
exclude, poll=False, poll_time=60, storage_classes=None):
- super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+ super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
self.api = api
self.num_retries = num_retries
self.current_user = api.users().current().execute(num_retries=num_retries)
if self.writable():
self.arvfile.parent.root_collection().save()
+ def clear(self):
+ if self.parent_inode is None:
+ self.arvfile.fuse_entry = None
+ self.arvfile = None
+
class StringFile(File):
"""Wrap a simple string as a file"""
+
+ __slots__ = ("contents",)
+
def __init__(self, parent_inode, contents, _mtime):
super(StringFile, self).__init__(parent_inode, _mtime)
self.contents = contents
class ObjectFile(StringFile):
"""Wrap a dict as a serialized json object."""
+ __slots__ = ("object_uuid",)
+
def __init__(self, parent_inode, obj):
super(ObjectFile, self).__init__(parent_inode, "", 0)
self.object_uuid = obj['uuid']
The function is called at the time the file is read. The result is
cached until invalidate() is called.
"""
+
+ __slots__ = ("func",)
+
def __init__(self, parent_inode, func):
super(FuncToJSONFile, self).__init__(parent_inode, "", 0)
self.func = func
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
+#
+# This file runs in one of three modes:
+#
+# 1. If the ARVADOS_BUILDING_VERSION environment variable is set, it writes
+# _version.py and generates dependencies based on that value.
+# 2. If running from an arvados Git checkout, it writes _version.py
+# and generates dependencies from Git.
+# 3. Otherwise, we assume this source was previously generated from Git, so
+#    it reads _version.py and generates dependencies from it.
-import subprocess
-import time
import os
import re
+import runpy
+import subprocess
import sys
-SETUP_DIR = os.path.dirname(os.path.abspath(__file__))
-VERSION_PATHS = {
- SETUP_DIR,
- os.path.abspath(os.path.join(SETUP_DIR, "../../sdk/python")),
- os.path.abspath(os.path.join(SETUP_DIR, "../../build/version-at-commit.sh"))
- }
+from pathlib import Path
+
+# These maps explain the relationships between different Python modules in
+# the arvados repository. We use these to help generate setup.py.
+PACKAGE_DEPENDENCY_MAP = {
+ 'arvados-cwl-runner': ['arvados-python-client', 'crunchstat_summary'],
+ 'arvados-user-activity': ['arvados-python-client'],
+ 'arvados_fuse': ['arvados-python-client'],
+ 'crunchstat_summary': ['arvados-python-client'],
+}
+PACKAGE_MODULE_MAP = {
+ 'arvados-cwl-runner': 'arvados_cwl',
+ 'arvados-docker-cleaner': 'arvados_docker',
+ 'arvados-python-client': 'arvados',
+ 'arvados-user-activity': 'arvados_user_activity',
+ 'arvados_fuse': 'arvados_fuse',
+ 'crunchstat_summary': 'crunchstat_summary',
+}
+PACKAGE_SRCPATH_MAP = {
+ 'arvados-cwl-runner': Path('sdk', 'cwl'),
+ 'arvados-docker-cleaner': Path('services', 'dockercleaner'),
+ 'arvados-python-client': Path('sdk', 'python'),
+ 'arvados-user-activity': Path('tools', 'user-activity'),
+ 'arvados_fuse': Path('services', 'fuse'),
+ 'crunchstat_summary': Path('tools', 'crunchstat-summary'),
+}
+
+ENV_VERSION = os.environ.get("ARVADOS_BUILDING_VERSION")
+SETUP_DIR = Path(__file__).absolute().parent
+try:
+ REPO_PATH = Path(subprocess.check_output(
+ ['git', '-C', str(SETUP_DIR), 'rev-parse', '--show-toplevel'],
+ stderr=subprocess.DEVNULL,
+ text=True,
+ ).rstrip('\n'))
+except (subprocess.CalledProcessError, OSError):
+ REPO_PATH = None
+else:
+ # Verify this is the arvados monorepo
+ if all((REPO_PATH / path).exists() for path in PACKAGE_SRCPATH_MAP.values()):
+ PACKAGE_NAME, = (
+ pkg_name for pkg_name, path in PACKAGE_SRCPATH_MAP.items()
+ if (REPO_PATH / path) == SETUP_DIR
+ )
+ MODULE_NAME = PACKAGE_MODULE_MAP[PACKAGE_NAME]
+ VERSION_SCRIPT_PATH = Path(REPO_PATH, 'build', 'version-at-commit.sh')
+ else:
+ REPO_PATH = None
+if REPO_PATH is None:
+ (PACKAGE_NAME, MODULE_NAME), = (
+ (pkg_name, mod_name)
+ for pkg_name, mod_name in PACKAGE_MODULE_MAP.items()
+ if (SETUP_DIR / mod_name).is_dir()
+ )
+
+def short_tests_only(arglist=sys.argv):
+ try:
+ arglist.remove('--short-tests-only')
+ except ValueError:
+ return False
+ else:
+ return True
+
+def git_log_output(path, *args):
+ return subprocess.check_output(
+ ['git', '-C', str(REPO_PATH),
+ 'log', '--first-parent', '--max-count=1',
+ *args, str(path)],
+ text=True,
+ ).rstrip('\n')
def choose_version_from():
- ts = {}
- for path in VERSION_PATHS:
- ts[subprocess.check_output(
- ['git', 'log', '--first-parent', '--max-count=1',
- '--format=format:%ct', path]).strip()] = path
-
- sorted_ts = sorted(ts.items())
- getver = sorted_ts[-1][1]
- print("Using "+getver+" for version number calculation of "+SETUP_DIR, file=sys.stderr)
+ ver_paths = [SETUP_DIR, VERSION_SCRIPT_PATH, *(
+ PACKAGE_SRCPATH_MAP[pkg]
+ for pkg in PACKAGE_DEPENDENCY_MAP.get(PACKAGE_NAME, ())
+ )]
+ getver = max(ver_paths, key=lambda path: git_log_output(path, '--format=format:%ct'))
+ print(f"Using {getver} for version number calculation of {SETUP_DIR}", file=sys.stderr)
return getver
def git_version_at_commit():
curdir = choose_version_from()
- myhash = subprocess.check_output(['git', 'log', '-n1', '--first-parent',
- '--format=%H', curdir]).strip()
- myversion = subprocess.check_output([SETUP_DIR+'/../../build/version-at-commit.sh', myhash]).strip().decode()
- return myversion
+ myhash = git_log_output(curdir, '--format=%H')
+ return subprocess.check_output(
+ [str(VERSION_SCRIPT_PATH), myhash],
+ text=True,
+ ).rstrip('\n')
def save_version(setup_dir, module, v):
- v = v.replace("~dev", ".dev").replace("~rc", "rc")
- with open(os.path.join(setup_dir, module, "_version.py"), 'wt') as fp:
- return fp.write("__version__ = '%s'\n" % v)
+ with Path(setup_dir, module, '_version.py').open('w') as fp:
+ print(f"__version__ = {v!r}", file=fp)
def read_version(setup_dir, module):
- with open(os.path.join(setup_dir, module, "_version.py"), 'rt') as fp:
- return re.match("__version__ = '(.*)'$", fp.read()).groups()[0]
-
-def get_version(setup_dir, module):
- env_version = os.environ.get("ARVADOS_BUILDING_VERSION")
+ file_vars = runpy.run_path(Path(setup_dir, module, '_version.py'))
+ return file_vars['__version__']
- if env_version:
- save_version(setup_dir, module, env_version)
+def get_version(setup_dir=SETUP_DIR, module=MODULE_NAME):
+ if ENV_VERSION:
+ version = ENV_VERSION
+ elif REPO_PATH is None:
+ return read_version(setup_dir, module)
else:
- try:
- save_version(setup_dir, module, git_version_at_commit())
- except (subprocess.CalledProcessError, OSError) as err:
- print("ERROR: {0}".format(err), file=sys.stderr)
- pass
+ version = git_version_at_commit()
+ version = version.replace("~dev", ".dev").replace("~rc", "rc")
+ save_version(setup_dir, module, version)
+ return version
+
+def iter_dependencies(version=None):
+ if version is None:
+ version = get_version()
+ # A packaged development release should be installed with other
+ # development packages built from the same source, but those
+ # dependencies may have earlier "dev" versions (read: less recent
+ # Git commit timestamps). This compatible version dependency
+    # expresses that as closely as possible. Accepting versions
+    # compatible with .dev0 allows any development release.
+ # Regular expression borrowed partially from
+ # <https://packaging.python.org/en/latest/specifications/version-specifiers/#version-specifiers-regex>
+ dep_ver, match_count = re.subn(r'\.dev(0|[1-9][0-9]*)$', '.dev0', version, 1)
+ dep_op = '~=' if match_count else '=='
+ for dep_pkg in PACKAGE_DEPENDENCY_MAP.get(PACKAGE_NAME, ()):
+ yield f'{dep_pkg}{dep_op}{dep_ver}'
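+
+# For example (versions are illustrative): with arvados-python-client listed
+# as a dependency, a package at version "2.7.1.dev20231107" yields
+# "arvados-python-client~=2.7.1.dev0", while a release version "2.7.1"
+# yields "arvados-python-client==2.7.1".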
- return read_version(setup_dir, module)
+# Called from calculate_python_sdk_cwl_package_versions() in run-library.sh
+if __name__ == '__main__':
+ print(get_version())
from setuptools import setup, find_packages
-SETUP_DIR = os.path.dirname(__file__) or '.'
-README = os.path.join(SETUP_DIR, 'README.rst')
-
import arvados_version
-version = arvados_version.get_version(SETUP_DIR, "arvados_fuse")
-if os.environ.get('ARVADOS_BUILDING_VERSION', False):
- pysdk_dep = "=={}".format(version)
-else:
- # On dev releases, arvados-python-client may have a different timestamp
- pysdk_dep = "<={}".format(version)
-
-short_tests_only = False
-if '--short-tests-only' in sys.argv:
- short_tests_only = True
- sys.argv.remove('--short-tests-only')
+version = arvados_version.get_version()
+short_tests_only = arvados_version.short_tests_only()
+README = os.path.join(arvados_version.SETUP_DIR, 'README.rst')
setup(name='arvados_fuse',
version=version,
('share/doc/arvados_fuse', ['agpl-3.0.txt', 'README.rst']),
],
install_requires=[
- 'arvados-python-client{}'.format(pysdk_dep),
- 'llfuse >= 1.3.6',
+ *arvados_version.iter_dependencies(version),
+ 'arvados-llfuse >= 1.5.1',
'future',
'python-daemon',
'ciso8601 >= 2.0.0',
with arvados_fuse.command.Mount(
arvados_fuse.command.ArgumentParser().parse_args(
argv + ['--foreground',
- '--unmount-timeout=2',
+ '--unmount-timeout=60',
self.mnt])) as self.mount:
return func(self, *args, **kwargs)
finally:
self.operations.events.close(timeout=10)
subprocess.call(["fusermount", "-u", "-z", self.mounttmp])
t0 = time.time()
- self.llfuse_thread.join(timeout=10)
+ self.llfuse_thread.join(timeout=60)
if self.llfuse_thread.is_alive():
logger.warning("MountTestBase.tearDown():"
- " llfuse thread still alive 10s after umount"
+ " llfuse thread still alive 60s after umount"
" -- exiting with SIGKILL")
os.kill(os.getpid(), signal.SIGKILL)
waited = time.time() - t0
import logging
class InodeTests(unittest.TestCase):
+
+ # The following tests call next(inodes._counter) because inode 1
+ # (the root directory) gets special treatment.
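+    # Consuming one counter value keeps the mock entries below from being
+    # assigned the root inode number.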
+
def test_inodes_basic(self):
cache = arvados_fuse.InodeCache(1000, 4)
inodes = arvados_fuse.Inodes(cache)
+ next(inodes._counter)
# Check that ent1 gets added to inodes
ent1 = mock.MagicMock()
def test_inodes_not_persisted(self):
cache = arvados_fuse.InodeCache(1000, 4)
inodes = arvados_fuse.Inodes(cache)
+ next(inodes._counter)
ent1 = mock.MagicMock()
ent1.in_use.return_value = False
def test_inode_cleared(self):
cache = arvados_fuse.InodeCache(1000, 4)
inodes = arvados_fuse.Inodes(cache)
+ next(inodes._counter)
# Check that ent1 gets added to inodes
ent1 = mock.MagicMock()
inodes.add_entry(ent3)
# Won't clear anything because min_entries = 4
- self.assertEqual(2, len(cache._entries))
+ self.assertEqual(2, len(cache._cache_entries))
self.assertFalse(ent1.clear.called)
self.assertEqual(1100, cache.total())
# Change min_entries
cache.min_entries = 1
- cache.cap_cache()
+ ent1.parent_inode = None
+ inodes.cap_cache()
+ inodes.wait_remove_queue_empty()
self.assertEqual(600, cache.total())
self.assertTrue(ent1.clear.called)
# Touching ent1 should cause ent3 to get cleared
+ ent3.parent_inode = None
self.assertFalse(ent3.clear.called)
- cache.touch(ent1)
+ inodes.inode_cache.update_cache_size(ent1)
+ inodes.touch(ent1)
+ inodes.wait_remove_queue_empty()
self.assertTrue(ent3.clear.called)
self.assertEqual(500, cache.total())
def test_clear_in_use(self):
cache = arvados_fuse.InodeCache(1000, 4)
inodes = arvados_fuse.Inodes(cache)
+ next(inodes._counter)
ent1 = mock.MagicMock()
ent1.in_use.return_value = True
ent3.clear.called = False
self.assertFalse(ent1.clear.called)
self.assertFalse(ent3.clear.called)
- cache.touch(ent3)
+ inodes.touch(ent3)
+ inodes.wait_remove_queue_empty()
self.assertFalse(ent1.clear.called)
self.assertFalse(ent3.clear.called)
- self.assertFalse(ent3.kernel_invalidate.called)
+ # kernel invalidate gets called anyway
+ self.assertTrue(ent3.kernel_invalidate.called)
self.assertEqual(1100, cache.total())
# ent1 still in use, ent3 doesn't have ref,
ent3.has_ref.return_value = False
ent1.clear.called = False
ent3.clear.called = False
- cache.touch(ent3)
+ ent3.parent_inode = None
+ inodes.touch(ent3)
+ inodes.wait_remove_queue_empty()
self.assertFalse(ent1.clear.called)
self.assertTrue(ent3.clear.called)
self.assertEqual(500, cache.total())
def test_delete(self):
- cache = arvados_fuse.InodeCache(1000, 4)
+ cache = arvados_fuse.InodeCache(1000, 0)
inodes = arvados_fuse.Inodes(cache)
+ next(inodes._counter)
ent1 = mock.MagicMock()
ent1.in_use.return_value = False
ent1.ref_count = 0
with llfuse.lock:
inodes.del_entry(ent1)
+ inodes.wait_remove_queue_empty()
self.assertEqual(0, cache.total())
- cache.touch(ent3)
+
+ inodes.add_entry(ent3)
+ inodes.wait_remove_queue_empty()
self.assertEqual(600, cache.total())
class SanitizeFilenameTest(MountTestBase):
def test_sanitize_filename(self):
pdir = fuse.ProjectDirectory(
- 1, {}, self.api, 0, False, None,
+ 1, fuse.Inodes(None), self.api, 0, False, None,
project_object=self.api.users().current().execute(),
)
acceptable = [
mnt_args = [
'--read-write',
'--mount-home', 'zzz',
+        '--fsns', '[SLASH]',
]
def setUp(self):
super(SlashSubstitutionTest, self).setUp()
+
self.api = arvados.safeapi.ThreadSafeApiCache(
arvados.config.settings(),
- version='v1',
+ version='v1'
)
- self.api.config = lambda: {"Collections": {"ForwardSlashNameSubstitution": "[SLASH]"}}
self.testcoll = self.api.collections().create(body={"name": "foo/bar/baz"}).execute()
self.testcolleasy = self.api.collections().create(body={"name": "foo-bar-baz"}).execute()
self.fusename = 'foo[SLASH]bar[SLASH]baz'
@IntegrationTest.mount(argv=mnt_args)
- @mock.patch('arvados.util.get_config_once')
- def test_slash_substitution_before_listing(self, get_config_once):
- get_config_once.return_value = {"Collections": {"ForwardSlashNameSubstitution": "[SLASH]"}}
+ def test_slash_substitution_before_listing(self):
self.pool_test(os.path.join(self.mnt, 'zzz'), self.fusename)
self.checkContents()
@staticmethod
self.mnt])
subprocess.check_call(
['./bin/arv-mount', '--subtype', 'test', '--replace',
- '--unmount-timeout', '10',
+ '--unmount-timeout', '60',
self.mnt])
subprocess.check_call(
['./bin/arv-mount', '--subtype', 'test', '--replace',
- '--unmount-timeout', '10',
+ '--unmount-timeout', '60',
self.mnt,
'--exec', 'true'])
for m in subprocess.check_output(['mount']).splitlines():
c.Check(resp.Code, Equals, http.StatusBadGateway)
c.Check(resp.Body.String(), Equals, "test error\n")
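+	// A volume read error that carries no HTTP status should be reported as 500 Internal Server Error.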
+ router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.WriterAt) error {
+ return errors.New("no http status provided")
+ }
+ resp = call(router, "GET", "http://example/"+locSigned, arvadostest.ActiveTokenV2, nil, nil)
+ c.Check(resp.Code, Equals, http.StatusInternalServerError)
+ c.Check(resp.Body.String(), Equals, "no http status provided\n")
+
c.Assert(router.keepstore.mountsW[1].volume.BlockWrite(context.Background(), barHash, []byte("bar")), IsNil)
// If the requested block is available on the second volume,
creds := aws.NewChainProvider(
[]aws.CredentialsProvider{
aws.NewStaticCredentialsProvider(v.AccessKeyID, v.SecretAccessKey, v.AuthToken),
- ec2rolecreds.New(ec2metadata.New(cfg)),
+ ec2rolecreds.New(ec2metadata.New(cfg), func(opts *ec2rolecreds.ProviderOptions) {
+ // (from aws-sdk-go-v2 comments)
+ // "allow the credentials to trigger
+ // refreshing prior to the credentials
+ // actually expiring. This is
+ // beneficial so race conditions with
+ // expiring credentials do not cause
+ // request to fail unexpectedly due to
+ // ExpiredTokenException exceptions."
+ //
+ // (from
+ // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html)
+ // "We make new credentials available
+ // at least five minutes before the
+ // expiration of the old credentials."
+ opts.ExpiryWindow = 5 * time.Minute
+ }),
})
cfg.Credentials = creds
.contains(name)
.parents("tr")
.within($mainRow => {
+ cy.get($mainRow).scrollIntoView();
label && cy.contains(label);
if (multipleRows) {
cy.get("[data-cy=process-io-card] h6")
.contains("Input Parameters")
.parents("[data-cy=process-io-card]")
- .within(() => {
+ .within((ctx) => {
+ cy.get(ctx).scrollIntoView();
verifyIOParameter("input_file", null, "Label Description", "input1.tar", "00000000000000000000000000000000+01");
verifyIOParameter("input_file", null, "Label Description", "input1-2.txt", undefined, true);
verifyIOParameter("input_file", null, "Label Description", "input1-3.txt", undefined, true);
.parents("[data-cy=process-io-card]")
.within(ctx => {
cy.get(ctx).scrollIntoView();
- cy.get('[data-cy="io-preview-image-toggle"]').click({ waitForAnimations: false });
const outPdh = testOutputCollection.portable_data_hash;
verifyIOParameter("output_file", null, "Label Description", "cat.png", `${outPdh}`);
- verifyIOParameterImage("output_file", `/c=${outPdh}/cat.png`);
+ // Disabled until image preview returns
+ // verifyIOParameterImage("output_file", `/c=${outPdh}/cat.png`);
verifyIOParameter("output_file_with_secondary", null, "Doc Description", "main.dat", `${outPdh}`);
verifyIOParameter("output_file_with_secondary", null, "Doc Description", "secondary.dat", undefined, true);
verifyIOParameter("output_file_with_secondary", null, "Doc Description", "secondary2.dat", undefined, true);
cy.get("[data-cy=process-io-card] h6")
.contains("Input Parameters")
.parents("[data-cy=process-io-card]")
- .within(() => {
+ .within((ctx) => {
+ cy.get(ctx).scrollIntoView();
cy.wait(2000);
cy.waitForDom();
- cy.get("tbody tr").each(item => {
- cy.wrap(item).contains("No value");
+
+ testInputs.map((input) => {
+ verifyIOParameter(input.definition.id.split('/').slice(-1)[0], null, null, "No value");
});
});
cy.get("[data-cy=process-io-card] h6")
.contains("Output Parameters")
.parents("[data-cy=process-io-card]")
- .within(() => {
- cy.get("tbody tr").each(item => {
- cy.wrap(item).contains("No value");
+ .within((ctx) => {
+ cy.get(ctx).scrollIntoView();
+
+ testOutputs.map((output) => {
+ verifyIOParameter(output.definition.id.split('/').slice(-1)[0], null, null, "No value");
});
});
});
]);
export const formatInputData = (inputs: CommandInputParameter[], auth: AuthState): ProcessIOParameter[] => {
- return inputs.map(input => {
- return {
- id: getIOParamId(input),
- label: input.label || "",
- value: getIOParamDisplayValue(auth, input),
- };
+ return inputs.flatMap((input): ProcessIOParameter[] => {
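+        // Each input can expand to several display rows (e.g. secondary files);
+        // only the first row carries the parameter id and label.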
+ const processValues = getIOParamDisplayValue(auth, input);
+ return processValues.map((thisValue, i) => ({
+ id: i === 0 ? getIOParamId(input) : "",
+ label: i === 0 ? input.label || "" : "",
+ value: thisValue,
+ }));
});
};
pdh: string | undefined,
auth: AuthState
): ProcessIOParameter[] => {
- return definitions.map(output => {
- return {
- id: getIOParamId(output),
- label: output.label || "",
- value: getIOParamDisplayValue(auth, Object.assign(output, { value: values[getIOParamId(output)] || [] }), pdh),
- };
+ return definitions.flatMap((output): ProcessIOParameter[] => {
+ const processValues = getIOParamDisplayValue(auth, Object.assign(output, { value: values[getIOParamId(output)] || [] }), pdh);
+ return processValues.map((thisValue, i) => ({
+ id: i === 0 ? getIOParamId(output) : "",
+ label: i === 0 ? output.label || "" : "",
+ value: thisValue,
+ }));
});
};
import { ProcessOutputCollectionFiles } from './process-output-collection-files';
import { MemoryRouter } from 'react-router-dom';
-
+// Mock collection files component since it just needs to exist
jest.mock('views/process-panel/process-output-collection-files');
+// Mock autosizer for the io panel virtual list
+jest.mock('react-virtualized-auto-sizer', () => ({ children }: any) => children({ height: 600, width: 600 }));
+
configure({ adapter: new Adapter() });
describe('renderers', () => {
it('shows main process with params', () => {
// when
- const parameters = [{id: 'someId', label: 'someLabel', value: [{display: 'someValue'}]}];
+ const parameters = [{id: 'someId', label: 'someLabel', value: {display: 'someValue'}}];
let panel = mount(
<Provider store={store}>
<MuiThemeProvider theme={CustomTheme}>
import classNames from "classnames";
import { DefaultCodeSnippet } from "components/default-code-snippet/default-code-snippet";
import { KEEP_URL_REGEX } from "models/resource";
+import { FixedSizeList } from 'react-window';
+import AutoSizer from "react-virtualized-auto-sizer";
+import { LinkProps } from "@material-ui/core/Link";
type CssRules =
| "card"
| "avatar"
| "iconHeader"
| "tableWrapper"
- | "tableRoot"
- | "paramValue"
+ | "paramTableRoot"
+ | "paramTableCellText"
+ | "mountsTableRoot"
| "keepLink"
| "collectionLink"
- | "imagePreview"
- | "valArray"
| "secondaryVal"
- | "secondaryRow"
| "emptyValue"
| "noBorderRow"
| "symmetricTabs"
- | "imagePlaceholder"
- | "rowWithPreview"
- | "labelColumn"
- | "primaryRow";
+ | "wrapTooltip";
const styles: StyleRulesCallback<CssRules> = (theme: ArvadosTheme) => ({
card: {
alignSelf: "flex-start",
paddingTop: theme.spacing.unit * 0.5,
},
+ // Card content
content: {
- height: `calc(100% - ${theme.spacing.unit * 7}px - ${theme.spacing.unit * 1.5}px)`,
+ height: `calc(100% - ${theme.spacing.unit * 6}px)`,
padding: theme.spacing.unit * 1.0,
paddingTop: 0,
"&:last-child": {
paddingBottom: theme.spacing.unit * 1,
},
},
+ // Card title
title: {
overflow: "hidden",
paddingTop: theme.spacing.unit * 0.5,
color: theme.customs.colors.greyD,
fontSize: "1.875rem",
},
+ // Applies to each tab's content
tableWrapper: {
height: "auto",
- maxHeight: `calc(100% - ${theme.spacing.unit * 3}px)`,
+ maxHeight: `calc(100% - ${theme.spacing.unit * 6}px)`,
overflow: "auto",
+ // Use flexbox to keep scrolling at the virtual list level
+ display: "flex",
+ flexDirection: "column",
+ alignItems: "start", // Prevents scroll bars at different levels in json tab
},
- tableRoot: {
+
+ // Param table virtual list styles
+ paramTableRoot: {
+ display: "flex",
+ flexDirection: "column",
+ overflow: "hidden",
+ // Flex header
+ "& thead tr": {
+ alignItems: "end",
+ "& th": {
+ padding: "4px 25px 10px",
+ },
+ },
+ "& tbody": {
+ height: "100vh", // Must be constrained by panel maxHeight
+ },
+ // Flex header/body rows
+ "& thead tr, & > tbody tr": {
+ display: "flex",
+ // Flex header/body cells
+ "& th, & td": {
+ flexGrow: 1,
+ flexShrink: 1,
+ flexBasis: 0,
+ overflow: "hidden",
+ },
+ // Column width overrides
+ "& th:nth-of-type(1), & td:nth-of-type(1)": {
+ flexGrow: 0.7,
+ },
+ "& th:nth-last-of-type(1), & td:nth-last-of-type(1)": {
+ flexGrow: 2,
+ },
+ },
+ // Flex body rows
+ "& tbody tr": {
+ height: "40px",
+ // Flex body cells
+ "& td": {
+ padding: "2px 25px 2px",
+ overflow: "hidden",
+ display: "flex",
+ flexDirection: "row",
+ alignItems: "center",
+ whiteSpace: "nowrap",
+ },
+ },
+ },
+ // Param value cell typography styles
+ paramTableCellText: {
+ overflow: "hidden",
+ display: "flex",
+        // Cell contents require a wrapper element for the ellipsis,
+        // since adding ellipses to an anchor element parent results in a misaligned tooltip
+ "& a, & span": {
+ overflow: "hidden",
+ textOverflow: "ellipsis",
+ },
+ '& pre': {
+ margin: 0,
+ overflow: "hidden",
+ textOverflow: "ellipsis",
+ },
+ },
+ mountsTableRoot: {
width: "100%",
"& thead th": {
verticalAlign: "bottom",
paddingRight: "25px",
},
},
- paramValue: {
- display: "flex",
- alignItems: "flex-start",
- flexDirection: "column",
- },
keepLink: {
color: theme.palette.primary.main,
textDecoration: "none",
+ // Overflow wrap for mounts table
overflowWrap: "break-word",
cursor: "pointer",
},
+ // Output collection tab link
collectionLink: {
margin: "10px",
"& a": {
cursor: "pointer",
},
},
- imagePreview: {
- maxHeight: "15em",
- maxWidth: "15em",
- marginBottom: theme.spacing.unit,
- },
- valArray: {
- display: "flex",
- gap: "10px",
- flexWrap: "wrap",
- "& span": {
- display: "inline",
- },
- },
secondaryVal: {
paddingLeft: "20px",
},
- secondaryRow: {
- height: "24px",
- verticalAlign: "top",
- position: "relative",
- top: "-4px",
- },
emptyValue: {
color: theme.customs.colors.grey700,
},
flexBasis: "0",
},
},
- imagePlaceholder: {
- width: "60px",
- height: "60px",
- display: "flex",
- alignItems: "center",
- justifyContent: "center",
- backgroundColor: "#cecece",
- borderRadius: "10px",
- },
- rowWithPreview: {
- verticalAlign: "bottom",
- },
- labelColumn: {
- minWidth: "120px",
- },
- primaryRow: {
- height: "24px",
- "& td": {
- paddingTop: "2px",
- paddingBottom: "2px",
- },
+ wrapTooltip: {
+ maxWidth: "600px",
+ wordWrap: "break-word",
},
});
setSubProcTabState(value);
};
- const [showImagePreview, setShowImagePreview] = useState(false);
-
const PanelIcon = label === ProcessIOCardType.INPUT ? InputIcon : OutputIcon;
const mainProcess = !(process && process!.containerRequest.requestingContainerUuid);
const showParamTable = mainProcess || forceShowParams;
}
action={
<div>
- {mainProcess && (
- <Tooltip
- title={"Toggle Image Preview"}
- disableFocusListener
- >
- <IconButton
- data-cy="io-preview-image-toggle"
- onClick={() => {
- setShowImagePreview(!showImagePreview);
- }}
- >
- {showImagePreview ? <ImageIcon /> : <ImageOffIcon />}
- </IconButton>
- </Tooltip>
- )}
{doUnMaximizePanel && panelMaximized && (
<Tooltip
title={`Unmaximize ${panelName || "panel"}`}
</Grid>
)}
{/* Once loaded, either raw or params may still be empty
- * Raw when all params are empty
- * Params when raw is provided by containerRequest properties but workflow mount is absent for preview
- */}
+ * Raw when all params are empty
+ * Params when raw is provided by containerRequest properties but workflow mount is absent for preview
+ */}
{!loading && (hasRaw || hasParams) && (
<>
<Tabs
<div className={classes.tableWrapper}>
<ProcessIOPreview
data={params}
- showImagePreview={showImagePreview}
valueLabel={forceShowParams ? "Default value" : "Value"}
/>
</div>
export type ProcessIOParameter = {
id: string;
label: string;
- value: ProcessIOValue[];
+ value: ProcessIOValue;
};
interface ProcessIOPreviewDataProps {
data: ProcessIOParameter[];
- showImagePreview: boolean;
valueLabel: string;
}
type ProcessIOPreviewProps = ProcessIOPreviewDataProps & WithStyles<CssRules>;
const ProcessIOPreview = memo(
- withStyles(styles)(({ classes, data, showImagePreview, valueLabel }: ProcessIOPreviewProps) => {
+ withStyles(styles)(({ classes, data, valueLabel }: ProcessIOPreviewProps) => {
const showLabel = data.some((param: ProcessIOParameter) => param.label);
+
+ const hasMoreValues = (index: number) => (
+ data[index+1] && !isMainRow(data[index+1])
+ );
+
+ const isMainRow = (param: ProcessIOParameter) => (param && (param.id || param.label && !param.value.secondary));
+
+ const RenderRow = ({index, style}) => {
+ const param = data[index];
+
+ const rowClasses = {
+ [classes.noBorderRow]: hasMoreValues(index),
+ };
+
+ return <TableRow
+ style={style}
+ className={classNames(rowClasses)}
+ data-cy={isMainRow(param) ? "process-io-param" : ""}>
+ <TableCell>
+ <Tooltip title={param.id}>
+ <Typography className={classes.paramTableCellText}>
+ <span>
+ {param.id}
+ </span>
+ </Typography>
+ </Tooltip>
+ </TableCell>
+ {showLabel && <TableCell>
+ <Tooltip title={param.label}>
+ <Typography className={classes.paramTableCellText}>
+ <span>
+ {param.label}
+ </span>
+ </Typography>
+ </Tooltip>
+ </TableCell>}
+ <TableCell>
+ <ProcessValuePreview
+ value={param.value}
+ />
+ </TableCell>
+ <TableCell>
+ <Typography className={classes.paramTableCellText}>
+                        {/** Collection is an anchor, so it doesn't require a wrapper element */}
+ {param.value.collection}
+ </Typography>
+ </TableCell>
+ </TableRow>;
+ };
+
return (
<Table
- className={classes.tableRoot}
+ className={classes.paramTableRoot}
aria-label="Process IO Preview"
>
<TableHead>
<TableRow>
<TableCell>Name</TableCell>
- {showLabel && <TableCell className={classes.labelColumn}>Label</TableCell>}
+ {showLabel && <TableCell>Label</TableCell>}
<TableCell>{valueLabel}</TableCell>
<TableCell>Collection</TableCell>
</TableRow>
</TableHead>
<TableBody>
- {data.map((param: ProcessIOParameter) => {
- const firstVal = param.value.length > 0 ? param.value[0] : undefined;
- const rest = param.value.slice(1);
- const mainRowClasses = {
- [classes.noBorderRow]: rest.length > 0,
- [classes.primaryRow]: true
- };
-
- return (
- <React.Fragment key={param.id}>
- <TableRow
- className={classNames(mainRowClasses)}
- data-cy="process-io-param"
- >
- <TableCell>{param.id}</TableCell>
- {showLabel && <TableCell>{param.label}</TableCell>}
- <TableCell>
- {firstVal && (
- <ProcessValuePreview
- value={firstVal}
- showImagePreview={showImagePreview}
- />
- )}
- </TableCell>
- <TableCell className={firstVal?.imageUrl ? classes.rowWithPreview : undefined}>
- <Typography className={classes.paramValue}>{firstVal?.collection}</Typography>
- </TableCell>
- </TableRow>
- {rest.map((val, i) => {
- const rowClasses = {
- [classes.noBorderRow]: i < rest.length - 1,
- [classes.secondaryRow]: val.secondary,
- [classes.primaryRow]: !val.secondary,
- };
- return (
- <TableRow
- className={classNames(rowClasses)}
- key={i}
- >
- <TableCell />
- {showLabel && <TableCell />}
- <TableCell>
- <ProcessValuePreview
- value={val}
- showImagePreview={showImagePreview}
- />
- </TableCell>
- <TableCell className={firstVal?.imageUrl ? classes.rowWithPreview : undefined}>
- <Typography className={classes.paramValue}>{val.collection}</Typography>
- </TableCell>
- </TableRow>
- );
- })}
- </React.Fragment>
- );
- })}
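+                    {/* Rows are rendered through react-window's FixedSizeList so only the visible rows are mounted */}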
+ <AutoSizer>
+ {({ height, width }) =>
+ <FixedSizeList
+ height={height}
+ itemCount={data.length}
+ itemSize={40}
+ width={width}
+ >
+ {RenderRow}
+ </FixedSizeList>
+ }
+ </AutoSizer>
</TableBody>
</Table>
);
interface ProcessValuePreviewProps {
value: ProcessIOValue;
- showImagePreview: boolean;
}
-const ProcessValuePreview = withStyles(styles)(({ value, showImagePreview, classes }: ProcessValuePreviewProps & WithStyles<CssRules>) => (
- <Typography className={classes.paramValue}>
- {value.imageUrl && showImagePreview ? (
- <img
- className={classes.imagePreview}
- src={value.imageUrl}
- alt="Inline Preview"
- />
- ) : (
- ""
- )}
- {value.imageUrl && !showImagePreview ? <ImagePlaceholder /> : ""}
- <span className={classNames(classes.valArray, value.secondary && classes.secondaryVal)}>{value.display}</span>
+const ProcessValuePreview = withStyles(styles)(({ value, classes }: ProcessValuePreviewProps & WithStyles<CssRules>) => (
+ <Typography className={classNames(classes.paramTableCellText, value.secondary && classes.secondaryVal)}>
+ {value.display}
</Typography>
));
}
const ProcessIORaw = withStyles(styles)(({ data }: ProcessIORawDataProps) => (
- <Paper elevation={0}>
+ <Paper elevation={0} style={{minWidth: "100%"}}>
<DefaultCodeSnippet
lines={[JSON.stringify(data, null, 2)]}
linked
auth: state.auth,
}))(({ mounts, classes, auth }: ProcessInputMountsProps & { auth: AuthState }) => (
<Table
- className={classes.tableRoot}
+ className={classes.mountsTableRoot}
aria-label="Process Input Mounts"
>
<TableHead>
case isPrimitiveOfType(input, CWLType.BOOLEAN):
const boolValue = (input as BooleanCommandInputParameter).value;
return boolValue !== undefined && !(Array.isArray(boolValue) && boolValue.length === 0)
- ? [{ display: renderPrimitiveValue(boolValue, false) }]
+ ? [{ display: <PrimitiveTooltip data={boolValue}>{renderPrimitiveValue(boolValue, false)}</PrimitiveTooltip> }]
: [{ display: <EmptyValue /> }];
case isPrimitiveOfType(input, CWLType.INT):
return intValue !== undefined &&
// Missing values are empty array
!(Array.isArray(intValue) && intValue.length === 0)
- ? [{ display: renderPrimitiveValue(intValue, false) }]
+ ? [{ display: <PrimitiveTooltip data={intValue}>{renderPrimitiveValue(intValue, false)}</PrimitiveTooltip> }]
: [{ display: <EmptyValue /> }];
case isPrimitiveOfType(input, CWLType.FLOAT):
case isPrimitiveOfType(input, CWLType.DOUBLE):
const floatValue = (input as FloatCommandInputParameter).value;
return floatValue !== undefined && !(Array.isArray(floatValue) && floatValue.length === 0)
- ? [{ display: renderPrimitiveValue(floatValue, false) }]
+ ? [{ display: <PrimitiveTooltip data={floatValue}>{renderPrimitiveValue(floatValue, false)}</PrimitiveTooltip> }]
: [{ display: <EmptyValue /> }];
case isPrimitiveOfType(input, CWLType.STRING):
const stringValue = (input as StringCommandInputParameter).value || undefined;
return stringValue !== undefined && !(Array.isArray(stringValue) && stringValue.length === 0)
- ? [{ display: renderPrimitiveValue(stringValue, false) }]
+ ? [{ display: <PrimitiveTooltip data={stringValue}>{renderPrimitiveValue(stringValue, false)}</PrimitiveTooltip> }]
: [{ display: <EmptyValue /> }];
case isPrimitiveOfType(input, CWLType.FILE):
case getEnumType(input) !== null:
const enumValue = (input as EnumCommandInputParameter).value;
- return enumValue !== undefined && enumValue ? [{ display: <pre>{enumValue}</pre> }] : [{ display: <EmptyValue /> }];
+ return enumValue !== undefined && enumValue ? [{ display: <PrimitiveTooltip data={enumValue}>{enumValue}</PrimitiveTooltip> }] : [{ display: <EmptyValue /> }];
case isArrayOfType(input, CWLType.STRING):
const strArray = (input as StringArrayCommandInputParameter).value || [];
- return strArray.length ? [{ display: <>{strArray.map(val => renderPrimitiveValue(val, true))}</> }] : [{ display: <EmptyValue /> }];
+ return strArray.length ? [{ display: <PrimitiveArrayTooltip data={strArray}>{strArray.map(val => renderPrimitiveValue(val, true))}</PrimitiveArrayTooltip> }] : [{ display: <EmptyValue /> }];
case isArrayOfType(input, CWLType.INT):
case isArrayOfType(input, CWLType.LONG):
const intArray = (input as IntArrayCommandInputParameter).value || [];
- return intArray.length ? [{ display: <>{intArray.map(val => renderPrimitiveValue(val, true))}</> }] : [{ display: <EmptyValue /> }];
+ return intArray.length ? [{ display: <PrimitiveArrayTooltip data={intArray}>{intArray.map(val => renderPrimitiveValue(val, true))}</PrimitiveArrayTooltip> }] : [{ display: <EmptyValue /> }];
case isArrayOfType(input, CWLType.FLOAT):
case isArrayOfType(input, CWLType.DOUBLE):
const floatArray = (input as FloatArrayCommandInputParameter).value || [];
- return floatArray.length ? [{ display: <>{floatArray.map(val => renderPrimitiveValue(val, true))}</> }] : [{ display: <EmptyValue /> }];
+ return floatArray.length ? [{ display: <PrimitiveArrayTooltip data={floatArray}>{floatArray.map(val => renderPrimitiveValue(val, true))}</PrimitiveArrayTooltip> }] : [{ display: <EmptyValue /> }];
case isArrayOfType(input, CWLType.FILE):
const fileArrayMainFiles = (input as FileArrayCommandInputParameter).value || [];
}
};
+interface PrimitiveTooltipProps {
+ data: boolean | number | string;
+}
+
+const PrimitiveTooltip = (props: React.PropsWithChildren<PrimitiveTooltipProps>) => (
+ <Tooltip title={typeof props.data !== 'object' ? String(props.data) : ""}>
+ <pre>{props.children}</pre>
+ </Tooltip>
+);
+
+interface PrimitiveArrayTooltipProps {
+ data: string[];
+}
+
+const PrimitiveArrayTooltip = (props: React.PropsWithChildren<PrimitiveArrayTooltipProps>) => (
+ <Tooltip title={props.data.join(', ')}>
+ <span>{props.children}</span>
+ </Tooltip>
+);
+
+
const renderPrimitiveValue = (value: any, asChip: boolean) => {
const isObject = typeof value === "object";
if (!isObject) {
<Chip
key={value}
label={String(value)}
+ style={{marginRight: "10px"}}
/>
) : (
- <pre key={value}>{String(value)}</pre>
+ <>{String(value)}</>
);
} else {
return asChip ? <UnsupportedValueChip /> : <UnsupportedValue />;
// Passing a pdh always returns a relative wb2 collection url
const pdhWbPath = getNavUrl(pdhUrl, auth);
return pdhUrl && pdhWbPath ? (
- <Tooltip title={"View collection in Workbench"}>
+ <Tooltip title={<>View collection in Workbench<br />{pdhUrl}</>}>
<RouterLink
to={pdhWbPath}
className={classes.keepLink}
const keepUrlPathNav = getKeepNavUrl(auth, res, pdh);
return keepUrlPathNav ? (
- <Tooltip title={"View in keep-web"}>
+ <Tooltip classes={{tooltip: classes.wrapTooltip}} title={<>View in keep-web<br />{keepUrlPath || "/"}</>}>
<a
className={classes.keepLink}
href={keepUrlPathNav}
};
};
+type MuiLinkWithTooltipProps = WithStyles<CssRules> & React.PropsWithChildren<LinkProps>;
+
+const MuiLinkWithTooltip = withStyles(styles)((props: MuiLinkWithTooltipProps) => (
+ <Tooltip title={props.title} classes={{tooltip: props.classes.wrapTooltip}}>
+ <MuiLink {...props}>
+ {props.children}
+ </MuiLink>
+ </Tooltip>
+));
+
const fileToProcessIOValue = (file: File, secondary: boolean, auth: AuthState, pdh: string | undefined, mainFilePdh: string): ProcessIOValue => {
if (isExternalValue(file)) {
return { display: <UnsupportedValue /> };
if (isFileUrl(file.location)) {
return {
display: (
- <MuiLink
+ <MuiLinkWithTooltip
href={file.location}
target="_blank"
rel="noopener"
+ title={file.location}
>
{file.location}
- </MuiLink>
+ </MuiLinkWithTooltip>
),
secondary,
};
label={"Cannot display value"}
/>
));
-
-const ImagePlaceholder = withStyles(styles)(({ classes }: WithStyles<CssRules>) => (
- <span className={classes.imagePlaceholder}>
- <ImageIcon />
- </span>
-));
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
+#
+# This file runs in one of three modes:
+#
+# 1. If the ARVADOS_BUILDING_VERSION environment variable is set, it writes
+# _version.py and generates dependencies based on that value.
+# 2. If running from an arvados Git checkout, it writes _version.py
+# and generates dependencies from Git.
+# 3. Otherwise, we assume this source was previously generated from Git, so
+#    it reads _version.py and generates dependencies from it.
-import subprocess
-import time
import os
import re
+import runpy
+import subprocess
import sys
-SETUP_DIR = os.path.dirname(os.path.abspath(__file__))
-VERSION_PATHS = {
- SETUP_DIR,
- os.path.abspath(os.path.join(SETUP_DIR, "../../sdk/python")),
- os.path.abspath(os.path.join(SETUP_DIR, "../../build/version-at-commit.sh"))
- }
+from pathlib import Path
+
+# These maps explain the relationships between different Python modules in
+# the arvados repository. We use these to help generate setup.py.
+PACKAGE_DEPENDENCY_MAP = {
+ 'arvados-cwl-runner': ['arvados-python-client', 'crunchstat_summary'],
+ 'arvados-user-activity': ['arvados-python-client'],
+ 'arvados_fuse': ['arvados-python-client'],
+ 'crunchstat_summary': ['arvados-python-client'],
+}
+PACKAGE_MODULE_MAP = {
+ 'arvados-cwl-runner': 'arvados_cwl',
+ 'arvados-docker-cleaner': 'arvados_docker',
+ 'arvados-python-client': 'arvados',
+ 'arvados-user-activity': 'arvados_user_activity',
+ 'arvados_fuse': 'arvados_fuse',
+ 'crunchstat_summary': 'crunchstat_summary',
+}
+PACKAGE_SRCPATH_MAP = {
+ 'arvados-cwl-runner': Path('sdk', 'cwl'),
+ 'arvados-docker-cleaner': Path('services', 'dockercleaner'),
+ 'arvados-python-client': Path('sdk', 'python'),
+ 'arvados-user-activity': Path('tools', 'user-activity'),
+ 'arvados_fuse': Path('services', 'fuse'),
+ 'crunchstat_summary': Path('tools', 'crunchstat-summary'),
+}
+
+ENV_VERSION = os.environ.get("ARVADOS_BUILDING_VERSION")
+SETUP_DIR = Path(__file__).absolute().parent
+try:
+ REPO_PATH = Path(subprocess.check_output(
+ ['git', '-C', str(SETUP_DIR), 'rev-parse', '--show-toplevel'],
+ stderr=subprocess.DEVNULL,
+ text=True,
+ ).rstrip('\n'))
+except (subprocess.CalledProcessError, OSError):
+ REPO_PATH = None
+else:
+ # Verify this is the arvados monorepo
+ if all((REPO_PATH / path).exists() for path in PACKAGE_SRCPATH_MAP.values()):
+ PACKAGE_NAME, = (
+ pkg_name for pkg_name, path in PACKAGE_SRCPATH_MAP.items()
+ if (REPO_PATH / path) == SETUP_DIR
+ )
+ MODULE_NAME = PACKAGE_MODULE_MAP[PACKAGE_NAME]
+ VERSION_SCRIPT_PATH = Path(REPO_PATH, 'build', 'version-at-commit.sh')
+ else:
+ REPO_PATH = None
+if REPO_PATH is None:
+ (PACKAGE_NAME, MODULE_NAME), = (
+ (pkg_name, mod_name)
+ for pkg_name, mod_name in PACKAGE_MODULE_MAP.items()
+ if (SETUP_DIR / mod_name).is_dir()
+ )
+
+def short_tests_only(arglist=sys.argv):
+ try:
+ arglist.remove('--short-tests-only')
+ except ValueError:
+ return False
+ else:
+ return True
+
+def git_log_output(path, *args):
+ return subprocess.check_output(
+ ['git', '-C', str(REPO_PATH),
+ 'log', '--first-parent', '--max-count=1',
+ *args, str(path)],
+ text=True,
+ ).rstrip('\n')
def choose_version_from():
- ts = {}
- for path in VERSION_PATHS:
- ts[subprocess.check_output(
- ['git', 'log', '--first-parent', '--max-count=1',
- '--format=format:%ct', path]).strip()] = path
-
- sorted_ts = sorted(ts.items())
- getver = sorted_ts[-1][1]
- print("Using "+getver+" for version number calculation of "+SETUP_DIR, file=sys.stderr)
+ ver_paths = [SETUP_DIR, VERSION_SCRIPT_PATH, *(
+ PACKAGE_SRCPATH_MAP[pkg]
+ for pkg in PACKAGE_DEPENDENCY_MAP.get(PACKAGE_NAME, ())
+ )]
+ getver = max(ver_paths, key=lambda path: git_log_output(path, '--format=format:%ct'))
+ print(f"Using {getver} for version number calculation of {SETUP_DIR}", file=sys.stderr)
return getver
def git_version_at_commit():
curdir = choose_version_from()
- myhash = subprocess.check_output(['git', 'log', '-n1', '--first-parent',
- '--format=%H', curdir]).strip()
- myversion = subprocess.check_output([SETUP_DIR+'/../../build/version-at-commit.sh', myhash]).strip().decode()
- return myversion
+ myhash = git_log_output(curdir, '--format=%H')
+ return subprocess.check_output(
+ [str(VERSION_SCRIPT_PATH), myhash],
+ text=True,
+ ).rstrip('\n')
def save_version(setup_dir, module, v):
- v = v.replace("~dev", ".dev").replace("~rc", "rc")
- with open(os.path.join(setup_dir, module, "_version.py"), 'wt') as fp:
- return fp.write("__version__ = '%s'\n" % v)
+ with Path(setup_dir, module, '_version.py').open('w') as fp:
+ print(f"__version__ = {v!r}", file=fp)
def read_version(setup_dir, module):
- with open(os.path.join(setup_dir, module, "_version.py"), 'rt') as fp:
- return re.match("__version__ = '(.*)'$", fp.read()).groups()[0]
-
-def get_version(setup_dir, module):
- env_version = os.environ.get("ARVADOS_BUILDING_VERSION")
+ file_vars = runpy.run_path(Path(setup_dir, module, '_version.py'))
+ return file_vars['__version__']
- if env_version:
- save_version(setup_dir, module, env_version)
+def get_version(setup_dir=SETUP_DIR, module=MODULE_NAME):
+ if ENV_VERSION:
+ version = ENV_VERSION
+ elif REPO_PATH is None:
+ return read_version(setup_dir, module)
else:
- try:
- save_version(setup_dir, module, git_version_at_commit())
- except (subprocess.CalledProcessError, OSError) as err:
- print("ERROR: {0}".format(err), file=sys.stderr)
- pass
+ version = git_version_at_commit()
+ version = version.replace("~dev", ".dev").replace("~rc", "rc")
+ save_version(setup_dir, module, version)
+ return version
+
+def iter_dependencies(version=None):
+ if version is None:
+ version = get_version()
+ # A packaged development release should be installed with other
+ # development packages built from the same source, but those
+ # dependencies may have earlier "dev" versions (read: less recent
+ # Git commit timestamps). This compatible version dependency
+    # expresses that as closely as possible. Accepting versions
+    # compatible with .dev0 allows any development release.
+ # Regular expression borrowed partially from
+ # <https://packaging.python.org/en/latest/specifications/version-specifiers/#version-specifiers-regex>
+ dep_ver, match_count = re.subn(r'\.dev(0|[1-9][0-9]*)$', '.dev0', version, 1)
+ dep_op = '~=' if match_count else '=='
+ for dep_pkg in PACKAGE_DEPENDENCY_MAP.get(PACKAGE_NAME, ()):
+ yield f'{dep_pkg}{dep_op}{dep_ver}'
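+
+# For example (versions are illustrative): with arvados-python-client listed
+# as a dependency, a package at version "2.7.1.dev20231107" yields
+# "arvados-python-client~=2.7.1.dev0", while a release version "2.7.1"
+# yields "arvados-python-client==2.7.1".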
- return read_version(setup_dir, module)
+# Called from calculate_python_sdk_cwl_package_versions() in run-library.sh
+if __name__ == '__main__':
+ print(get_version())
from setuptools import setup, find_packages
-SETUP_DIR = os.path.dirname(__file__) or '.'
-README = os.path.join(SETUP_DIR, 'README.rst')
-
import arvados_version
-version = arvados_version.get_version(SETUP_DIR, "crunchstat_summary")
-if os.environ.get('ARVADOS_BUILDING_VERSION', False):
- pysdk_dep = "=={}".format(version)
-else:
- # On dev releases, arvados-python-client may have a different timestamp
- pysdk_dep = "<={}".format(version)
-
-short_tests_only = False
-if '--short-tests-only' in sys.argv:
- short_tests_only = True
- sys.argv.remove('--short-tests-only')
+version = arvados_version.get_version()
+short_tests_only = arvados_version.short_tests_only()
+README = os.path.join(arvados_version.SETUP_DIR, 'README.rst')
setup(name='crunchstat_summary',
version=version,
('share/doc/crunchstat_summary', ['agpl-3.0.txt']),
],
install_requires=[
- 'arvados-python-client{}'.format(pysdk_dep),
+ *arvados_version.iter_dependencies(version),
],
python_requires="~=3.8",
test_suite='tests',
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
+#
+# This file runs in one of three modes:
+#
+# 1. If the ARVADOS_BUILDING_VERSION environment variable is set, it writes
+# _version.py and generates dependencies based on that value.
+# 2. If running from an arvados Git checkout, it writes _version.py
+# and generates dependencies from Git.
+# 3. Otherwise, we assume this source was previously generated from Git, so
+#    it reads _version.py and generates dependencies from it.
-import subprocess
-import time
import os
import re
+import runpy
+import subprocess
import sys
-SETUP_DIR = os.path.dirname(os.path.abspath(__file__))
-VERSION_PATHS = {
- SETUP_DIR,
- os.path.abspath(os.path.join(SETUP_DIR, "../../sdk/python")),
- os.path.abspath(os.path.join(SETUP_DIR, "../../build/version-at-commit.sh"))
- }
+from pathlib import Path
+
+# These maps explain the relationships between different Python modules in
+# the arvados repository. We use these to help generate setup.py.
+PACKAGE_DEPENDENCY_MAP = {
+ 'arvados-cwl-runner': ['arvados-python-client', 'crunchstat_summary'],
+ 'arvados-user-activity': ['arvados-python-client'],
+ 'arvados_fuse': ['arvados-python-client'],
+ 'crunchstat_summary': ['arvados-python-client'],
+}
+PACKAGE_MODULE_MAP = {
+ 'arvados-cwl-runner': 'arvados_cwl',
+ 'arvados-docker-cleaner': 'arvados_docker',
+ 'arvados-python-client': 'arvados',
+ 'arvados-user-activity': 'arvados_user_activity',
+ 'arvados_fuse': 'arvados_fuse',
+ 'crunchstat_summary': 'crunchstat_summary',
+}
+PACKAGE_SRCPATH_MAP = {
+ 'arvados-cwl-runner': Path('sdk', 'cwl'),
+ 'arvados-docker-cleaner': Path('services', 'dockercleaner'),
+ 'arvados-python-client': Path('sdk', 'python'),
+ 'arvados-user-activity': Path('tools', 'user-activity'),
+ 'arvados_fuse': Path('services', 'fuse'),
+ 'crunchstat_summary': Path('tools', 'crunchstat-summary'),
+}
+
+ENV_VERSION = os.environ.get("ARVADOS_BUILDING_VERSION")
+SETUP_DIR = Path(__file__).absolute().parent
+try:
+ REPO_PATH = Path(subprocess.check_output(
+ ['git', '-C', str(SETUP_DIR), 'rev-parse', '--show-toplevel'],
+ stderr=subprocess.DEVNULL,
+ text=True,
+ ).rstrip('\n'))
+except (subprocess.CalledProcessError, OSError):
+ REPO_PATH = None
+else:
+ # Verify this is the arvados monorepo
+ if all((REPO_PATH / path).exists() for path in PACKAGE_SRCPATH_MAP.values()):
+ PACKAGE_NAME, = (
+ pkg_name for pkg_name, path in PACKAGE_SRCPATH_MAP.items()
+ if (REPO_PATH / path) == SETUP_DIR
+ )
+ MODULE_NAME = PACKAGE_MODULE_MAP[PACKAGE_NAME]
+ VERSION_SCRIPT_PATH = Path(REPO_PATH, 'build', 'version-at-commit.sh')
+ else:
+ REPO_PATH = None
+if REPO_PATH is None:
+ (PACKAGE_NAME, MODULE_NAME), = (
+ (pkg_name, mod_name)
+ for pkg_name, mod_name in PACKAGE_MODULE_MAP.items()
+ if (SETUP_DIR / mod_name).is_dir()
+ )
+
+def short_tests_only(arglist=sys.argv):
+ try:
+ arglist.remove('--short-tests-only')
+ except ValueError:
+ return False
+ else:
+ return True
+
+def git_log_output(path, *args):
+ return subprocess.check_output(
+ ['git', '-C', str(REPO_PATH),
+ 'log', '--first-parent', '--max-count=1',
+ *args, str(path)],
+ text=True,
+ ).rstrip('\n')
def choose_version_from():
- ts = {}
- for path in VERSION_PATHS:
- ts[subprocess.check_output(
- ['git', 'log', '--first-parent', '--max-count=1',
- '--format=format:%ct', path]).strip()] = path
-
- sorted_ts = sorted(ts.items())
- getver = sorted_ts[-1][1]
- print("Using "+getver+" for version number calculation of "+SETUP_DIR, file=sys.stderr)
+ ver_paths = [SETUP_DIR, VERSION_SCRIPT_PATH, *(
+ PACKAGE_SRCPATH_MAP[pkg]
+ for pkg in PACKAGE_DEPENDENCY_MAP.get(PACKAGE_NAME, ())
+ )]
+ getver = max(ver_paths, key=lambda path: git_log_output(path, '--format=format:%ct'))
+ print(f"Using {getver} for version number calculation of {SETUP_DIR}", file=sys.stderr)
return getver
def git_version_at_commit():
curdir = choose_version_from()
- myhash = subprocess.check_output(['git', 'log', '-n1', '--first-parent',
- '--format=%H', curdir]).strip()
- myversion = subprocess.check_output([SETUP_DIR+'/../../build/version-at-commit.sh', myhash]).strip().decode()
- return myversion
+ myhash = git_log_output(curdir, '--format=%H')
+ return subprocess.check_output(
+ [str(VERSION_SCRIPT_PATH), myhash],
+ text=True,
+ ).rstrip('\n')
def save_version(setup_dir, module, v):
- v = v.replace("~dev", ".dev").replace("~rc", "rc")
- with open(os.path.join(setup_dir, module, "_version.py"), 'wt') as fp:
- return fp.write("__version__ = '%s'\n" % v)
+ with Path(setup_dir, module, '_version.py').open('w') as fp:
+ print(f"__version__ = {v!r}", file=fp)
def read_version(setup_dir, module):
- with open(os.path.join(setup_dir, module, "_version.py"), 'rt') as fp:
- return re.match("__version__ = '(.*)'$", fp.read()).groups()[0]
-
-def get_version(setup_dir, module):
- env_version = os.environ.get("ARVADOS_BUILDING_VERSION")
+ file_vars = runpy.run_path(Path(setup_dir, module, '_version.py'))
+ return file_vars['__version__']
- if env_version:
- save_version(setup_dir, module, env_version)
+def get_version(setup_dir=SETUP_DIR, module=MODULE_NAME):
+ if ENV_VERSION:
+ version = ENV_VERSION
+ elif REPO_PATH is None:
+ return read_version(setup_dir, module)
else:
- try:
- save_version(setup_dir, module, git_version_at_commit())
- except (subprocess.CalledProcessError, OSError) as err:
- print("ERROR: {0}".format(err), file=sys.stderr)
- pass
+ version = git_version_at_commit()
+ version = version.replace("~dev", ".dev").replace("~rc", "rc")
+ save_version(setup_dir, module, version)
+ return version
+
+def iter_dependencies(version=None):
+ if version is None:
+ version = get_version()
+ # A packaged development release should be installed with other
+ # development packages built from the same source, but those
+ # dependencies may have earlier "dev" versions (read: less recent
+ # Git commit timestamps). This compatible version dependency
+    # expresses that as closely as possible. Accepting versions
+    # compatible with .dev0 allows any development release.
+ # Regular expression borrowed partially from
+ # <https://packaging.python.org/en/latest/specifications/version-specifiers/#version-specifiers-regex>
+ dep_ver, match_count = re.subn(r'\.dev(0|[1-9][0-9]*)$', '.dev0', version, 1)
+ dep_op = '~=' if match_count else '=='
+ for dep_pkg in PACKAGE_DEPENDENCY_MAP.get(PACKAGE_NAME, ()):
+ yield f'{dep_pkg}{dep_op}{dep_ver}'
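+
+# For example (versions are illustrative): with arvados-python-client listed
+# as a dependency, a package at version "2.7.1.dev20231107" yields
+# "arvados-python-client~=2.7.1.dev0", while a release version "2.7.1"
+# yields "arvados-python-client==2.7.1".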
- return read_version(setup_dir, module)
+# Called from calculate_python_sdk_cwl_package_versions() in run-library.sh
+if __name__ == '__main__':
+ print(get_version())
from setuptools import setup, find_packages
-SETUP_DIR = os.path.dirname(__file__) or '.'
-README = os.path.join(SETUP_DIR, 'README.rst')
-
import arvados_version
-version = arvados_version.get_version(SETUP_DIR, "arvados_user_activity")
+version = arvados_version.get_version()
+README = os.path.join(arvados_version.SETUP_DIR, 'README.rst')
setup(name='arvados-user-activity',
version=version,
('share/doc/arvados_user_activity', ['agpl-3.0.txt']),
],
install_requires=[
- 'arvados-python-client >= 2.2.0.dev20201118185221',
+ *arvados_version.iter_dependencies(version),
],
python_requires="~=3.8",
zip_safe=True,