import urllib2
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
-from collections import defaultdict
+from collections import defaultdict, Counter
from functools import partial
from operator import itemgetter
from SocketServer import ThreadingMixIn
return (byteSizeFromValidUuid(block_uuid) *
block_to_persister_replication[block_uuid].get(user_uuid, 0))
# Memo for computeWeightedReplicationCosts. Keys are tuples of sorted
# (replication_level, count) pairs, so every permutation of the same input
# shares one entry. Entries are never evicted.
memo_computeWeightedReplicationCosts = {}


def computeWeightedReplicationCosts(replication_levels):
  """Computes the relative cost of varied replication levels.

  replication_levels: an iterable of integers, one per interested user,
    each being the replication level that user wants. Order does not
    matter and duplicates are expected.

  Returns a new dictionary from each distinct requested replication level
  to the cost charged to one user who wants that level. An empty input
  yields an empty dictionary. The caller may mutate the result freely;
  the memoized copy is not affected.

  The basic thinking is that the cost of replicating at level x should
  be shared by everyone who wants replication of level x or higher.

  For example, if we have two users who want 1 copy, one user who
  wants 3 copies and two users who want 6 copies:
  the input would be [1, 1, 3, 6, 6] (or any permutation)

  The cost of the first copy is shared by all 5 users, so they each
  pay 1 copy / 5 users = 0.2.
  The cost of the second and third copies is shared by 3 users, so they
  each pay 2 copies / 3 users = 0.67 (plus the above costs).
  The cost of the fourth, fifth and sixth copies is shared by two
  users, so they each pay 3 copies / 2 users = 1.5 (plus the above costs).
  The result is therefore roughly {1: 0.2, 3: 0.87, 6: 2.37}.

  More examples (float values rounded):
    computeWeightedReplicationCosts([1]) -> {1: 1.0}
    computeWeightedReplicationCosts([2]) -> {2: 2.0}
    computeWeightedReplicationCosts([1, 1]) -> {1: 0.5}
    computeWeightedReplicationCosts([2, 2]) -> {2: 1.0}
    computeWeightedReplicationCosts([1, 2]) -> {1: 0.5, 2: 1.5}
    computeWeightedReplicationCosts([1, 3]) -> {1: 0.5, 3: 2.5}
    computeWeightedReplicationCosts([1, 3, 6, 6, 10])
        -> {1: 0.2, 3: 0.7, 6: 1.7, 10: 5.7}
  """
  # Sorted (level, count) pairs: a canonical, hashable memo key that also
  # drives the ascending-level walk below.
  level_counts = tuple(sorted(Counter(replication_levels).items()))

  cost_for_level = memo_computeWeightedReplicationCosts.get(level_counts)
  if cost_for_level is None:
    cost_for_level = {}
    last_level = 0
    current_cost = 0.0
    # Number of users whose desired level is >= the level being priced.
    # float() keeps the division below true division under Python 2.
    users_sharing = float(sum(count for _, count in level_counts))
    for level, count in level_counts:
      # Copies last_level+1 .. level are split among everyone still sharing;
      # add that marginal cost to the cost of all lower copies.
      current_cost += (level - last_level) / users_sharing
      cost_for_level[level] = current_cost
      last_level = level
      # Users who stop at this level do not pay for any higher copies.
      users_sharing -= count
    memo_computeWeightedReplicationCosts[level_counts] = cost_for_level

  # Hand back a copy so callers cannot corrupt the memoized result.
  return dict(cost_for_level)
+
def reportUserDiskUsage():
for user, blocks in reader_to_blocks.items():
user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(map(
# This is the main flow here
# Parsed command-line flags. This stays None until the __main__ block runs
# parse_args(), so anything that dereferences args must run after parsing.
args = None

# Module logger and the stream handler used for stderr output.
log = logging.getLogger(name='arvados.services.datamanager')
stderr_handler = logging.StreamHandler()
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
  """HTTPServer that services each incoming request on its own thread.

  ThreadingMixIn is listed first so that its process_request override
  takes precedence over HTTPServer's synchronous one.
  """
  pass
def buildArgumentParser():
  """Returns the argparse parser describing this script's command-line flags."""
  parser = argparse.ArgumentParser(description='Report on keep disks.')
  parser.add_argument('-m', '--max-api-results',
                      type=int,
                      default=5000,
                      help='The max results to get at once.')
  parser.add_argument('-p', '--port',
                      type=int,
                      default=9090,
                      help='The port number to serve on. 0 means no server.')
  parser.add_argument('-v', '--verbose',
                      help='increase output verbosity',
                      action='store_true')
  parser.add_argument('-u', '--uuid',
                      help='uuid of specific collection to process')
  # --require-admin-user and --no-require-admin-user both write to
  # args.require_admin_user; the positive form is the default.
  parser.add_argument('--require-admin-user',
                      action='store_true',
                      default=True,
                      help='Fail if the user is not an admin [default]')
  parser.add_argument('--no-require-admin-user',
                      dest='require_admin_user',
                      action='store_false',
                      help='Allow users without admin permissions with only a warning.')
  return parser


if __name__ == '__main__':
  # Assigned at module scope so it rebinds the module-level `args` global.
  args = buildArgumentParser().parse_args()

  if args.port == 0:
    # No server requested: load the data synchronously in this thread.
    loadAllData()
  else:
    # Load data on a background thread so the HTTP server starts right away.
    loader = threading.Thread(target=loadAllData, name='loader')
    loader.start()
    server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
    try:
      server.serve_forever()
    finally:
      # Release the listening socket however serve_forever exits
      # (e.g. KeyboardInterrupt).
      server.server_close()