2 * Copyright (C) The Arvados Authors. All rights reserved.
4 * SPDX-License-Identifier: AGPL-3.0 OR Apache-2.0
8 package org.arvados.client.logic.keep;
10 import com.google.common.collect.Lists;
11 import org.apache.commons.codec.digest.DigestUtils;
12 import org.apache.commons.io.FileUtils;
13 import org.arvados.client.api.client.KeepServicesApiClient;
14 import org.arvados.client.api.model.KeepService;
15 import org.arvados.client.api.model.KeepServiceList;
16 import org.arvados.client.common.Characters;
17 import org.arvados.client.common.Headers;
18 import org.arvados.client.config.ConfigProvider;
19 import org.arvados.client.exception.ArvadosApiException;
20 import org.arvados.client.exception.ArvadosClientException;
21 import org.slf4j.Logger;
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.List;
29 import java.util.Objects;
30 import java.util.concurrent.CompletableFuture;
31 import java.util.function.Function;
32 import java.util.stream.Collectors;
33 import java.util.stream.Stream;
35 public class KeepClient {
// Client used by buildServicesList to ask the API server for the accessible Keep services.
37 private final KeepServicesApiClient keepServicesApiClient;
38 private final Logger log = org.slf4j.LoggerFactory.getLogger(KeepClient.class);
// Services whose type does not start with "gateway:"; stays null until buildServicesList has run.
39 private List<KeepService> keepServices;
// Subset of keepServices that are not flagged read-only.
40 private List<KeepService> writableServices;
// Every service returned by the API server, keyed by UUID (despite the name, not only gateways).
41 private Map<String, KeepService> gatewayServices;
// Token placed in the "Authorization: OAuth2 <token>" header by mapNewServices.
42 private final String apiToken;
// 1 when all writable services are of type "disk"; null when at least one is not (no known limit).
43 private Integer maxReplicasPerService;
// Configuration passed on to every FileTransferHandler created by mapNewServices.
44 private final ConfigProvider config;
/**
 * Creates a Keep client bound to the given configuration.
 *
 * @param config supplies the API token and is passed on to the Keep services API client and to
 *               every {@code FileTransferHandler} this client creates
 */
public KeepClient(ConfigProvider config) {
    // config is a final field read by mapNewServices, so it must be assigned in the constructor.
    this.config = config;
    keepServicesApiClient = new KeepServicesApiClient(config);
    apiToken = config.getApiToken();
}
52 public byte[] getDataChunk(KeepLocator keepLocator) {
54 Map<String, String> headers = new HashMap<>();
55 Map<String, FileTransferHandler> rootsMap = new HashMap<>();
57 List<String> sortedRoots = mapNewServices(rootsMap, keepLocator, false, false, headers);
59 byte[] dataChunk = sortedRoots
62 .map(r -> r.get(keepLocator))
63 .filter(Objects::nonNull)
67 if (dataChunk == null) {
68 throw new ArvadosClientException("No server responding. Unable to download data chunk.");
/**
 * Uploads the contents of a file to Keep.
 *
 * The file is read fully into memory and hashed with MD5; the block is addressed by a locator of
 * the form {@code md5+length}. One or more asynchronous uploads are then started against the
 * writable services, in the probe order computed for that locator.
 *
 * @param data       file whose contents are stored
 * @param copies     desired number of replicas; also sent as the X_KEEP_DESIRED_REPLICAS header
 * @param numRetries NOTE(review): not referenced by any statement visible in this method; confirm
 *                   whether retries are meant to be handled here
 * @return a string derived from the results of the joined uploads; how several results are
 *         combined is not visible in this excerpt (TODO confirm)
 * @throws ArvadosClientException if the file cannot be read
 */
74 public String put(File data, int copies, int numRetries) {
// Read the whole file into memory so that it can be hashed.
78 fileBytes = FileUtils.readFileToByteArray(data);
79 } catch (IOException e) {
80 throw new ArvadosClientException("An error occurred while reading data chunk", e);
// The MD5 hex digest of the contents identifies the block.
83 String dataHash = DigestUtils.md5Hex(fileBytes);
// Locator is "<hash>+<size>"; the size comes from File.length(), not from the bytes read above.
84 String locatorString = String.format("%s+%d", dataHash, data.length());
89 KeepLocator locator = new KeepLocator(locatorString);
91 // Tell the proxy how many copies we want it to store
92 Map<String, String> headers = new HashMap<>();
93 headers.put(Headers.X_KEEP_DESIRED_REPLICAS, String.valueOf(copies));
95 Map<String, FileTransferHandler> rootsMap = new HashMap<>();
// Uploads need writable services (needWritable = true); the cached service list is reused.
96 List<String> sortedRoots = mapNewServices(rootsMap, locator, false, true, headers);
// Decide how many uploads to start.
// NOTE(review): the division on the next line would throw on unboxing if maxReplicasPerService
// were null, so it presumably belongs to the else branch of this check; confirm.
99 if (maxReplicasPerService == null || maxReplicasPerService >= copies) {
// Round copies / maxReplicasPerService up to a whole number of uploads.
102 numThreads = ((Double) Math.ceil(1.0 * copies / maxReplicasPerService)).intValue();
104 log.debug("Pool max threads is {}", numThreads);
106 List<CompletableFuture<String>> futures = Lists.newArrayList();
// Start one asynchronous upload per thread, each against the next root in probe order.
// NOTE(review): sortedRoots.get(i) throws IndexOutOfBoundsException when fewer roots are
// available than numThreads.
107 for (int i = 0; i < numThreads; i++) {
108 String root = sortedRoots.get(i);
109 FileTransferHandler keepServiceLocal = rootsMap.get(root);
// No executor is passed, so the upload runs on CompletableFuture's default asynchronous executor.
110 futures.add(CompletableFuture.supplyAsync(() -> keepServiceLocal.put(dataHash, data)));
// Creating an array of a generic type is unchecked; it only holds the futures collected above.
113 @SuppressWarnings("unchecked")
114 CompletableFuture<String>[] array = futures.toArray(new CompletableFuture[0]);
// join() blocks until each upload has finished and rethrows a failed upload as an unchecked exception.
116 return Stream.of(array)
117 .map(CompletableFuture::join)
122 private List<String> mapNewServices(Map<String, FileTransferHandler> rootsMap, KeepLocator locator,
123 boolean forceRebuild, boolean needWritable, Map<String, String> headers) {
125 headers.putIfAbsent("Authorization", String.format("OAuth2 %s", apiToken));
126 List<String> localRoots = weightedServiceRoots(locator, forceRebuild, needWritable);
127 for (String root : localRoots) {
128 FileTransferHandler keepServiceLocal = new FileTransferHandler(root, headers, config);
129 rootsMap.putIfAbsent(root, keepServiceLocal);
135 * Return an array of Keep service endpoints, in the order in which they should be probed when reading or writing
136 * data with the given hash+hints.
138 private List<String> weightedServiceRoots(KeepLocator locator, boolean forceRebuild, boolean needWritable) {
140 buildServicesList(forceRebuild);
142 List<String> sortedRoots = new ArrayList<>();
144 // Use the services indicated by the given +K@... remote
145 // service hints, if any are present and can be resolved to a
148 for (String hint : locator.getHints()) {
149 if (hint.startsWith("K@")) {
150 if (hint.length() == 7) {
151 sortedRoots.add(String.format("https://keep.%s.arvadosapi.com/", hint.substring(2)));
152 } else if (hint.length() == 29) {
153 KeepService svc = gatewayServices.get(hint.substring(2));
155 sortedRoots.add(svc.getServiceRoot());
161 // Sort the available local services by weight (heaviest first)
162 // for this locator, and return their service_roots (base URIs)
164 List<KeepService> useServices = keepServices;
166 useServices = writableServices;
168 anyNonDiskServices(useServices);
170 sortedRoots.addAll(useServices
172 .sorted((ks1, ks2) -> serviceWeight(locator.getMd5sum(), ks2.getUuid())
173 .compareTo(serviceWeight(locator.getMd5sum(), ks1.getUuid())))
174 .map(KeepService::getServiceRoot)
175 .collect(Collectors.toList()));
180 private void buildServicesList(boolean forceRebuild) {
181 if (keepServices != null && !forceRebuild) {
184 KeepServiceList keepServiceList;
186 keepServiceList = keepServicesApiClient.accessible();
187 } catch (ArvadosApiException e) {
188 throw new ArvadosClientException("Cannot obtain list of accessible keep services");
190 // Gateway services are only used when specified by UUID,
191 // so there's nothing to gain by filtering them by
193 gatewayServices = keepServiceList.getItems().stream().collect(Collectors.toMap(KeepService::getUuid, Function.identity()));
195 if (gatewayServices.isEmpty()) {
196 throw new ArvadosClientException("No gateway services available!");
199 // Precompute the base URI for each service.
200 for (KeepService keepService : gatewayServices.values()) {
201 String serviceHost = keepService.getServiceHost();
202 if (!serviceHost.startsWith("[") && serviceHost.contains(Characters.COLON)) {
203 // IPv6 URIs must be formatted like http://[::1]:80/...
204 serviceHost = String.format("[%s]", serviceHost);
207 String protocol = keepService.getServiceSslFlag() ? "https" : "http";
208 String serviceRoot = String.format("%s://%s:%d/", protocol, serviceHost, keepService.getServicePort());
209 keepService.setServiceRoot(serviceRoot);
212 keepServices = gatewayServices.values().stream().filter(ks -> !ks.getServiceType().startsWith("gateway:")).collect(Collectors.toList());
213 writableServices = keepServices.stream().filter(ks -> !ks.getReadOnly()).collect(Collectors.toList());
215 // For disk type services, max_replicas_per_service is 1
216 // It is unknown (unlimited) for other service types.
217 if (anyNonDiskServices(writableServices)) {
218 maxReplicasPerService = null;
220 maxReplicasPerService = 1;
224 private Boolean anyNonDiskServices(List<KeepService> useServices) {
225 return useServices.stream().anyMatch(ks -> !ks.getServiceType().equals("disk"));
229 * Compute the weight of a Keep service endpoint for a data block with a known hash.
231 * The weight is md5(h + u) where u is the last 15 characters of the service endpoint's UUID.
233 private static String serviceWeight(String dataHash, String serviceUuid) {
234 String shortenedUuid;
235 if (serviceUuid != null && serviceUuid.length() >= 15) {
236 int substringIndex = serviceUuid.length() - 15;
237 shortenedUuid = serviceUuid.substring(substringIndex);
239 shortenedUuid = (serviceUuid == null) ? "" : serviceUuid;
241 return DigestUtils.md5Hex(dataHash + shortenedUuid);