root / doc / arch / loadgen.py @ dbc6b371
History | View | Annotate | Download (2.9 kB)
1 | 66785e62 | Georgios Gousios | #!/usr/bin/python
|
---|---|---|---|
2 | 66785e62 | Georgios Gousios | from signal import SIGTERM, signal, SIGINT |
3 | 66785e62 | Georgios Gousios | |
4 | 66785e62 | Georgios Gousios | import pika |
5 | 66785e62 | Georgios Gousios | import random |
6 | 66785e62 | Georgios Gousios | import json |
7 | 66785e62 | Georgios Gousios | import hashlib |
8 | 66785e62 | Georgios Gousios | import sys |
9 | 66785e62 | Georgios Gousios | import os |
10 | 66785e62 | Georgios Gousios | import time |
11 | 66785e62 | Georgios Gousios | |
12 | 66785e62 | Georgios Gousios | |
13 | 66785e62 | Georgios Gousios | def msg(): |
14 | 66785e62 | Georgios Gousios | msg = {} |
15 | 66785e62 | Georgios Gousios | msg['id'] = hashlib.sha1(os.urandom(20)).hexdigest() |
16 | 0f9d3049 | Georgios Gousios | # msg['occurredMillis'] = random.randrange(1293840000, 1325376000) * 1000
|
17 | 0f9d3049 | Georgios Gousios | # msg['receivedMillis'] = random.randrange(1293840000, 1325376000) * 1000
|
18 | 0f9d3049 | Georgios Gousios | msg['occurredMillis'] = int(time.time() * 1000) |
19 | 0f9d3049 | Georgios Gousios | msg['receivedMillis'] = int(time.time() * 1000) |
20 | dbc6b371 | Georgios Gousios | msg['userId'] = str(random.randrange(1, 1000)) |
21 | 66785e62 | Georgios Gousios | msg['clientId'] = str(random.randrange(1, 4)) |
22 | 0f9d3049 | Georgios Gousios | msg['resource'] = random.sample(("bandwidthup", "bandwidthdown", |
23 | 0f9d3049 | Georgios Gousios | "vmtime", "diskspace"), 1)[0] |
24 | 66785e62 | Georgios Gousios | msg['eventVersion'] = str(1) |
25 | 66785e62 | Georgios Gousios | |
26 | 66785e62 | Georgios Gousios | details = {} |
27 | 66785e62 | Georgios Gousios | if msg['resource'] == 'vmtime': |
28 | 66785e62 | Georgios Gousios | msg['value'] = random.randrange(0, 2) |
29 | 66785e62 | Georgios Gousios | details['vmid'] = str(random.randrange(1, 4000)) |
30 | 66785e62 | Georgios Gousios | else:
|
31 | 66785e62 | Georgios Gousios | msg['value'] = random.randrange(1, 5000) |
32 | 66785e62 | Georgios Gousios | msg['details'] = details
|
33 | 66785e62 | Georgios Gousios | return msg
|
34 | 66785e62 | Georgios Gousios | |
35 | 66785e62 | Georgios Gousios | def parse_arguments(args): |
36 | 66785e62 | Georgios Gousios | from optparse import OptionParser |
37 | 66785e62 | Georgios Gousios | |
38 | 66785e62 | Georgios Gousios | parser = OptionParser() |
39 | 66785e62 | Georgios Gousios | parser.add_option("-p", "--passwd", dest="passwd", help="Password") |
40 | 66785e62 | Georgios Gousios | parser.add_option("-r", "--rate", dest="rate", type="int", |
41 | 66785e62 | Georgios Gousios | help="Number of msgs/sec", default = 5) |
42 | 66785e62 | Georgios Gousios | return parser.parse_args(args)
|
43 | 66785e62 | Georgios Gousios | |
44 | 66785e62 | Georgios Gousios | |
45 | 66785e62 | Georgios Gousios | def exit_handler(signum, frame): |
46 | 66785e62 | Georgios Gousios | print "Number of messages: %d" % num_msgs |
47 | 66785e62 | Georgios Gousios | connection.close() |
48 | 66785e62 | Georgios Gousios | exit
|
49 | 66785e62 | Georgios Gousios | |
50 | 66785e62 | Georgios Gousios | (opts, args) = parse_arguments(sys.argv[1:])
|
51 | 66785e62 | Georgios Gousios | |
52 | 66785e62 | Georgios Gousios | credentials = pika.PlainCredentials('aquarium', opts.passwd)
|
53 | 66785e62 | Georgios Gousios | parameters = pika.ConnectionParameters( |
54 | 66785e62 | Georgios Gousios | host = 'aquarium.dev.grnet.gr',
|
55 | 66785e62 | Georgios Gousios | credentials = credentials) |
56 | 66785e62 | Georgios Gousios | connection = pika.BlockingConnection(parameters) |
57 | 66785e62 | Georgios Gousios | |
58 | 66785e62 | Georgios Gousios | channel = connection.channel() |
59 | 66785e62 | Georgios Gousios | channel.exchange_declare(exchange='aquarium', passive = True) |
60 | 66785e62 | Georgios Gousios | connection.set_backpressure_multiplier(1000)
|
61 | 66785e62 | Georgios Gousios | |
62 | 66785e62 | Georgios Gousios | random.seed(0xdeadbabe)
|
63 | 66785e62 | Georgios Gousios | |
64 | 66785e62 | Georgios Gousios | num_msgs = 0
|
65 | 66785e62 | Georgios Gousios | # signal(SIGTERM, exit_handler)
|
66 | 66785e62 | Georgios Gousios | # signal(SIGINT, exit_handler)
|
67 | 66785e62 | Georgios Gousios | oldtime = time.time() * 1000
|
68 | 66785e62 | Georgios Gousios | old_messages = 0
|
69 | 66785e62 | Georgios Gousios | |
70 | 66785e62 | Georgios Gousios | while True: |
71 | 66785e62 | Georgios Gousios | foo = msg() |
72 | dbc6b371 | Georgios Gousios | print "QUEUE %s %d" % (foo['id'], time.time() * 1000) |
73 | 66785e62 | Georgios Gousios | # Construct a message and send it
|
74 | dbc6b371 | Georgios Gousios | #print json.dumps(foo)
|
75 | 66785e62 | Georgios Gousios | channel.basic_publish(exchange='aquarium',
|
76 | 66785e62 | Georgios Gousios | routing_key='resevent.1.%s' % foo['resource'], |
77 | 66785e62 | Georgios Gousios | body=json.dumps(foo), |
78 | 66785e62 | Georgios Gousios | properties=pika.BasicProperties( |
79 | 66785e62 | Georgios Gousios | content_type="text/plain",
|
80 | 66785e62 | Georgios Gousios | delivery_mode=1))
|
81 | 66785e62 | Georgios Gousios | num_msgs += 1
|
82 | 66785e62 | Georgios Gousios | newtime = time.time() * 1000
|
83 | 66785e62 | Georgios Gousios | |
84 | 66785e62 | Georgios Gousios | if newtime - oldtime < 1000: |
85 | 0f9d3049 | Georgios Gousios | if num_msgs - old_messages >= opts.rate:
|
86 | 66785e62 | Georgios Gousios | toSleep = float(1000 - newtime + oldtime) |
87 | dbc6b371 | Georgios Gousios | #print "msgs: %d sleeping for %f" % (num_msgs, toSleep)
|
88 | 66785e62 | Georgios Gousios | time.sleep(toSleep / 1000)
|
89 | 66785e62 | Georgios Gousios | oldtime = newtime |
90 | 66785e62 | Georgios Gousios | old_messages = num_msgs |
91 | 66785e62 | Georgios Gousios | else:
|
92 | 66785e62 | Georgios Gousios | oldtime = newtime |
93 | 66785e62 | Georgios Gousios | old_messages = num_msgs |