Statistics
| Branch: | Tag: | Revision:

root / doc / arch / loadgen.py @ dbc6b371

History | View | Annotate | Download (2.9 kB)

1
#!/usr/bin/python
2
from signal import SIGTERM, signal, SIGINT
3

    
4
import pika
5
import random
6
import json
7
import hashlib
8
import sys
9
import os
10
import time
11

    
12

    
13
def msg():
14
    msg = {}
15
    msg['id']             = hashlib.sha1(os.urandom(20)).hexdigest()
16
#    msg['occurredMillis'] = random.randrange(1293840000, 1325376000) * 1000
17
#    msg['receivedMillis'] = random.randrange(1293840000, 1325376000) * 1000
18
    msg['occurredMillis']  =  int(time.time() * 1000)
19
    msg['receivedMillis']  =  int(time.time() * 1000)
20
    msg['userId']         = str(random.randrange(1, 1000))
21
    msg['clientId']       = str(random.randrange(1, 4))
22
    msg['resource']       = random.sample(("bandwidthup", "bandwidthdown",
23
                                           "vmtime", "diskspace"), 1)[0]
24
    msg['eventVersion']   = str(1)
25

    
26
    details = {}
27
    if msg['resource'] == 'vmtime':
28
        msg['value']    = random.randrange(0, 2)
29
        details['vmid'] = str(random.randrange(1, 4000))
30
    else:
31
        msg['value'] = random.randrange(1, 5000)
32
    msg['details'] = details
33
    return msg
34

    
35
def parse_arguments(args):
36
    from optparse import OptionParser
37

    
38
    parser = OptionParser()
39
    parser.add_option("-p", "--passwd", dest="passwd", help="Password")
40
    parser.add_option("-r", "--rate", dest="rate", type="int",
41
                      help="Number of msgs/sec", default = 5)
42
    return parser.parse_args(args)
43

    
44

    
45
def exit_handler(signum, frame):
46
    print "Number of messages: %d" % num_msgs
47
    connection.close()
48
    exit
49

    
50
(opts, args) = parse_arguments(sys.argv[1:])
51

    
52
credentials = pika.PlainCredentials('aquarium', opts.passwd)
53
parameters = pika.ConnectionParameters(
54
    host = 'aquarium.dev.grnet.gr',
55
    credentials = credentials)
56
connection = pika.BlockingConnection(parameters)
57

    
58
channel = connection.channel()
59
channel.exchange_declare(exchange='aquarium', passive = True)
60
connection.set_backpressure_multiplier(1000)
61

    
62
random.seed(0xdeadbabe)
63

    
64
num_msgs = 0
65
# signal(SIGTERM, exit_handler)
66
# signal(SIGINT, exit_handler)
67
oldtime = time.time() * 1000
68
old_messages = 0
69

    
70
while True:
71
    foo = msg()
72
    print "QUEUE %s %d" % (foo['id'], time.time() * 1000) 
73
    # Construct a message and send it
74
    #print json.dumps(foo)
75
    channel.basic_publish(exchange='aquarium',
76
                      routing_key='resevent.1.%s' % foo['resource'],
77
                      body=json.dumps(foo),
78
                      properties=pika.BasicProperties(
79
                          content_type="text/plain",
80
                          delivery_mode=1))
81
    num_msgs += 1
82
    newtime = time.time() * 1000
83

    
84
    if newtime - oldtime < 1000:
85
        if num_msgs - old_messages >= opts.rate:
86
            toSleep = float(1000 - newtime + oldtime)
87
            #print "msgs: %d sleeping for %f" % (num_msgs, toSleep)
88
            time.sleep(toSleep / 1000)
89
            oldtime = newtime
90
            old_messages = num_msgs
91
    else:
92
        oldtime = newtime
93
        old_messages = num_msgs