Statistics
| Branch: | Tag: | Revision:

root / doc / arch / loadgen.py @ dbc6b371

History | View | Annotate | Download (2.9 kB)

1 66785e62 Georgios Gousios
#!/usr/bin/python
2 66785e62 Georgios Gousios
from signal import SIGTERM, signal, SIGINT
3 66785e62 Georgios Gousios
4 66785e62 Georgios Gousios
import pika
5 66785e62 Georgios Gousios
import random
6 66785e62 Georgios Gousios
import json
7 66785e62 Georgios Gousios
import hashlib
8 66785e62 Georgios Gousios
import sys
9 66785e62 Georgios Gousios
import os
10 66785e62 Georgios Gousios
import time
11 66785e62 Georgios Gousios
12 66785e62 Georgios Gousios
13 66785e62 Georgios Gousios
def msg():
14 66785e62 Georgios Gousios
    msg = {}
15 66785e62 Georgios Gousios
    msg['id']             = hashlib.sha1(os.urandom(20)).hexdigest()
16 0f9d3049 Georgios Gousios
#    msg['occurredMillis'] = random.randrange(1293840000, 1325376000) * 1000
17 0f9d3049 Georgios Gousios
#    msg['receivedMillis'] = random.randrange(1293840000, 1325376000) * 1000
18 0f9d3049 Georgios Gousios
    msg['occurredMillis']  =  int(time.time() * 1000)
19 0f9d3049 Georgios Gousios
    msg['receivedMillis']  =  int(time.time() * 1000)
20 dbc6b371 Georgios Gousios
    msg['userId']         = str(random.randrange(1, 1000))
21 66785e62 Georgios Gousios
    msg['clientId']       = str(random.randrange(1, 4))
22 0f9d3049 Georgios Gousios
    msg['resource']       = random.sample(("bandwidthup", "bandwidthdown",
23 0f9d3049 Georgios Gousios
                                           "vmtime", "diskspace"), 1)[0]
24 66785e62 Georgios Gousios
    msg['eventVersion']   = str(1)
25 66785e62 Georgios Gousios
26 66785e62 Georgios Gousios
    details = {}
27 66785e62 Georgios Gousios
    if msg['resource'] == 'vmtime':
28 66785e62 Georgios Gousios
        msg['value']    = random.randrange(0, 2)
29 66785e62 Georgios Gousios
        details['vmid'] = str(random.randrange(1, 4000))
30 66785e62 Georgios Gousios
    else:
31 66785e62 Georgios Gousios
        msg['value'] = random.randrange(1, 5000)
32 66785e62 Georgios Gousios
    msg['details'] = details
33 66785e62 Georgios Gousios
    return msg
34 66785e62 Georgios Gousios
35 66785e62 Georgios Gousios
def parse_arguments(args):
36 66785e62 Georgios Gousios
    from optparse import OptionParser
37 66785e62 Georgios Gousios
38 66785e62 Georgios Gousios
    parser = OptionParser()
39 66785e62 Georgios Gousios
    parser.add_option("-p", "--passwd", dest="passwd", help="Password")
40 66785e62 Georgios Gousios
    parser.add_option("-r", "--rate", dest="rate", type="int",
41 66785e62 Georgios Gousios
                      help="Number of msgs/sec", default = 5)
42 66785e62 Georgios Gousios
    return parser.parse_args(args)
43 66785e62 Georgios Gousios
44 66785e62 Georgios Gousios
45 66785e62 Georgios Gousios
def exit_handler(signum, frame):
46 66785e62 Georgios Gousios
    print "Number of messages: %d" % num_msgs
47 66785e62 Georgios Gousios
    connection.close()
48 66785e62 Georgios Gousios
    exit
49 66785e62 Georgios Gousios
50 66785e62 Georgios Gousios
(opts, args) = parse_arguments(sys.argv[1:])
51 66785e62 Georgios Gousios
52 66785e62 Georgios Gousios
credentials = pika.PlainCredentials('aquarium', opts.passwd)
53 66785e62 Georgios Gousios
parameters = pika.ConnectionParameters(
54 66785e62 Georgios Gousios
    host = 'aquarium.dev.grnet.gr',
55 66785e62 Georgios Gousios
    credentials = credentials)
56 66785e62 Georgios Gousios
connection = pika.BlockingConnection(parameters)
57 66785e62 Georgios Gousios
58 66785e62 Georgios Gousios
channel = connection.channel()
59 66785e62 Georgios Gousios
channel.exchange_declare(exchange='aquarium', passive = True)
60 66785e62 Georgios Gousios
connection.set_backpressure_multiplier(1000)
61 66785e62 Georgios Gousios
62 66785e62 Georgios Gousios
random.seed(0xdeadbabe)
63 66785e62 Georgios Gousios
64 66785e62 Georgios Gousios
num_msgs = 0
65 66785e62 Georgios Gousios
# signal(SIGTERM, exit_handler)
66 66785e62 Georgios Gousios
# signal(SIGINT, exit_handler)
67 66785e62 Georgios Gousios
oldtime = time.time() * 1000
68 66785e62 Georgios Gousios
old_messages = 0
69 66785e62 Georgios Gousios
70 66785e62 Georgios Gousios
while True:
71 66785e62 Georgios Gousios
    foo = msg()
72 dbc6b371 Georgios Gousios
    print "QUEUE %s %d" % (foo['id'], time.time() * 1000) 
73 66785e62 Georgios Gousios
    # Construct a message and send it
74 dbc6b371 Georgios Gousios
    #print json.dumps(foo)
75 66785e62 Georgios Gousios
    channel.basic_publish(exchange='aquarium',
76 66785e62 Georgios Gousios
                      routing_key='resevent.1.%s' % foo['resource'],
77 66785e62 Georgios Gousios
                      body=json.dumps(foo),
78 66785e62 Georgios Gousios
                      properties=pika.BasicProperties(
79 66785e62 Georgios Gousios
                          content_type="text/plain",
80 66785e62 Georgios Gousios
                          delivery_mode=1))
81 66785e62 Georgios Gousios
    num_msgs += 1
82 66785e62 Georgios Gousios
    newtime = time.time() * 1000
83 66785e62 Georgios Gousios
84 66785e62 Georgios Gousios
    if newtime - oldtime < 1000:
85 0f9d3049 Georgios Gousios
        if num_msgs - old_messages >= opts.rate:
86 66785e62 Georgios Gousios
            toSleep = float(1000 - newtime + oldtime)
87 dbc6b371 Georgios Gousios
            #print "msgs: %d sleeping for %f" % (num_msgs, toSleep)
88 66785e62 Georgios Gousios
            time.sleep(toSleep / 1000)
89 66785e62 Georgios Gousios
            oldtime = newtime
90 66785e62 Georgios Gousios
            old_messages = num_msgs
91 66785e62 Georgios Gousios
    else:
92 66785e62 Georgios Gousios
        oldtime = newtime
93 66785e62 Georgios Gousios
        old_messages = num_msgs