Revision 8007ba7b

b/logic/dispatcher_callbacks.py
92 92
    try:
93 93
        msg = json.loads(message.body)
94 94

  
95
        if msg["type"] != "" :
95
        if msg["type"] != "reconciliate" :
96 96
             _logger.error("Message is of unknown type %s", msg["type"])
97 97
             return
98 98

  
b/logic/management/commands/reconciliate.py
1
#
2
# Reconciliate VM state - Management Script
3
#
4
# Copyright 2010 Greek Research and Technology Network
5
#
6

  
7
from django.core.management.base import NoArgsCommand
8
from synnefo.db.models import VirtualMachine
9
from django.conf import settings
10
from datetime import datetime, timedelta
11

  
12
from amqplib import client_0_8 as amqp
13

  
14
import time
15
import socket
16
import json
17

  
18
class Command(NoArgsCommand):
19
    help = 'Reconciliate VM status with the backend'
20

  
21
    def open_channel(self):
22
        conn = None
23
        while conn == None:
24
            try:
25
                conn = amqp.Connection( host=settings.RABBIT_HOST,
26
                     userid=settings.RABBIT_USERNAME,
27
                     password=settings.RABBIT_PASSWORD,
28
                     virtual_host=settings.RABBIT_VHOST)
29
            except socket.error:
30
                time.sleep(1)
31
                pass
32

  
33
        return conn.channel()
34

  
35
    def handle_noargs(self, **options):
36

  
37
        now = datetime.now()
38
        last_update = timedelta(minutes = 30)
39
        not_updated = VirtualMachine.objects.filter(updated__lte = (now - last_update))
40
        all =  VirtualMachine.objects.all()
41

  
42
        to_update = all.count() / settings.RECONCILIATION_MIN
43

  
44
        vm_ids = map(lambda x: x.vm_id,  all.filter()) #TODO: Fix filtering
45
        sent = False
46

  
47
        for vmid in vm_ids :
48
            while sent is False:
49
                try:
50
                    msg = dict(type = "reconciliate", vmid = vmid)
51
                    self.chan.basic_publish(json.dumps(msg),
52
                            exchange=settings.EXCHANGE_CRON,
53
                            routing_key="reconciliation.%s", vmid)
54
                    sent = True
55
                except socket.error:
56
                    self.chan = self.open_channel()
57
                except Exception:
58
                    raise
59

  
60

  
61
        print "All:%d, Not Updated:%d, Triggered update for:%d" % (all.count(), not_updated.count(), vm_ids)
b/settings.py.dist
183 183
QUEUE_GANETI_EVENTS = "events"
184 184
QUEUE_CRON_CREDITS = "credits"
185 185
QUEUE_EMAIL = "email"
186
QUEUE_API = "api"
187
QUEUE_RECONC = "reconciliation"
186 188
QUEUE_DEBUG = "debug"       # Debug queue, retrieves all messages
187
QUEUES = (QUEUE_GANETI_EVENTS, QUEUE_CRON_CREDITS, QUEUE_EMAIL)
189
QUEUES = (QUEUE_GANETI_EVENTS, QUEUE_CRON_CREDITS, QUEUE_EMAIL, QUEUE_API)
188 190

  
189 191
BINDINGS_DEBUG = [
190 192
    # Queue         # Exchange          # RouteKey  # Handler
......
200 202
    (QUEUE_CRON_CREDITS,    EXCHANGE_CRON,      '*.credits.*',        'update_credits'),
201 203
    (QUEUE_EMAIL,           EXCHANGE_API,       '*.email.*',          'send_email'),
202 204
    (QUEUE_EMAIL,           EXCHANGE_CRON,      '*.email.*',          'send_email'),
203
    (QUEUE_API,             EXCHANGE_API,       '*.email.*',          'send_email'),
205
    (QUEUE_RECONC,          EXCHANGE_CRON,      'reconciliation.*',   'trigger_status_update'),
204 206
]
205 207

  
206 208
def fix_amqp_settings(backend_prefix):
......
240 242
#          work after this many hours after 2011/05/10
241 243
AUTH_TOKEN_DURATION = 30 * 24
242 244

  
243
# Number of minutes between reconciliations
245
# Minutes between reconciliations
244 246
RECONCILIATION_MIN = 30

Also available in: Unified diff