Upgrade Akka to 2.0.2
[aquarium] / doc / arch / loadgen.py
1 #!/usr/bin/python
2 from signal import SIGTERM, signal, SIGINT
3
4 import pika
5 import random
6 import json
7 import hashlib
8 import sys
9 import os
10 import time
11
12
13 def msg():
14     msg = {}
15     msg['id']             = hashlib.sha1(os.urandom(20)).hexdigest()
16 #    msg['occurredMillis'] = random.randrange(1293840000, 1325376000) * 1000
17 #    msg['receivedMillis'] = random.randrange(1293840000, 1325376000) * 1000
18     msg['occurredMillis']  =  int(time.time() * 1000)
19     msg['receivedMillis']  =  int(time.time() * 1000)
20     msg['userId']         = str(random.randrange(1, 1000))
21     msg['clientId']       = str(random.randrange(1, 4))
22     msg['resource']       = random.sample(("bandwidthup", "bandwidthdown",
23                                            "vmtime", "diskspace"), 1)[0]
24     msg['eventVersion']   = str(1)
25
26     if msg['resource'] == 'vmtime':
27         msg['instanceId'] = str(random.randrange(1, 4000))
28         msg['value']    = random.randrange(0, 2)
29     else:
30         msg['instanceId'] = 1 
31         msg['value'] = random.randrange(1, 5000)
32     msg['details'] = {} 
33     return msg
34
35 def parse_arguments(args):
36     from optparse import OptionParser
37
38     parser = OptionParser()
39     parser.add_option("-p", "--passwd", dest="passwd", help="Password")
40     parser.add_option("-r", "--rate", dest="rate", type="int",
41                       help="Number of msgs/sec", default = 5)
42     return parser.parse_args(args)
43
44
45 def exit_handler(signum, frame):
46     print "Number of messages: %d" % num_msgs
47     connection.close()
48     exit
49
50 (opts, args) = parse_arguments(sys.argv[1:])
51
52 credentials = pika.PlainCredentials('rabbit', opts.passwd)
53 parameters = pika.ConnectionParameters(
54     host = 'dev72.dev.grnet.gr',
55     credentials = credentials)
56 connection = pika.BlockingConnection(parameters)
57
58 channel = connection.channel()
59 channel.exchange_declare(exchange='pithos', passive = True)
60 connection.set_backpressure_multiplier(1000)
61
62 random.seed(0xdeadbabe)
63
64 num_msgs = 0
65 # signal(SIGTERM, exit_handler)
66 # signal(SIGINT, exit_handler)
67 oldtime = time.time() * 1000
68 old_messages = 0
69
70 while True:
71     foo = msg()
72     print "QUEUE %s %d" % (foo['id'], time.time() * 1000) 
73     # Construct a message and send it
74     #print json.dumps(foo)
75     channel.basic_publish(exchange='pithos',
76                       routing_key='pithos.resource.%s' % foo['resource'],
77                       body=json.dumps(foo),
78                       properties=pika.BasicProperties(
79                           content_type="text/plain",
80                           delivery_mode=2))
81     num_msgs += 1
82     newtime = time.time() * 1000
83
84     if newtime - oldtime < 1000:
85         if num_msgs - old_messages >= opts.rate:
86             toSleep = float(1000 - newtime + oldtime)
87             #print "msgs: %d sleeping for %f" % (num_msgs, toSleep)
88             time.sleep(toSleep / 1000)
89             oldtime = newtime
90             old_messages = num_msgs
91     else:
92         oldtime = newtime
93         old_messages = num_msgs