Added memoization to computeWeightedReplicationCosts.
[arvados.git] / services / datamanager / datamanager.py
index a016fb81451736d2e2540c4edc601982d6c78980..be2d81b290be9f4a4c578daeb00ac6ae7d431462 100755 (executable)
@@ -12,7 +12,7 @@ import threading
 import urllib2
 
 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
-from collections import defaultdict
+from collections import defaultdict, Counter
 from functools import partial
 from operator import itemgetter
 from SocketServer import ThreadingMixIn
@@ -224,6 +224,60 @@ def blockPersistedUsage(user_uuid, block_uuid):
   return (byteSizeFromValidUuid(block_uuid) *
           block_to_persister_replication[block_uuid].get(user_uuid, 0))
 
# Memo for computeWeightedReplicationCosts: maps a sorted tuple of
# (replication_level, count) pairs to the computed {level: cost} dict.
memo_computeWeightedReplicationCosts = {}
def computeWeightedReplicationCosts(replication_levels):
  """Computes the relative cost of varied replication levels.

  replication_levels: an iterable of integers, one per interested
  party, each giving the replication level that party desires. The
  order and container type (list, tuple, ...) do not matter.

  Returns a dictionary from each distinct requested replication level
  to the cost borne by each party requesting that level. Results are
  memoized in memo_computeWeightedReplicationCosts, keyed on the
  multiset of requested levels. Each call returns a fresh copy, so a
  caller mutating the result cannot corrupt the cache.

  The basic thinking is that the cost of replicating at level x should
  be shared by everyone who wants replication of level x or higher.

  For example, if we have two users who want 1 copy, one user who
  wants 3 copies and two users who want 6 copies:
  the input would be [1, 1, 3, 6, 6] (or any permutation)

  The cost of the first copy is shared by all 5 users, so they each
  pay 1 copy / 5 users = 0.2.
  The cost of the second and third copies is shared by 3 users, so they
  each pay 2 copies / 3 users = 0.67 (plus the above costs).
  The cost of the fourth, fifth and sixth copies is shared by two
  users, so they each pay 3 copies / 2 users = 1.5 (plus the above costs).

  Here are some other examples (values rounded):
  computeWeightedReplicationCosts([1,]) -> {1:1.0}
  computeWeightedReplicationCosts([2,]) -> {2:2.0}
  computeWeightedReplicationCosts([1,1]) -> {1:0.5}
  computeWeightedReplicationCosts([2,2]) -> {2:1.0}
  computeWeightedReplicationCosts([1,2]) -> {1:0.5,2:1.5}
  computeWeightedReplicationCosts([1,3]) -> {1:0.5,3:2.5}
  computeWeightedReplicationCosts([1,3,6,6,10]) -> {1:0.2,3:0.7,6:1.7,10:5.7}
  computeWeightedReplicationCosts([]) -> {}
  """
  # A sorted tuple of (level, count) pairs is hashable and identical for
  # every permutation of the input, so it can key the memo directly.
  replication_level_counts = tuple(
    sorted(Counter(replication_levels).items()))

  cost_for_level = memo_computeWeightedReplicationCosts.get(
    replication_level_counts)
  if cost_for_level is None:
    last_level = 0
    current_cost = 0
    # float() so the divisions below are true divisions under Python 2.
    total_interested = float(sum(map(itemgetter(1), replication_level_counts)))
    cost_for_level = {}
    for replication_level, count in replication_level_counts:
      copies_added = replication_level - last_level
      # The copies added since the previous level are split among everyone
      # who wants at least this level; accumulate onto the lower levels' cost.
      current_cost += copies_added / total_interested
      cost_for_level[replication_level] = current_cost
      # Parties at this level are not interested in any higher level.
      last_level = replication_level
      total_interested -= count
    memo_computeWeightedReplicationCosts[replication_level_counts] = (
      cost_for_level)

  # Hand out a copy so callers cannot mutate the memoized result.
  return dict(cost_for_level)
+
 def reportUserDiskUsage():
   for user, blocks in reader_to_blocks.items():
     user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(map(
@@ -276,33 +330,8 @@ def computeReplication(keep_blocks):
 
 # This is the main flow here
 
-parser = argparse.ArgumentParser(description='Report on keep disks.')
-parser.add_argument('-m',
-                    '--max-api-results',
-                    type=int,
-                    default=5000,
-                    help=('The max results to get at once.'))
-parser.add_argument('-p',
-                    '--port',
-                    type=int,
-                    default=9090,
-                    help=('The port number to serve on. 0 means no server.'))
-parser.add_argument('-v',
-                    '--verbose',
-                    help='increase output verbosity',
-                    action='store_true')
-parser.add_argument('-u',
-                    '--uuid',
-                    help='uuid of specific collection to process')
-parser.add_argument('--require-admin-user',
-                    action='store_true',
-                    default=True,
-                    help='Fail if the user is not an admin [default]')
-parser.add_argument('--no-require-admin-user',
-                    dest='require_admin_user',
-                    action='store_false',
-                    help='Allow users without admin permissions with only a warning.')
-args = parser.parse_args()
+
+args = None
 
 log = logging.getLogger('arvados.services.datamanager')
 stderr_handler = logging.StreamHandler()
@@ -549,13 +578,43 @@ class DataManagerHandler(BaseHTTPRequestHandler):
 class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
   """Handle requests in a separate thread."""
 
-#if __name__ == '__main__':
-
-if args.port == 0:
-  loadAllData()
-else:
-  loader = threading.Thread(target = loadAllData, name = 'loader')
-  loader.start()
if __name__ == '__main__':
  # Command-line configuration; parse_args() below rebinds the
  # module-level `args` that the rest of the module reads.
  parser = argparse.ArgumentParser(description='Report on keep disks.')
  parser.add_argument('-m', '--max-api-results', type=int, default=5000,
                      help='The max results to get at once.')
  parser.add_argument('-p', '--port', type=int, default=9090,
                      help='The port number to serve on. 0 means no server.')
  parser.add_argument('-v', '--verbose', action='store_true',
                      help='increase output verbosity')
  parser.add_argument('-u', '--uuid',
                      help='uuid of specific collection to process')
  parser.add_argument('--require-admin-user', action='store_true',
                      default=True,
                      help='Fail if the user is not an admin [default]')
  parser.add_argument('--no-require-admin-user', dest='require_admin_user',
                      action='store_false',
                      help='Allow users without admin permissions with only a warning.')
  args = parser.parse_args()

  if args.port != 0:
    # Load data on a background thread while the HTTP server runs in the
    # foreground on localhost.
    loader = threading.Thread(target=loadAllData, name='loader')
    loader.start()
    server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
    server.serve_forever()
  else:
    # Port 0 means no server: load the data in the foreground and finish.
    loadAllData()