Revision a17a8e98

b/snf-cyclades-app/synnefo/logic/callbacks.py
44 44
log = logging.getLogger()
45 45

  
46 46

  
47
def is_update_required(func):
48
    """
49
    Decorator for checking if an incoming message needs to update the db.
50

  
51
    The database will not be updated in the following cases:
52
    - The message has been redelivered and the action has already been
53
      completed. In this case the event_time will be equal with the one
54
      in the database.
55
    - The message describes a previous state in the ganeti, from the one that is
56
      described in the db. In this case the event_time will be smaller from the
57
      one in the database.
47
def handle_message_delivery(func):
48
    """ Generic decorator for handling messages.
58 49

  
59
    This decorator is also acknowledging the messages to the AMQP broker.
50
    This decorator is responsible for converting the message into json format,
51
    handling of common exceptions and acknowledment of message if needed.
60 52

  
61 53
    """
62 54
    @wraps(func)
63 55
    def wrapper(client, message, *args, **kwargs):
64 56
        try:
65 57
            msg = json.loads(message['body'])
66

  
67
            event_time = merge_time(msg['event_time'])
68

  
69
            vm_id = utils.id_from_instance_name(msg["instance"])
70
            vm = VirtualMachine.objects.get(id=vm_id)
71

  
72
            db_time = vm.backendtime
73
            if event_time <= db_time:
74
                format_ = "%d/%m/%y %H:%M:%S:%f"
75
                log.debug("Ignoring message %s.\nevent_timestamp: %s db_timestamp: %s",
76
                          message,
77
                          event_time.strftime(format_),
78
                          db_time.strftime(format_))
79
                client.basic_ack(message)
80
                return
81

  
82
            # New message. Update the database!
83
            func(vm, msg)
84

  
58
            func(msg)
59
            client.basic_ack(message)
85 60
        except ValueError:
86 61
            log.error("Incoming message not in JSON format: %s", message)
87 62
            client.basic_ack(message)
......
89 64
            log.error("Malformed incoming JSON, missing attributes: %s",
90 65
                      message)
91 66
            client.basic_ack(message)
67
        except Exception as e:
68
            log.exception("Unexpected error: %s, msg: %s", e, msg)
69

  
70
    return wrapper
71

  
72
def instance_from_msg(func):
73
    """ Decorator for getting the VirtualMachine object of the msg.
74

  
75
    """
76
    @handle_message_delivery
77
    @wraps(func)
78
    def wrapper(msg):
79
        try:
80
            vm_id = utils.id_from_instance_name(msg["instance"])
81
            vm = VirtualMachine.objects.get(id=vm_id)
82
            func(vm, msg)
92 83
        except VirtualMachine.InvalidBackendIdError:
93 84
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
94
            client.basic_ack(message)
95 85
        except VirtualMachine.DoesNotExist:
96 86
            log.error("VM for instance %s with id %d not found in DB.",
97 87
                      msg['instance'], vm_id)
98
            client.basic_ack(message)
99
        except Exception as e:
100
            log.exception("Unexpected error: %s, msg: %s", e, msg)
101
        else:
102
            # Acknowledge the message
103
            client.basic_ack(message)
88
    return wrapper
89

  
90
def network_from_msg(func):
91
    """ Decorator for getting the Network object of the msg.
92

  
93
    """
94
    @handle_message_delivery
95
    @wraps(func)
96
    def wrapper(msg):
97
        try:
98
            network_id = utils.id_from_network_name(msg["network"])
99
            network = Network.objects.get(id=network_id)
100
            func(network, msg)
101
        except Network.InvalidBackendIdError:
102
            log.debug("Ignoring msg for unknown network %s.", msg['network'])
103
        except Network.DoesNotExist:
104
            log.error("Network %s with id %d not found in DB.",
105
                      msg['network'], vm_id)
106
    return wrapper
107

  
108
def if_update_required(func):
109
    """
110
    Decorator for checking if an incoming message needs to update the db.
111

  
112
    The database will not be updated in the following cases:
113
    - The message has been redelivered and the action has already been
114
      completed. In this case the event_time will be equal with the one
115
      in the database.
116
    - The message describes a previous state in the ganeti, from the one that is
117
      described in the db. In this case the event_time will be smaller from the
118
      one in the database.
119

  
120
    """
121
    @wraps(func)
122
    def wrapper(target, msg):
123
        event_time = merge_time(msg['event_time'])
124
        db_time = target.backendtime
125

  
126
        if event_time <= db_time:
127
            format_ = "%d/%m/%y %H:%M:%S:%f"
128
            log.debug("Ignoring message %s.\nevent_timestamp: %s db_timestamp: %s",
129
                      msg,
130
                      event_time.strftime(format_),
131
                      db_time.strftime(format_))
132
            return
133
        # New message. Update the database!
134
        func(target, msg, event_time)
104 135

  
105 136
    return wrapper
106 137

  
107 138

  
108
@is_update_required
109
def update_db(vm, msg):
139
@instance_from_msg
140
@if_update_required
141
def update_db(vm, msg, event_time):
110 142
    """Process a notification of type 'ganeti-op-status'"""
111 143
    log.debug("Processing ganeti-op-status msg: %s", msg)
112 144

  
......
114 146
        log.error("Message is of unknown type %s.", msg['type'])
115 147
        return
116 148

  
117
    event_time = merge_time(msg['event_time'])
118 149
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
119 150
                              msg['status'], msg['logmsg'])
120 151

  
......
122 153
              msg['instance'])
123 154

  
124 155

  
125
@is_update_required
126
def update_net(vm, msg):
156
@instance_from_msg
157
@if_update_required
158
def update_net(vm, msg, event_time):
127 159
    """Process a notification of type 'ganeti-net-status'"""
128 160
    log.debug("Processing ganeti-net-status msg: %s", msg)
129 161

  
......
131 163
        log.error("Message is of unknown type %s", msg['type'])
132 164
        return
133 165

  
134
    event_time = merge_time(msg['event_time'])
135 166
    backend.process_net_status(vm, event_time, msg['nics'])
136 167

  
137 168
    log.debug("Done processing ganeti-net-status msg for vm %s.",
138 169
              msg["instance"])
139 170

  
140 171

  
141
@is_update_required
142
def update_build_progress(vm, msg):
172
@network_from_msg
173
@if_update_required
174
def update_network(network, msg, event_time):
175
    """Process a notification of type 'ganeti-network-status'"""
176
    log.debug("Processing ganeti-network-status msg: %s", msg)
177

  
178
    if msg['type'] != "ganeti-network-status":
179
        log.error("Message is of unknown type %s.", msg['type'])
180
        return
181

  
182

  
183
    log.debug("Done processing ganeti-network-status msg for vm %s.",
184
              msg['instance'])
185

  
186

  
187
@instance_from_msg
188
@if_update_required
189
def update_build_progress(vm, msg, event_time):
143 190
    """Process a create progress message"""
144 191
    log.debug("Processing ganeti-create-progress msg: %s", msg)
145 192

  
......
147 194
        log.error("Message is of unknown type %s", msg['type'])
148 195
        return
149 196

  
150
    event_time = merge_time(msg['event_time'])
151 197
    backend.process_create_progress(vm, event_time, msg['rprogress'], None)
152 198

  
153 199
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
154 200
              msg['instance'])
155 201

  
156 202

  
157
def dummy_proc(client, message):
203
def dummy_proc(client, message, *args, **kwargs):
158 204
    try:
159 205
        log.debug("Msg: %s", message['body'])
160 206
        client.basic_ack(message)
b/snf-cyclades-app/synnefo/logic/dispatcher.py
142 142
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
143 143

  
144 144
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
145
    QUEUE_GANETI_EVENTS_NETWORK = "%s-events-network" % prefix
145 146
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
146 147
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
147 148
    QUEUE_RECONC = "%s-reconciliation" % prefix
148 149
    if settings.DEBUG is True:
149 150
        QUEUE_DEBUG = "%s-debug" % prefix  # Debug queue, retrieves all messages
150 151

  
151
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC,
152
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NETWORK, QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC,
152 153
              QUEUE_GANETI_BUILD_PROGR)
153 154

  
154 155
    # notifications of type "ganeti-op-status"
155 156
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
157
    # notifications of type "ganeti-network-status"
158
    DB_HANDLER_KEY_NETWORK = 'ganeti.%s.event.network' % prefix
156 159
    # notifications of type "ganeti-net-status"
157 160
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
158 161
    # notifications of type "ganeti-create-progress"
......
163 166
    BINDINGS = [
164 167
    # Queue                   # Exchange                # RouteKey              # Handler
165 168
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
169
    (QUEUE_GANETI_EVENTS_NETWORK, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NETWORK, 'update_network'),
166 170
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
167 171
    (QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
168 172
    ]

Also available in: Unified diff