/* * Copyright (C) The Arvados Authors. All rights reserved. * * SPDX-License-Identifier: AGPL-3.0 OR Apache-2.0 * */ package org.arvados.client.logic.keep; import com.google.common.collect.Lists; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.FileUtils; import org.arvados.client.api.client.KeepServicesApiClient; import org.arvados.client.api.model.KeepService; import org.arvados.client.api.model.KeepServiceList; import org.arvados.client.common.Characters; import org.arvados.client.common.Headers; import org.arvados.client.config.ConfigProvider; import org.arvados.client.exception.ArvadosApiException; import org.arvados.client.exception.ArvadosClientException; import org.slf4j.Logger; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; public class KeepClient { private final KeepServicesApiClient keepServicesApiClient; private final Logger log = org.slf4j.LoggerFactory.getLogger(KeepClient.class); private List keepServices; private List writableServices; private Map gatewayServices; private final String apiToken; private Integer maxReplicasPerService; private final ConfigProvider config; public KeepClient(ConfigProvider config) { this.config = config; keepServicesApiClient = new KeepServicesApiClient(config); apiToken = config.getApiToken(); } public byte[] getDataChunk(KeepLocator keepLocator) { Map headers = new HashMap<>(); Map rootsMap = new HashMap<>(); List sortedRoots = mapNewServices(rootsMap, keepLocator, false, false, headers); byte[] dataChunk = sortedRoots .stream() .map(rootsMap::get) .map(r -> r.get(keepLocator)) .filter(Objects::nonNull) .findFirst() .orElse(null); if (dataChunk == null) { throw new ArvadosClientException("No server responding. Unable to download data chunk."); } return dataChunk; } public String put(File data, int copies, int numRetries) { byte[] fileBytes; try { fileBytes = FileUtils.readFileToByteArray(data); } catch (IOException e) { throw new ArvadosClientException("An error occurred while reading data chunk", e); } String dataHash = DigestUtils.md5Hex(fileBytes); String locatorString = String.format("%s+%d", dataHash, data.length()); if (copies < 1) { return locatorString; } KeepLocator locator = new KeepLocator(locatorString); // Tell the proxy how many copies we want it to store Map headers = new HashMap<>(); headers.put(Headers.X_KEEP_DESIRED_REPLICAS, String.valueOf(copies)); Map rootsMap = new HashMap<>(); List sortedRoots = mapNewServices(rootsMap, locator, false, true, headers); int numThreads = 0; if (maxReplicasPerService == null || maxReplicasPerService >= copies) { numThreads = 1; } else { numThreads = ((Double) Math.ceil(1.0 * copies / maxReplicasPerService)).intValue(); } log.debug("Pool max threads is {}", numThreads); List> futures = Lists.newArrayList(); for (int i = 0; i < numThreads; i++) { String root = sortedRoots.get(i); FileTransferHandler keepServiceLocal = rootsMap.get(root); futures.add(CompletableFuture.supplyAsync(() -> keepServiceLocal.put(dataHash, data))); } @SuppressWarnings("unchecked") CompletableFuture[] array = futures.toArray(new CompletableFuture[0]); return Stream.of(array) .map(CompletableFuture::join) .reduce((a, b) -> b) .orElse(null); } private List mapNewServices(Map rootsMap, KeepLocator locator, boolean forceRebuild, boolean needWritable, Map headers) { headers.putIfAbsent("Authorization", String.format("OAuth2 %s", apiToken)); List localRoots = weightedServiceRoots(locator, forceRebuild, needWritable); for (String root : localRoots) { FileTransferHandler keepServiceLocal = new FileTransferHandler(root, headers, config); rootsMap.putIfAbsent(root, keepServiceLocal); } return localRoots; } /** * Return an array of Keep service endpoints, in the order in which they should be probed when reading or writing * data with the given hash+hints. */ private List weightedServiceRoots(KeepLocator locator, boolean forceRebuild, boolean needWritable) { buildServicesList(forceRebuild); List sortedRoots = new ArrayList<>(); // Use the services indicated by the given +K@... remote // service hints, if any are present and can be resolved to a // URI. // for (String hint : locator.getHints()) { if (hint.startsWith("K@")) { if (hint.length() == 7) { sortedRoots.add(String.format("https://keep.%s.arvadosapi.com/", hint.substring(2))); } else if (hint.length() == 29) { KeepService svc = gatewayServices.get(hint.substring(2)); if (svc != null) { sortedRoots.add(svc.getServiceRoot()); } } } } // Sort the available local services by weight (heaviest first) // for this locator, and return their service_roots (base URIs) // in that order. List useServices = keepServices; if (needWritable) { useServices = writableServices; } anyNonDiskServices(useServices); sortedRoots.addAll(useServices .stream() .sorted((ks1, ks2) -> serviceWeight(locator.getMd5sum(), ks2.getUuid()) .compareTo(serviceWeight(locator.getMd5sum(), ks1.getUuid()))) .map(KeepService::getServiceRoot) .collect(Collectors.toList())); return sortedRoots; } private void buildServicesList(boolean forceRebuild) { if (keepServices != null && !forceRebuild) { return; } KeepServiceList keepServiceList; try { keepServiceList = keepServicesApiClient.accessible(); } catch (ArvadosApiException e) { throw new ArvadosClientException("Cannot obtain list of accessible keep services"); } // Gateway services are only used when specified by UUID, // so there's nothing to gain by filtering them by // service_type. gatewayServices = keepServiceList.getItems().stream().collect(Collectors.toMap(KeepService::getUuid, Function.identity())); if (gatewayServices.isEmpty()) { throw new ArvadosClientException("No gateway services available!"); } // Precompute the base URI for each service. for (KeepService keepService : gatewayServices.values()) { String serviceHost = keepService.getServiceHost(); if (!serviceHost.startsWith("[") && serviceHost.contains(Characters.COLON)) { // IPv6 URIs must be formatted like http://[::1]:80/... serviceHost = String.format("[%s]", serviceHost); } String protocol = keepService.getServiceSslFlag() ? "https" : "http"; String serviceRoot = String.format("%s://%s:%d/", protocol, serviceHost, keepService.getServicePort()); keepService.setServiceRoot(serviceRoot); } keepServices = gatewayServices.values().stream().filter(ks -> !ks.getServiceType().startsWith("gateway:")).collect(Collectors.toList()); writableServices = keepServices.stream().filter(ks -> !ks.getReadOnly()).collect(Collectors.toList()); // For disk type services, max_replicas_per_service is 1 // It is unknown (unlimited) for other service types. if (anyNonDiskServices(writableServices)) { maxReplicasPerService = null; } else { maxReplicasPerService = 1; } } private Boolean anyNonDiskServices(List useServices) { return useServices.stream().anyMatch(ks -> !ks.getServiceType().equals("disk")); } /** * Compute the weight of a Keep service endpoint for a data block with a known hash. *

* The weight is md5(h + u) where u is the last 15 characters of the service endpoint's UUID. */ private static String serviceWeight(String dataHash, String serviceUuid) { String shortenedUuid; if (serviceUuid != null && serviceUuid.length() >= 15) { int substringIndex = serviceUuid.length() - 15; shortenedUuid = serviceUuid.substring(substringIndex); } else { shortenedUuid = (serviceUuid == null) ? "" : serviceUuid; } return DigestUtils.md5Hex(dataHash + shortenedUuid); } }