Revision 7ca9e930

b/README.develop
182 182
   username in settings.py. Several functional conventions within the system
183 183
   require this variable to include a dash at its end (e.g. snf-)
184 184

  
185
8. Specify a new queue to receive ganeti events, based on the specified
186
   BACKEND_PREFIX_ID. Something like the following at the end of
187
   settings.py should do the trick.
188

  
189
   DB_HANDLER_KEY = 'ganeti.%s.#' % BACKEND_PREFIX_ID.split('-')[0]
190
   BINDINGS[0] = ("events-%s" % BACKEND_PREFIX_ID.split('-')[0], EXCHANGE_GANETI,  DB_HANDLER_KEY, 'update_db')
191
   QUEUES += ("events-%s" % BACKEND_PREFIX_ID.split('-')[0], )
185
8. Fix the AMQP-specific settings based on the selected BACKEND_PREFIX_ID.
186
   The fix_amqp_settings() function is called once at the end of
187
   settings.py.dist, you must call it again if you change BACKEND_PREFIX_ID
188
   at some later point.
192 189

  
193 190
9. Start the system
194 191
    $ ./bin/python logic/dispatcher.py  # DB synch daemon
b/ganeti/ganeti-eventd.py
110 110
                msg["message"] = logmsg
111 111
            
112 112
            instance = instances.split('-')[0]  
113
            routekey = "ganeti.%s.event.%s" % (instance,op.status)
113
            routekey = "ganeti.%s.event.op" % instance
114 114
            
115 115
            self.logger.debug("Delivering msg: %s (key=%s)",
116 116
                json.dumps(msg), routekey)
b/ganeti/hooks.py
20 20

  
21 21
import synnefo.settings as settings
22 22

  
23

  
23 24
def ganeti_net_status(logger, environ):
24 25
    """Produce notifications of type 'Ganeti-net-status'
25 26
    
......
152 153
            
153 154
            self.publish_msgs(notifs)
154 155

  
155
            print >> sys.stderr, "post_start_hook: ", notifs
156

  
157
        return 1
156
        return 0
158 157

  
159 158

  
160 159
class PostStopHook(GanetiHook):
b/logic/dispatcher_callbacks.py
10 10

  
11 11
def update_db(message):
12 12
    """Process the status of a VM based on a ganeti status message"""
13
    _logger.debug("Processing msg: %s" % message.body)
13
    _logger.debug("Processing msg: %s", message.body)
14 14
    try:
15 15
        msg = json.loads(message.body)
16 16

  
17 17
        if msg["type"] != "ganeti-op-status":
18
            _logger.error("Message is of uknown type %s." % (msg["type"],))
18
            _logger.error("Message is of uknown type %s.", msg["type"])
19 19
            return
20 20

  
21 21
        vmid = utils.id_from_instance_name(msg["instance"])
22 22
        vm = VirtualMachine.objects.get(id=vmid)
23 23

  
24 24
        backend.process_backend_msg(vm, msg["jobId"], msg["operation"], msg["status"], msg["logmsg"])
25
        _logger.debug("Done processing msg for vm %s." % (msg["instance"]))
25
        _logger.debug("Done processing msg for vm %s.", msg["instance"])
26 26
        message.channel.basic_ack(message.delivery_tag)
27 27
    except KeyError:
28
        _logger.error("Malformed incoming JSON, missing attributes: " + message.body)
28
        _logger.error("Malformed incoming JSON, missing attributes: %s", message.body)
29 29
    except VirtualMachine.InvalidBackendIdError:
30
        _logger.debug("Ignoring msg for unknown instance %s." % (msg["instance"],))
30
        _logger.debug("Ignoring msg for unknown instance %s.", msg["instance"])
31 31
    except VirtualMachine.DoesNotExist:
32
        _logger.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid))
32
        _logger.error("VM for instance %s with id %d not found in DB.", msg["instance"], vmid)
33 33
    except Exception as e:
34 34
        _logger.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info())))
35 35

  
36

  
37
def update_net(message):
38
    """Process a network status update notification from Ganeti"""
39
    _logger.debug("MINE msg: %s", message.body)
40
    message.channel.basic_ack(message.delivery_tag)
41

  
42

  
36 43
def send_email(message):
37 44
    _logger.debug("Request to send email message")
38 45
    message.channel.basic_ack(message.delivery_tag)
......
46 53
def dummy_proc(message):
47 54
    try:
48 55
        msg = json.loads(message.body)
49
        _logger.debug("Msg (exchange:%s) " % (msg, ))
56
        _logger.debug("Msg (exchange:%s) ", msg)
50 57
        message.channel.basic_ack(message.delivery_tag)
51 58
    except Exception as e:
52 59
        _logger.error("Could not receive message %s" % e.message)
b/settings.py.dist
167 167
GANETI_EVENTD_LOG_FILE = "/var/log/synnefo/ganeti-eventd.log"
168 168
GANETI_EVENTD_PID_FILE = "/var/run/synnefo/ganeti-eventd.pid"
169 169

  
170
#Rabbit work queue end point
170
# Rabbit work queue endpoint
171 171
RABBIT_HOST = "62.217.120.67:5672"
172 172
RABBIT_USERNAME = "okeanos"
173 173
RABBIT_PASSWORD = "0k3@n0s"
174 174
RABBIT_VHOST = "/"
175 175

  
176
# Queues, exchanges and bindings
176
# 
177
# Queues, exchanges and bindings for AMQP
178
#
177 179
EXCHANGE_GANETI = "ganeti"  # Messages from Ganeti
178 180
EXCHANGE_CRON = "cron"      # Messages from periodically triggered tasks
179 181
EXCHANGE_API = "api"        # Messages from the Rest API
......
192 194
    (QUEUE_DEBUG,   EXCHANGE_API,       '#',        'dummy_proc'),
193 195
]
194 196

  
195
#Prefix-based naming for db routing key, only 
196
DB_HANDLER_KEY = 'ganeti.%s.#' % BACKEND_PREFIX_ID.split('-')[0]
197

  
198 197
BINDINGS = [
199
    # Queue                 # Exchange          # RouteKey      #Handler
200
    (QUEUE_GANETI_EVENTS,   EXCHANGE_GANETI,    'ganeti.*',     'update_db'),
201
    (QUEUE_CRON_CREDITS,    EXCHANGE_CRON,      '*.credits.*',  'update_credits'),
202
    (QUEUE_EMAIL,           EXCHANGE_API,       '*.email.*',    'send_email'),
203
    (QUEUE_EMAIL,           EXCHANGE_CRON,      '*.email.*',    'send_email'),
198
    # Queue                 # Exchange          # RouteKey            #Handler
199
    (QUEUE_GANETI_EVENTS,   EXCHANGE_GANETI,    'ganeti.*.event.op',  'update_db'),
200
    (QUEUE_GANETI_EVENTS,   EXCHANGE_GANETI,    'ganeti.*.event.net', 'update_net'),
201
    (QUEUE_CRON_CREDITS,    EXCHANGE_CRON,      '*.credits.*',        'update_credits'),
202
    (QUEUE_EMAIL,           EXCHANGE_API,       '*.email.*',          'send_email'),
203
    (QUEUE_EMAIL,           EXCHANGE_CRON,      '*.email.*',          'send_email'),
204 204
]
205 205

  
206
def fix_amqp_settings(backend_prefix):
207
    """Configure AMQP-specific settings
208

  
209
    Configure AMQP-specific settings based on backend_prefix.
210
    This function *must* be called later in settings.py, with
211
    BACKEND_PREFIX_ID as argument.
212

  
213
    """
214
    global DB_HANDLER_KEY_OP, DB_HANDLER_KEY_NET, BINDINGS, QUEUES
215

  
216
    prefix = backend_prefix.split('-')[0]
217
    DB_HANDLER_KEY_OP ='ganeti.%s.event.op' % prefix    # notifications of type "ganeti-op-status"
218
    DB_HANDLER_KEY_NET ='ganeti.%s.event.net' % prefix  # notifications of type "ganeti-net-status"
219
    BINDINGS[0] = ("events-%s-op" % prefix, EXCHANGE_GANETI, DB_HANDLER_KEY_OP, 'update_db')
220
    BINDINGS[1] = ("events-%s-net" % prefix, EXCHANGE_GANETI, DB_HANDLER_KEY_NET, 'update_net')
221
    QUEUES += ("events-%s-op" % prefix, "events-%s-net" % prefix)
222

  
223
# Fix up the AMQP-specific settings based on BACKEND_PREFIX_ID
224
# Make sure to call it again, if you modify it at some later point
225
fix_amqp_settings(BACKEND_PREFIX_ID)
226

  
206 227
# Logic dispatcher settings
207 228
DISPATCHER_LOG_FILE = "/var/log/synnefo/dispatcher.log"
208 229

  

Also available in: Unified diff