Revision 39ccdb18 snf-cyclades-gtools/synnefo/ganeti/eventd.py

b/snf-cyclades-gtools/synnefo/ganeti/eventd.py
73 73
    status = op.status
74 74
    if status == constants.JOB_STATUS_QUEUED:
75 75
        return job.received_timestamp
76
    if status == constants.JOB_STATUS_WAITLOCK:
77
    #if status == constants.JOB_STATUS_WAITING:
78
        return op.start_timestamp
76
    try: # Compatibility with Ganeti version
77
        if status == constants.JOB_STATUS_WAITLOCK:
78
            return op.start_timestamp
79
    except AttributeError:
80
        if status == constants.JOB_STATUS_WAITING:
81
            return op.start_timestamp
79 82
    if status == constants.JOB_STATUS_CANCELING:
80 83
        return op.start_timestamp
81 84
    if status == constants.JOB_STATUS_RUNNING:
......
96 99
        return repr("Invalid backend status: %s in job %s"
97 100
                    % (self.status, self.job))
98 101

  
102
def prefix_from_name(name):
    """Return the cluster prefix of a Ganeti object name.

    The prefix is the part before the first '-', e.g. 'snf' for
    'snf-12345'; a name without a dash is returned unchanged.
    """
    prefix, _sep, _rest = name.partition('-')
    return prefix
104

  
105

  
106
def get_field(from_, field):
    """Return attribute `field` of `from_`, or None if it is absent.

    Used to probe Ganeti opcode input objects, whose attribute sets
    vary between opcodes (and Ganeti versions).
    """
    # getattr with a default replaces the original try/except whose
    # AttributeError handler contained a bare `None` expression (a
    # no-op statement) instead of an explicit `return None`.
    return getattr(from_, field, None)
111

  
99 112

  
100 113
class JobFileHandler(pyinotify.ProcessEvent):
101 114
    def __init__(self, logger):
......
106 119
        self.client.connect()
107 120
        handler_logger.info("Connected succesfully")
108 121

  
122
        self.op_handlers = {"INSTANCE": self.process_instance_op,
123
                            "NETWORK": self.process_network_op}
124
                            # "GROUP": self.process_group_op}
125

  
109 126
    def process_IN_CLOSE_WRITE(self, event):
        """Handle IN_CLOSE_WRITE inotify events.

        A job file that was opened for writing and then closed is
        treated exactly like one moved into the watched directory.
        """
        self.process_IN_MOVED_TO(event)
111 128

  
......
121 138
            return
122 139

  
123 140
        data = serializer.LoadJson(data)
124
        try: # Version compatibility issue with Ganeti
141
        try: # Compatibility with Ganeti version
125 142
            job = jqueue._QueuedJob.Restore(None, data, False)
126 143
        except TypeError:
127 144
            job = jqueue._QueuedJob.Restore(None, data)
128 145

  
146
        job_id = int(job.id)
129 147

  
130 148
        for op in job.ops:
131
            instances = None
132
            try:
133
                instances = " ".join(op.input.instances)
134
            except AttributeError:
135
                pass
149
            op_id = op.input.OP_ID
136 150

  
151
            msg = None
137 152
            try:
138
                instances = op.input.instance_name
139
            except AttributeError:
153
                handler_fn = self.op_handlers[op_id.split('_')[1]]
154
                msg, routekey = handler_fn(op, job_id)
155
            except KeyError:
140 156
                pass
141 157

  
142
            if not instances or len(instances.split(" ")) != 1:
143
                # Do not publish messages for jobs with no or multiple
144
                # instances.
145
                # Currently snf-dispatcher can not normally handle these messages
146
                self.logger.debug("Ignoring Job: %d: %s(%s)", int(job.id),
147
                                  op.input.OP_ID, instances)
158
            if not msg:
159
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
148 160
                continue
149 161

  
162
            # Generate a unique message identifier
163
            event_time = get_time_from_status(op, job)
164

  
150 165
            # Get the last line of the op log as message
151 166
            try:
152 167
                logmsg = op.log[-1][-1]
153 168
            except IndexError:
154 169
                logmsg = None
155 170

  
156
            # Generate a unique message identifier
157
            event_time = get_time_from_status(op, job)
171
            # Add shared attributes for all operations
172
            msg.update({"event_time": event_time,
173
                        "operation": op_id,
174
                        "status": op.status,
175
                        "logmsg": logmsg,
176
                        "jobId": job_id})
158 177

  
159
            self.logger.debug("Job: %d: %s(%s) %s %s",
160
                    int(job.id), op.input.OP_ID, instances, op.status, logmsg)
161

  
162
            # Construct message
163
            msg = {
164
                    "event_time" : event_time,
165
                    "type": "ganeti-op-status",
166
                    "instance": instances,
167
                    "operation": op.input.OP_ID,
168
                    "jobId": int(job.id),
169
                    "status": op.status,
170
                    "logmsg": logmsg
171
                    }
172

  
173
            instance_prefix = instances.split('-')[0]
174
            routekey = "ganeti.%s.event.op" % instance_prefix
175

  
176
            self.logger.debug("Delivering msg: %s (key=%s)", json.dumps(msg),
177
                              routekey)
178 178
            msg = json.dumps(msg)
179
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)
179 180

  
180 181
            # Send the message to RabbitMQ
181 182
            self.client.basic_publish(exchange=settings.EXCHANGE_GANETI,
182 183
                                      routing_key=routekey,
183 184
                                      body=msg)
184 185

  
186

  
187
    def process_instance_op(self, op, job_id):
        """Process OP_INSTANCE_* opcodes.

        Build the AMQP message and routing key for an instance-related
        opcode.  Returns (msg, routekey), or (None, None) when the job
        refers to no instance or to more than one instance, since
        snf-dispatcher can not normally handle such messages.
        """
        op_input = op.input  # renamed: `input` shadowed the builtin
        op_id = op_input.OP_ID

        # Most opcodes carry a single 'instance_name'; a few carry an
        # 'instances' list instead.  (Dropped the original's dead
        # `instances = None` that was immediately overwritten.)
        instances = get_field(op_input, 'instance_name')
        if not instances:
            instances = get_field(op_input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances.
                return None, None
            instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id}

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey
216

  
217
    def process_network_op(self, op, job_id):
        """Process OP_NETWORK_* opcodes.

        Build the AMQP message and routing key for a network-related
        opcode.  Returns (msg, routekey), or (None, None) when the
        opcode carries no 'network_name'.
        """
        op_input = op.input  # renamed: `input` shadowed the builtin
        op_id = op_input.OP_ID
        network_name = get_field(op_input, 'network_name')

        if not network_name:
            # Without a network name we can neither describe nor route
            # the event.
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'subnet':       get_field(op_input, 'network'),
               # 'network_mode': get_field(op_input, 'network_mode'),
               # 'network_link': get_field(op_input, 'network_link'),
               'gateway':      get_field(op_input, 'gateway'),
               # 'reserved_ips': get_field(op_input, 'reserved_ips'),
               'group_name':   get_field(op_input, 'group_name')}

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey
245

  
246

  
247
    # def process_group_op(self, op, job_id):
248
    #     """ Process OP_GROUP_* opcodes.
249

  
250
    #     """
251
    #     return None, None
252

  
253

  
254

  
255

  
256

  
185 257
handler_logger = None
186 258
def fatal_signal_handler(signum, frame):
187 259
    global handler_logger

Also available in: Unified diff