Add 'sdk/java-v2/' from commit '55f103e336ca9fb8bf1720d2ef4ee8dd4e221118'
[arvados.git] / sdk / java-v2 / src / main / java / org / arvados / client / logic / keep / KeepClient.java
1 /*
2  * Copyright (C) The Arvados Authors. All rights reserved.
3  *
4  * SPDX-License-Identifier: AGPL-3.0 OR Apache-2.0
5  *
6  */
7
8 package org.arvados.client.logic.keep;
9
10 import com.google.common.collect.Lists;
11 import org.apache.commons.codec.digest.DigestUtils;
12 import org.apache.commons.io.FileUtils;
13 import org.arvados.client.api.client.KeepServicesApiClient;
14 import org.arvados.client.api.model.KeepService;
15 import org.arvados.client.api.model.KeepServiceList;
16 import org.arvados.client.common.Characters;
17 import org.arvados.client.common.Headers;
18 import org.arvados.client.config.ConfigProvider;
19 import org.arvados.client.exception.ArvadosApiException;
20 import org.arvados.client.exception.ArvadosClientException;
21 import org.slf4j.Logger;
22
23 import java.io.File;
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Objects;
30 import java.util.concurrent.CompletableFuture;
31 import java.util.function.Function;
32 import java.util.stream.Collectors;
33 import java.util.stream.Stream;
34
35 public class KeepClient {
36
37     private final KeepServicesApiClient keepServicesApiClient;
38     private final Logger log = org.slf4j.LoggerFactory.getLogger(KeepClient.class);
39     private List<KeepService> keepServices;
40     private List<KeepService> writableServices;
41     private Map<String, KeepService> gatewayServices;
42     private final String apiToken;
43     private Integer maxReplicasPerService;
44     private final ConfigProvider config;
45
46     public KeepClient(ConfigProvider config) {
47         this.config = config;
48         keepServicesApiClient = new KeepServicesApiClient(config);
49         apiToken = config.getApiToken();
50     }
51
52     public byte[] getDataChunk(KeepLocator keepLocator) {
53
54         Map<String, String> headers = new HashMap<>();
55         Map<String, FileTransferHandler> rootsMap = new HashMap<>();
56
57         List<String> sortedRoots = mapNewServices(rootsMap, keepLocator, false, false, headers);
58
59         byte[] dataChunk = sortedRoots
60                 .stream()
61                 .map(rootsMap::get)
62                 .map(r -> r.get(keepLocator))
63                 .filter(Objects::nonNull)
64                 .findFirst()
65                 .orElse(null);
66
67         if (dataChunk == null) {
68             throw new ArvadosClientException("No server responding. Unable to download data chunk.");
69         }
70
71         return dataChunk;
72     }
73
74     public String put(File data, int copies, int numRetries) {
75
76         byte[] fileBytes;
77         try {
78             fileBytes = FileUtils.readFileToByteArray(data);
79         } catch (IOException e) {
80             throw new ArvadosClientException("An error occurred while reading data chunk", e);
81         }
82
83         String dataHash = DigestUtils.md5Hex(fileBytes);
84         String locatorString = String.format("%s+%d", dataHash, data.length());
85
86         if (copies < 1) {
87             return locatorString;
88         }
89         KeepLocator locator = new KeepLocator(locatorString);
90
91         // Tell the proxy how many copies we want it to store
92         Map<String, String> headers = new HashMap<>();
93         headers.put(Headers.X_KEEP_DESIRED_REPLICAS, String.valueOf(copies));
94
95         Map<String, FileTransferHandler> rootsMap = new HashMap<>();
96         List<String> sortedRoots = mapNewServices(rootsMap, locator, false, true, headers);
97
98         int numThreads = 0;
99         if (maxReplicasPerService == null || maxReplicasPerService >= copies) {
100             numThreads = 1;
101         } else {
102             numThreads = ((Double) Math.ceil(1.0 * copies / maxReplicasPerService)).intValue();
103         }
104         log.debug("Pool max threads is {}", numThreads);
105
106         List<CompletableFuture<String>> futures = Lists.newArrayList();
107         for (int i = 0; i < numThreads; i++) {
108             String root = sortedRoots.get(i);
109             FileTransferHandler keepServiceLocal = rootsMap.get(root);
110             futures.add(CompletableFuture.supplyAsync(() -> keepServiceLocal.put(dataHash, data)));
111         }
112
113         @SuppressWarnings("unchecked")
114         CompletableFuture<String>[] array = futures.toArray(new CompletableFuture[0]);
115
116         return Stream.of(array)
117                 .map(CompletableFuture::join)
118                 .reduce((a, b) -> b)
119                 .orElse(null);
120     }
121
122     private List<String> mapNewServices(Map<String, FileTransferHandler> rootsMap, KeepLocator locator,
123                                         boolean forceRebuild, boolean needWritable, Map<String, String> headers) {
124
125         headers.putIfAbsent("Authorization", String.format("OAuth2 %s", apiToken));
126         List<String> localRoots = weightedServiceRoots(locator, forceRebuild, needWritable);
127         for (String root : localRoots) {
128             FileTransferHandler keepServiceLocal = new FileTransferHandler(root, headers, config);
129             rootsMap.putIfAbsent(root, keepServiceLocal);
130         }
131         return localRoots;
132     }
133
134     /**
135      * Return an array of Keep service endpoints, in the order in which they should be probed when reading or writing
136      * data with the given hash+hints.
137      */
138     private List<String> weightedServiceRoots(KeepLocator locator, boolean forceRebuild, boolean needWritable) {
139
140         buildServicesList(forceRebuild);
141
142         List<String> sortedRoots = new ArrayList<>();
143
144         // Use the services indicated by the given +K@... remote
145         // service hints, if any are present and can be resolved to a
146         // URI.
147         //
148         for (String hint : locator.getHints()) {
149             if (hint.startsWith("K@")) {
150                 if (hint.length() == 7) {
151                     sortedRoots.add(String.format("https://keep.%s.arvadosapi.com/", hint.substring(2)));
152                 } else if (hint.length() == 29) {
153                     KeepService svc = gatewayServices.get(hint.substring(2));
154                     if (svc != null) {
155                         sortedRoots.add(svc.getServiceRoot());
156                     }
157                 }
158             }
159         }
160
161         // Sort the available local services by weight (heaviest first)
162         // for this locator, and return their service_roots (base URIs)
163         // in that order.
164         List<KeepService> useServices = keepServices;
165         if (needWritable) {
166             useServices = writableServices;
167         }
168         anyNonDiskServices(useServices);
169
170         sortedRoots.addAll(useServices
171                 .stream()
172                 .sorted((ks1, ks2) -> serviceWeight(locator.getMd5sum(), ks2.getUuid())
173                         .compareTo(serviceWeight(locator.getMd5sum(), ks1.getUuid())))
174                 .map(KeepService::getServiceRoot)
175                 .collect(Collectors.toList()));
176
177         return sortedRoots;
178     }
179
180     private void buildServicesList(boolean forceRebuild) {
181         if (keepServices != null && !forceRebuild) {
182             return;
183         }
184         KeepServiceList keepServiceList;
185         try {
186             keepServiceList = keepServicesApiClient.accessible();
187         } catch (ArvadosApiException e) {
188             throw new ArvadosClientException("Cannot obtain list of accessible keep services");
189         }
190         // Gateway services are only used when specified by UUID,
191         // so there's nothing to gain by filtering them by
192         // service_type.
193         gatewayServices = keepServiceList.getItems().stream().collect(Collectors.toMap(KeepService::getUuid, Function.identity()));
194
195         if (gatewayServices.isEmpty()) {
196             throw new ArvadosClientException("No gateway services available!");
197         }
198
199         // Precompute the base URI for each service.
200         for (KeepService keepService : gatewayServices.values()) {
201             String serviceHost = keepService.getServiceHost();
202             if (!serviceHost.startsWith("[") && serviceHost.contains(Characters.COLON)) {
203                 // IPv6 URIs must be formatted like http://[::1]:80/...
204                 serviceHost = String.format("[%s]", serviceHost);
205             }
206
207             String protocol = keepService.getServiceSslFlag() ? "https" : "http";
208             String serviceRoot = String.format("%s://%s:%d/", protocol, serviceHost, keepService.getServicePort());
209             keepService.setServiceRoot(serviceRoot);
210         }
211
212         keepServices = gatewayServices.values().stream().filter(ks -> !ks.getServiceType().startsWith("gateway:")).collect(Collectors.toList());
213         writableServices = keepServices.stream().filter(ks -> !ks.getReadOnly()).collect(Collectors.toList());
214
215         // For disk type services, max_replicas_per_service is 1
216         // It is unknown (unlimited) for other service types.
217         if (anyNonDiskServices(writableServices)) {
218             maxReplicasPerService = null;
219         } else {
220             maxReplicasPerService = 1;
221         }
222     }
223
224     private Boolean anyNonDiskServices(List<KeepService> useServices) {
225         return useServices.stream().anyMatch(ks -> !ks.getServiceType().equals("disk"));
226     }
227
228     /**
229      * Compute the weight of a Keep service endpoint for a data block with a known hash.
230      * <p>
231      * The weight is md5(h + u) where u is the last 15 characters of the service endpoint's UUID.
232      */
233     private static String serviceWeight(String dataHash, String serviceUuid) {
234         String shortenedUuid;
235         if (serviceUuid != null && serviceUuid.length() >= 15) {
236             int substringIndex = serviceUuid.length() - 15;
237             shortenedUuid = serviceUuid.substring(substringIndex);
238         } else {
239             shortenedUuid = (serviceUuid == null) ? "" : serviceUuid;
240         }
241         return DigestUtils.md5Hex(dataHash + shortenedUuid);
242     }
243
244 }