# Cache of already-computed cost tables. Keys are the string form of the
# sorted (replication level, number of users) pairs, so any permutation
# of the same input shares one entry.
memo_computeWeightedReplicationCosts = {}


def computeWeightedReplicationCosts(replication_levels):
    """Computes the relative cost of varied replication levels.

    replication_levels: an iterable (list, tuple, ...) of integers, one
    entry per user, giving the replication level that user wants.

    Returns a dictionary from replication level to the cost paid by each
    user who asked for that level. The returned dictionary is the cached
    object itself, so callers must treat it as read-only.

    The basic thinking is that the cost of replicating at level x should
    be shared by everyone who wants replication of level x or higher.

    For example, if we have two users who want 1 copy, one user who
    wants 3 copies and two users who want 6 copies:
    the input would be [1, 1, 3, 6, 6] (or any permutation)

    The cost of the first copy is shared by all 5 users, so they each
    pay 1 copy / 5 users = 0.2.
    The cost of the second and third copies is shared by 3 users, so they
    each pay 2 copies / 3 users = 0.67 (plus the above costs)
    The cost of the fourth, fifth and sixth copies is shared by two
    users, so they each pay 3 copies / 2 users = 1.5 (plus the above costs)
    giving {1:0.2, 3:0.87, 6:2.37}.

    Here are some other examples:
    computeWeightedReplicationCosts([1,]) -> {1:1.0}
    computeWeightedReplicationCosts([2,]) -> {2:2.0}
    computeWeightedReplicationCosts([1,1]) -> {1:0.5}
    computeWeightedReplicationCosts([2,2]) -> {2:1.0}
    computeWeightedReplicationCosts([1,2]) -> {1:0.5,2:1.5}
    computeWeightedReplicationCosts([1,3]) -> {1:0.5,3:2.5}
    computeWeightedReplicationCosts([1,3,6,6,10]) -> {1:0.2,3:0.7,6:1.7,10:5.7}
    computeWeightedReplicationCosts([]) -> {}
    """
    # Sorted (level, user count) pairs: ascending level order is what the
    # running-total loop below relies on.
    replication_level_counts = sorted(Counter(replication_levels).items())

    memo_key = str(replication_level_counts)

    if memo_key not in memo_computeWeightedReplicationCosts:
        last_level = 0
        current_cost = 0.0
        # Float so the per-user share below is never integer division.
        total_interested = float(
            sum(map(itemgetter(1), replication_level_counts)))
        cost_for_level = {}
        for replication_level, count in replication_level_counts:
            copies_added = replication_level - last_level
            # Marginal cost of the extra copies, split among everyone who
            # wants at least this level, added to the cost so far.
            current_cost += copies_added / total_interested
            cost_for_level[replication_level] = current_cost
            # Users satisfied at this level do not pay for higher levels.
            last_level = replication_level
            total_interested -= count
        memo_computeWeightedReplicationCosts[memo_key] = cost_for_level

    return memo_computeWeightedReplicationCosts[memo_key]
if __name__ == '__main__':
    # Command-line parsing happens only when run as a script; the result
    # is bound to the module-level name `args`.
    parser = argparse.ArgumentParser(description='Report on keep disks.')
    parser.add_argument('-m', '--max-api-results', type=int, default=5000,
                        help='The max results to get at once.')
    parser.add_argument('-p', '--port', type=int, default=9090,
                        help='The port number to serve on. 0 means no server.')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='increase output verbosity')
    parser.add_argument('-u', '--uuid',
                        help='uuid of specific collection to process')
    # The next two flags share one destination: the first is the default,
    # the second switches it off.
    parser.add_argument('--require-admin-user', action='store_true',
                        default=True,
                        help='Fail if the user is not an admin [default]')
    parser.add_argument('--no-require-admin-user', action='store_false',
                        dest='require_admin_user',
                        help='Allow users without admin permissions with only a warning.')

    args = parser.parse_args()

    if args.port != 0:
        # Load the data on a background thread and serve HTTP requests
        # from this one.
        loader = threading.Thread(name='loader', target=loadAllData)
        loader.start()

        server = ThreadedHTTPServer(('localhost', args.port),
                                    DataManagerHandler)
        server.serve_forever()
    else:
        # Port 0 means no server: load the data in the foreground only.
        loadAllData()