Added method to compute weighted cost of different replication levels. Added main...
author Misha Zatsman <misha@curoverse.com>
Wed, 9 Apr 2014 20:44:50 +0000 (20:44 +0000)
committer Misha Zatsman <misha@curoverse.com>
Wed, 9 Apr 2014 20:44:50 +0000 (20:44 +0000)
services/datamanager/datamanager.py

index a016fb81451736d2e2540c4edc601982d6c78980..666df97bd4765a9fe899a79aa01a351e912f75e7 100755 (executable)
@@ -12,7 +12,7 @@ import threading
 import urllib2
 
 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
-from collections import defaultdict
+from collections import defaultdict, Counter
 from functools import partial
 from operator import itemgetter
 from SocketServer import ThreadingMixIn
@@ -224,6 +224,54 @@ def blockPersistedUsage(user_uuid, block_uuid):
   return (byteSizeFromValidUuid(block_uuid) *
           block_to_persister_replication[block_uuid].get(user_uuid, 0))
 
+def computeWeightedReplicationCosts(replication_levels):
+  """Computes the relative cost of varied replication levels.
+
+  replication_levels: an iterable of integers, one per interested
+  party, each giving the replication level that party wants.
+
+  Returns a dictionary from each requested level to its cost ({} if empty).
+
+  The basic thinking is that the cost of replicating at level x should
+  be shared by everyone who wants replication of level x or higher.
+
+  For example, if I have two users who want 1 copy, one user who
+  wants 3 copies and two users who want 6 copies:
+  the input would be [1, 1, 3, 6, 6] (or any permutation)
+
+  The cost of the first copy is shared by all 5 users, so they each
+  pay 1 copy / 5 users = 0.2.
+  The cost of the second and third copies is shared by 3 users, so they
+  each pay 2 copies / 3 users = 0.67 (plus the above costs)
+  The cost of the fourth, fifth and sixth copies is shared by two
+  users, so they each pay 3 copies / 2 users = 1.5 (plus the above costs)
+
+  Here are some other examples (costs rounded to one decimal place):
+  computeWeightedReplicationCosts([1,]) -> {1:1.0}
+  computeWeightedReplicationCosts([2,]) -> {2:2.0}
+  computeWeightedReplicationCosts([1,1]) -> {1:0.5}
+  computeWeightedReplicationCosts([2,2]) -> {2:1.0}
+  computeWeightedReplicationCosts([1,2]) -> {1:0.5,2:1.5}
+  computeWeightedReplicationCosts([1,3]) -> {1:0.5,3:2.5}
+  computeWeightedReplicationCosts([1,3,6,6,10]) -> {1:0.2,3:0.7,6:1.7,10:5.7}
+  """
+  replication_level_counts = sorted(Counter(replication_levels).items())
+  # The above, written to a string, could also serve as a hash key if
+  # we want to save on computation
+
+  last_level, last_cost = 0, 0.0
+  # float() so the division below is true division under Python 2.
+  total_interested = float(sum(map(itemgetter(1), replication_level_counts)))
+  cost_for_level = {}
+  for replication_level, count in replication_level_counts:
+    # The new copies are split among everyone wanting >= this level.
+    copies_added = replication_level - last_level
+    last_cost += copies_added / total_interested
+    cost_for_level[replication_level] = last_cost
+    last_level = replication_level
+    total_interested -= count
+  return cost_for_level
+
 def reportUserDiskUsage():
   for user, blocks in reader_to_blocks.items():
     user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(map(
@@ -549,13 +597,12 @@ class DataManagerHandler(BaseHTTPRequestHandler):
 class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
   """Handle requests in a separate thread."""
 
-#if __name__ == '__main__':
-
-if args.port == 0:
-  loadAllData()
-else:
-  loader = threading.Thread(target = loadAllData, name = 'loader')
-  loader.start()
+if __name__ == '__main__':
+  if args.port == 0:  # port 0: load data in the foreground, start no server
+    loadAllData()
+  else:  # load in a background thread while the HTTP server runs
+    loader = threading.Thread(target = loadAllData, name = 'loader')
+    loader.start()
 
-  server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
-  server.serve_forever()
+    server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
+    server.serve_forever()