Revision 27a8e4ae

--- a/snf-cyclades-gtools/synnefo/ganeti/eventd.py
+++ b/snf-cyclades-gtools/synnefo/ganeti/eventd.py
@@ -63,6 +63,7 @@
 from synnefo import settings
 from synnefo.lib.amqp import AMQPClient
 
+
 def get_time_from_status(op, job):
     """Generate a unique message identifier for a ganeti job.
 
@@ -74,7 +75,7 @@
     status = op.status
     if status == constants.JOB_STATUS_QUEUED:
         return job.received_timestamp
-    try: # Compatibility with Ganeti version
+    try:  # Compatibility with Ganeti version
         if status == constants.JOB_STATUS_WAITLOCK:
             return op.start_timestamp
     except AttributeError:
@@ -88,7 +89,7 @@
         # success, canceled, error
         return op.end_timestamp
 
-    raise InvalidBackendState(status, job)
+    raise InvalidBackendStatus(status, job)
 
 
 class InvalidBackendStatus(Exception):
@@ -100,6 +101,7 @@
         return repr("Invalid backend status: %s in job %s"
                     % (self.status, self.job))
 
+
 def prefix_from_name(name):
     return name.split('-')[0]
 
@@ -112,15 +114,19 @@
 
 
 class JobFileHandler(pyinotify.ProcessEvent):
-    def __init__(self, logger):
+    def __init__(self, logger, cluster_name):
         pyinotify.ProcessEvent.__init__(self)
         self.logger = logger
+        self.cluster_name = cluster_name
+
         self.client = AMQPClient(confirm_buffer=25)
         handler_logger.info("Attempting to connect to RabbitMQ hosts")
+
         self.client.connect()
-        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')
         handler_logger.info("Connected succesfully")
 
+        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')
+
         self.op_handlers = {"INSTANCE": self.process_instance_op,
                             "NETWORK": self.process_network_op}
                             # "GROUP": self.process_group_op}
@@ -140,7 +146,7 @@
             return
 
         data = serializer.LoadJson(data)
-        try: # Compatibility with Ganeti version
+        try:  # Compatibility with Ganeti version
             job = jqueue._QueuedJob.Restore(None, data, False)
         except TypeError:
             job = jqueue._QueuedJob.Restore(None, data)
@@ -174,6 +180,7 @@
             msg.update({"event_time": event_time,
                         "operation": op_id,
                         "status": op.status,
+                        "cluster": self.cluster_name,
                         "logmsg": logmsg,
                         "jobId": job_id})
 
@@ -254,6 +261,12 @@
 
 
 
+def find_cluster_name():
+    path = constants.DATA_DIR + "/ssconf_" + constants.SS_CLUSTER_NAME
+    f = open(path)
+    name = f.readline().rstrip()
+    f.close()
+    return name
 
 handler_logger = None
 def fatal_signal_handler(signum, frame):
@@ -335,7 +348,8 @@
     wm = pyinotify.WatchManager()
     mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \
           pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
-    handler = JobFileHandler(logger)
+    cluster_name = find_cluster_name()
+    handler = JobFileHandler(logger, cluster_name)
     notifier = pyinotify.Notifier(wm, handler)
 
     try:
@@ -344,7 +358,8 @@
         if res[constants.QUEUE_DIR] < 0:
             raise Exception("pyinotify add_watch returned negative descriptor")
 
-        logger.info("Now watching %s" % constants.QUEUE_DIR)
+        logger.info("Now watching %s of %s" % (constants.QUEUE_DIR,
+                cluster_name))
 
         while True:    # loop forever
             # process the queue of events as explained above
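For context, the added find_cluster_name() helper reads the first line of Ganeti's ssconf_cluster_name file under constants.DATA_DIR, and that name is then attached to every published message as the "cluster" field. The snippet below is a minimal, self-contained sketch of the same read pattern; the temporary directory and the read_cluster_name() helper are illustrative stand-ins, not part of the patch, and the real code uses Ganeti's constants.DATA_DIR and constants.SS_CLUSTER_NAME instead.

# Illustrative sketch only: mimics the find_cluster_name() pattern against a
# throwaway ssconf file rather than Ganeti's real DATA_DIR / SS_CLUSTER_NAME.
import os
import tempfile

def read_cluster_name(data_dir, ss_key="cluster_name"):
    # Same pattern as the helper in the diff: read the first line of the
    # ssconf file and strip the trailing newline.
    path = os.path.join(data_dir, "ssconf_" + ss_key)
    with open(path) as f:
        return f.readline().rstrip()

if __name__ == "__main__":
    data_dir = tempfile.mkdtemp()
    with open(os.path.join(data_dir, "ssconf_cluster_name"), "w") as f:
        f.write("cluster.example.com\n")
    print(read_cluster_name(data_dir))  # -> cluster.example.com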
