Revision c4e55622
b/snf-common/synnefo/lib/utils.py | ||
---|---|---|
1 |
# Copyright 2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
import datetime |
|
35 |
|
|
36 |
def split_time(value):
    """Split a time given in seconds into a (seconds, microseconds) pair.

    @param value: Time in seconds
    @type value: int or float
    @return: Tuple containing (seconds, microseconds)

    """
    # Work in integer microseconds so the fractional part survives intact.
    total_micros = int(value * 1000000)
    seconds, microseconds = divmod(total_micros, 1000000)

    assert 0 <= seconds, \
        "Seconds must be larger than or equal to 0, but are %s" % seconds
    assert 0 <= microseconds <= 999999, \
        "Microseconds must be 0-999999, but are %s" % microseconds

    return (int(seconds), int(microseconds))
|
52 |
|
|
53 |
|
|
54 |
def merge_time(timetuple):
    """Merges a tuple into a datetime object

    @param timetuple: Time as tuple, (seconds, microseconds)
    @type timetuple: tuple
    @return: Time as a datetime object

    """
    (seconds, microseconds) = timetuple

    assert 0 <= seconds, \
        "Seconds must be larger than or equal to 0, but are %s" % seconds
    assert 0 <= microseconds <= 999999, \
        "Microseconds must be 0-999999, but are %s" % microseconds

    # Build the result with integer arithmetic instead of a single float
    # addition: float(seconds) + microseconds * 0.000001 loses precision for
    # epoch-sized second values (a double has ~15-16 significant digits),
    # which can make split_time()/merge_time() round-trips lossy by a
    # microsecond. Adding a timedelta keeps the microseconds exact.
    return (datetime.datetime.fromtimestamp(seconds) +
            datetime.timedelta(microseconds=microseconds))
b/snf-cyclades-app/synnefo/db/migrations/0033_auto__chg_field_networklink_available__add_field_virtualmachine_backen.py | ||
---|---|---|
# encoding: utf-8
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models


class Migration(SchemaMigration):
    """South schema migration 0033 for the 'db' app.

    Adds the VirtualMachine.backendtime column (used to order incoming
    backend notifications) and re-declares several BooleanFields with
    blank=True.  The 'models' dict below is South's frozen snapshot of the
    app's models at the time this migration was generated — do not edit it
    by hand.
    """

    def forwards(self, orm):

        # Changing field 'NetworkLink.available'
        db.alter_column('db_networklink', 'available', self.gf('django.db.models.fields.BooleanField')(blank=True))

        # Adding field 'VirtualMachine.backendtime'
        # Default is datetime.min (year 1), so every real backend event
        # compares as newer than a freshly migrated row.
        db.add_column('db_virtualmachine', 'backendtime', self.gf('django.db.models.fields.DateTimeField')(default=datetime.datetime(1, 1, 1, 0, 0)), keep_default=False)

        # Changing field 'VirtualMachine.deleted'
        db.alter_column('db_virtualmachine', 'deleted', self.gf('django.db.models.fields.BooleanField')(blank=True))

        # Changing field 'VirtualMachine.suspended'
        db.alter_column('db_virtualmachine', 'suspended', self.gf('django.db.models.fields.BooleanField')(blank=True))

        # Changing field 'Flavor.deleted'
        db.alter_column('db_flavor', 'deleted', self.gf('django.db.models.fields.BooleanField')(blank=True))

        # Changing field 'Network.public'
        db.alter_column('db_network', 'public', self.gf('django.db.models.fields.BooleanField')(blank=True))

    def backwards(self, orm):

        # Changing field 'NetworkLink.available'
        db.alter_column('db_networklink', 'available', self.gf('django.db.models.fields.BooleanField')())

        # Deleting field 'VirtualMachine.backendtime'
        db.delete_column('db_virtualmachine', 'backendtime')

        # Changing field 'VirtualMachine.deleted'
        db.alter_column('db_virtualmachine', 'deleted', self.gf('django.db.models.fields.BooleanField')())

        # Changing field 'VirtualMachine.suspended'
        db.alter_column('db_virtualmachine', 'suspended', self.gf('django.db.models.fields.BooleanField')())

        # Changing field 'Flavor.deleted'
        db.alter_column('db_flavor', 'deleted', self.gf('django.db.models.fields.BooleanField')())

        # Changing field 'Network.public'
        db.alter_column('db_network', 'public', self.gf('django.db.models.fields.BooleanField')())

    # Frozen ORM state (auto-generated by South).
    models = {
        'db.flavor': {
            'Meta': {'unique_together': "(('cpu', 'ram', 'disk', 'disk_template'),)", 'object_name': 'Flavor'},
            'cpu': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
            'deleted': ('django.db.models.fields.BooleanField', [], {'default': 'False', 'blank': 'True'}),
            'disk': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
            'disk_template': ('django.db.models.fields.CharField', [], {'default': "'drbd'", 'max_length': '32'}),
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'ram': ('django.db.models.fields.IntegerField', [], {'default': '0'})
        },
        'db.network': {
            'Meta': {'object_name': 'Network'},
            'created': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'link': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'to': "orm['db.NetworkLink']"}),
            'machines': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['db.VirtualMachine']", 'through': "orm['db.NetworkInterface']", 'symmetrical': 'False'}),
            'name': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
            'public': ('django.db.models.fields.BooleanField', [], {'default': 'False', 'blank': 'True'}),
            'state': ('django.db.models.fields.CharField', [], {'max_length': '30'}),
            'updated': ('django.db.models.fields.DateTimeField', [], {'auto_now': 'True', 'blank': 'True'}),
            'userid': ('django.db.models.fields.CharField', [], {'max_length': '100', 'null': 'True'})
        },
        'db.networkinterface': {
            'Meta': {'object_name': 'NetworkInterface'},
            'created': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
            'firewall_profile': ('django.db.models.fields.CharField', [], {'max_length': '30', 'null': 'True'}),
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'index': ('django.db.models.fields.IntegerField', [], {'null': 'True'}),
            'ipv4': ('django.db.models.fields.CharField', [], {'max_length': '15', 'null': 'True'}),
            'ipv6': ('django.db.models.fields.CharField', [], {'max_length': '100', 'null': 'True'}),
            'mac': ('django.db.models.fields.CharField', [], {'max_length': '17', 'null': 'True'}),
            'machine': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'nics'", 'to': "orm['db.VirtualMachine']"}),
            'network': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'nics'", 'to': "orm['db.Network']"}),
            'updated': ('django.db.models.fields.DateTimeField', [], {'auto_now': 'True', 'blank': 'True'})
        },
        'db.networklink': {
            'Meta': {'object_name': 'NetworkLink'},
            'available': ('django.db.models.fields.BooleanField', [], {'default': 'True', 'blank': 'True'}),
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'index': ('django.db.models.fields.IntegerField', [], {}),
            'name': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
            'network': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'null': 'True', 'to': "orm['db.Network']"})
        },
        'db.virtualmachine': {
            'Meta': {'object_name': 'VirtualMachine'},
            'action': ('django.db.models.fields.CharField', [], {'max_length': '30', 'null': 'True'}),
            'backendjobid': ('django.db.models.fields.PositiveIntegerField', [], {'null': 'True'}),
            'backendjobstatus': ('django.db.models.fields.CharField', [], {'max_length': '30', 'null': 'True'}),
            'backendlogmsg': ('django.db.models.fields.TextField', [], {'null': 'True'}),
            'backendopcode': ('django.db.models.fields.CharField', [], {'max_length': '30', 'null': 'True'}),
            'backendtime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(1, 1, 1, 0, 0)'}),
            'buildpercentage': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
            'created': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
            'deleted': ('django.db.models.fields.BooleanField', [], {'default': 'False', 'blank': 'True'}),
            'flavor': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['db.Flavor']"}),
            'hostid': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'imageid': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
            'name': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
            'operstate': ('django.db.models.fields.CharField', [], {'max_length': '30', 'null': 'True'}),
            'suspended': ('django.db.models.fields.BooleanField', [], {'default': 'False', 'blank': 'True'}),
            'updated': ('django.db.models.fields.DateTimeField', [], {'auto_now': 'True', 'blank': 'True'}),
            'userid': ('django.db.models.fields.CharField', [], {'max_length': '100'})
        },
        'db.virtualmachinemetadata': {
            'Meta': {'unique_together': "(('meta_key', 'vm'),)", 'object_name': 'VirtualMachineMetadata'},
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'meta_key': ('django.db.models.fields.CharField', [], {'max_length': '50'}),
            'meta_value': ('django.db.models.fields.CharField', [], {'max_length': '500'}),
            'vm': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'metadata'", 'to': "orm['db.VirtualMachine']"})
        }
    }

    complete_apps = ['db']
b/snf-cyclades-app/synnefo/db/models.py | ||
---|---|---|
27 | 27 |
# those of the authors and should not be interpreted as representing official |
28 | 28 |
# policies, either expressed or implied, of GRNET S.A. |
29 | 29 |
|
30 |
import datetime |
|
31 |
|
|
30 | 32 |
from django.conf import settings |
31 | 33 |
from django.db import models |
32 | 34 |
|
... | ... | |
167 | 169 |
max_length=30, null=True) |
168 | 170 |
backendlogmsg = models.TextField(null=True) |
169 | 171 |
buildpercentage = models.IntegerField(default=0) |
172 |
backendtime = models.DateTimeField(default=datetime.datetime.min) |
|
170 | 173 |
|
171 | 174 |
# Error classes |
172 | 175 |
class InvalidBackendIdError(Exception): |
b/snf-cyclades-app/synnefo/logic/backend.py | ||
---|---|---|
56 | 56 |
|
57 | 57 |
|
58 | 58 |
@transaction.commit_on_success |
59 |
def process_op_status(vm, jobid, opcode, status, logmsg): |
|
59 |
def process_op_status(vm, etime, jobid, opcode, status, logmsg):
|
|
60 | 60 |
"""Process a job progress notification from the backend |
61 | 61 |
|
62 | 62 |
Process an incoming message from the backend (currently Ganeti). |
... | ... | |
96 | 96 |
vm.deleted = True |
97 | 97 |
vm.nics.all().delete() |
98 | 98 |
|
99 |
vm.backendtime = etime |
|
99 | 100 |
# Any other notification of failure leaves the operating state unchanged |
100 | 101 |
|
101 | 102 |
vm.save() |
102 | 103 |
|
103 | 104 |
|
104 | 105 |
@transaction.commit_on_success |
105 |
def process_net_status(vm, nics): |
|
106 |
def process_net_status(vm, etime, nics):
|
|
106 | 107 |
"""Process a net status notification from the backend |
107 | 108 |
|
108 | 109 |
Process an incoming message from the Ganeti backend, |
... | ... | |
145 | 146 |
# network nics modified, update network object |
146 | 147 |
net.save() |
147 | 148 |
|
149 |
vm.backendtime = etime |
|
148 | 150 |
vm.save() |
149 | 151 |
|
150 | 152 |
|
151 | 153 |
@transaction.commit_on_success |
152 |
def process_create_progress(vm, rprogress, wprogress): |
|
154 |
def process_create_progress(vm, etime, rprogress, wprogress):
|
|
153 | 155 |
|
154 | 156 |
# XXX: This only uses the read progress for now. |
155 | 157 |
# Explore whether it would make sense to use the value of wprogress |
... | ... | |
180 | 182 |
# raise VirtualMachine.IllegalState("VM is not in building state") |
181 | 183 |
|
182 | 184 |
vm.buildpercentage = percentage |
185 |
vm.backendtime = etime |
|
183 | 186 |
vm.save() |
184 | 187 |
|
185 | 188 |
|
b/snf-cyclades-app/synnefo/logic/callbacks.py | ||
---|---|---|
31 | 31 |
# from AMQP queues. |
32 | 32 |
|
33 | 33 |
import logging |
34 |
import socket |
|
35 |
import traceback |
|
36 | 34 |
import json |
37 |
import sys |
|
35 |
from functools import wraps |
|
36 |
from datetime import datetime |
|
38 | 37 |
|
39 | 38 |
from synnefo.db.models import VirtualMachine |
40 | 39 |
from synnefo.logic import utils, backend |
41 | 40 |
|
41 |
from synnefo.lib.utils import merge_time |
|
42 |
|
|
42 | 43 |
|
43 | 44 |
log = logging.getLogger() |
44 | 45 |
|
45 | 46 |
|
46 |
def update_db(message): |
|
47 |
def is_update_required(func):
    """
    Decorator for checking if an incoming message needs to update the db.

    The database will not be updated in the following cases:
    - The message has been redelivered and the action has already been
      completed. In this case the event_time will be equal with the one
      in the database.
    - The message describes a previous state in the ganeti, from the one that is
      described in the db. In this case the event_time will be smaller from the
      one in the database.

    This decorator is also acknowledging the messages to the AMQP broker.

    """
    @wraps(func)
    def wrapper(client, message, *args, **kwargs):
        log.debug("Checking if action is required for msg %s", message)

        try:
            msg = json.loads(message['body'])

            # 'event_time' arrives as a (seconds, microseconds) tuple;
            # merge_time turns it into a datetime comparable to backendtime.
            event_time = merge_time(msg['event_time'])

            vm_id = utils.id_from_instance_name(msg["instance"])
            vm = VirtualMachine.objects.get(id=vm_id)

            # Stale or redelivered message: ack it and drop it.
            db_time = vm.backendtime
            if event_time <= db_time:
                format_ = "%d/%m/%y %H:%M:%S:%f"
                log.debug("Ignoring message. event_timestamp: %s db_timestamp: %s",
                          event_time.strftime(format_),
                          db_time.strftime(format_))
                client.basic_ack(message)
                return

            # New message. Update the database!
            func(client, message)

        # Malformed / unusable messages are acked so the broker does not
        # redeliver them forever.
        except ValueError:
            log.error("Incoming message not in JSON format: %s", message)
            client.basic_ack(message)
        except KeyError:
            log.error("Malformed incoming JSON, missing attributes: %s",
                      message)
            client.basic_ack(message)
        except VirtualMachine.InvalidBackendIdError:
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
            client.basic_ack(message)
        except VirtualMachine.DoesNotExist:
            log.error("VM for instance %s with id %d not found in DB.",
                      msg['instance'], vm_id)
            client.basic_ack(message)
        except Exception as e:
            # Unexpected failure: deliberately NOT acked, so the broker
            # will redeliver and the update can be retried.
            log.exception("Unexpected error: %s, msg: %s", e, msg)
        else:
            # Acknowledge the message
            client.basic_ack(message)

    return wrapper
|
107 |
|
|
108 |
|
|
109 |
@is_update_required
def update_db(client, message):
    """Process a notification of type 'ganeti-op-status'"""
    # NOTE: @is_update_required has already JSON-parsed the body once to
    # compare timestamps and has acked stale/duplicate messages; here the
    # message is known to be newer than vm.backendtime.
    log.debug("Processing ganeti-op-status msg: %s", message['body'])

    msg = json.loads(message['body'])

    if msg['type'] != "ganeti-op-status":
        log.error("Message is of unknown type %s.", msg['type'])
        return

    # Status replies to our own reconciliation queries take another path.
    if msg['operation'] == "OP_INSTANCE_QUERY_DATA":
        return status_job_finished(client, message)

    vm_id = utils.id_from_instance_name(msg['instance'])
    vm = VirtualMachine.objects.get(id=vm_id)

    # Pass the event timestamp so the backend can record it in backendtime.
    event_time = merge_time(msg['event_time'])
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
                              msg['status'], msg['logmsg'])

    log.debug("Done processing ganeti-op-status msg for vm %s.",
              msg['instance'])
|
132 |
|
|
133 |
|
|
134 |
@is_update_required
def update_net(client, message):
    """Process a notification of type 'ganeti-net-status'"""
    # NOTE: @is_update_required has already validated the JSON body and
    # dropped stale/duplicate messages before this handler runs.
    log.debug("Processing ganeti-net-status msg: %s", message['body'])

    msg = json.loads(message['body'])

    if msg['type'] != "ganeti-net-status":
        log.error("Message is of unknown type %s", msg['type'])
        return

    vm_id = utils.id_from_instance_name(msg['instance'])
    vm = VirtualMachine.objects.get(id=vm_id)

    # Forward the event timestamp so backendtime is kept in sync.
    event_time = merge_time(msg['event_time'])
    backend.process_net_status(vm, event_time, msg['nics'])

    log.debug("Done processing ganeti-net-status msg for vm %s.",
              msg["instance"])
|
149 | 153 |
|
150 |
if msg["vmid"] == "": |
|
151 |
log.error("Reconciliation message does not specify a VM id") |
|
152 |
return |
|
153 | 154 |
|
154 |
vm = VirtualMachine.objects.get(id=msg["vmid"]) |
|
155 |
backend.request_status_update(vm) |
|
155 |
@is_update_required
def update_build_progress(client, message):
    """Process a create progress message"""
    # NOTE: @is_update_required has already validated the JSON body and
    # dropped stale/duplicate messages before this handler runs.
    log.debug("Processing ganeti-create-progress msg: %s", message['body'])

    msg = json.loads(message['body'])

    if msg['type'] != "ganeti-create-progress":
        log.error("Message is of unknown type %s", msg['type'])
        return

    vm_id = utils.id_from_instance_name(msg['instance'])
    vm = VirtualMachine.objects.get(id=vm_id)

    # Only the read progress ('rprogress') is used; the write-progress slot
    # is intentionally passed as None.
    event_time = merge_time(msg['event_time'])
    backend.process_create_progress(vm, event_time, msg['rprogress'], None)

    log.debug("Done processing ganeti-create-progress msg for vm %s.",
              msg['instance'])
|
179 | 174 |
|
180 |
status = backend.get_job_status(msg['jobId']) |
|
181 | 175 |
|
182 |
log.debug("Node status job result: %s", status) |
|
176 |
def status_job_finished(client, message):
    """Updates VM status based on a previously sent status update request.

    Handles the reply to an OP_INSTANCE_QUERY_DATA job issued during
    reconciliation: reads the instance's run_state from the job result and
    feeds a simulated success event into backend.process_op_status() so the
    DB converges to the real backend state.

    Fixes vs. previous revision:
    - ack with the message object (client.basic_ack(message)), matching how
      every other AMQPClient call site in this module acks, instead of
      passing the bare message.delivery_tag;
    - dropped the unused 'conf_state' local.
    """
    msg = json.loads(message['body'])

    if msg['operation'] != 'OP_INSTANCE_QUERY_DATA':
        log.error("Message is of unknown type %s", msg['operation'])
        return

    if msg['status'] != 'success':
        log.warn("Ignoring non-success status update from job %d on VM %s",
                 msg['jobId'], msg['instance'])
        # Ack explicitly: returning here skips the caller's normal ack path.
        client.basic_ack(message)
        return

    # Fetch the full result of the query-data job from the backend.
    status = backend.get_job_status(msg['jobId'])

    log.debug("Node status job result: %s", status)

    if status['summary'][0] != u'INSTANCE_QUERY_DATA':
        log.error("Status update is of unknown type %s",
                  status['summary'])
        return

    run_state = status['opresult'][0][msg['instance']]['run_state']

    vm_id = utils.id_from_instance_name(msg['instance'])
    vm = VirtualMachine.objects.get(id=vm_id)

    # Map the live run state onto an opcode whose success implies it:
    # a successful REBOOT means "up", a successful SHUTDOWN means "down".
    if run_state == "up":
        opcode = "OP_INSTANCE_REBOOT"
    else:
        opcode = "OP_INSTANCE_SHUTDOWN"

    event_time = merge_time(msg['event_time'])
    backend.process_op_status(vm=vm, etime=event_time,
                              jobid=msg['jobId'], opcode=opcode,
                              status='success',
                              logmsg="Reconciliation: simulated event")
|
216 |
|
|
217 |
@is_update_required
def dummy_proc(client, message):
    """Debug consumer: log the message body and do nothing else.

    Acknowledgement is handled by the @is_update_required decorator.
    Fixes vs. previous revision: removed the dead 'pass' statement after
    the log call and the unused exception variable.
    """
    try:
        log.debug("Msg: %s", message['body'])
    except Exception:
        log.exception("Could not receive message")
b/snf-cyclades-app/synnefo/logic/dispatcher.py | ||
---|---|---|
54 | 54 |
import socket |
55 | 55 |
from daemon import daemon |
56 | 56 |
|
57 |
from synnefo.lib.amqp import AMQPClient |
|
58 |
|
|
57 | 59 |
# Take care of differences between python-daemon versions. |
58 | 60 |
try: |
59 | 61 |
from daemon import pidfile |
... | ... | |
75 | 77 |
|
76 | 78 |
|
77 | 79 |
class Dispatcher: |
78 |
chan = None |
|
79 | 80 |
debug = False |
80 |
clienttags = []
|
|
81 |
client_promises = []
|
|
81 | 82 |
|
82 | 83 |
def __init__(self, debug=False): |
83 | 84 |
self.debug = debug |
84 | 85 |
self._init() |
85 | 86 |
|
86 | 87 |
def wait(self): |
88 |
log.info("Waiting for messages..") |
|
87 | 89 |
while True: |
88 | 90 |
try: |
89 |
self.chan.wait()
|
|
91 |
self.client.basic_wait()
|
|
90 | 92 |
except SystemExit: |
91 | 93 |
break |
92 |
except amqp.exceptions.AMQPConnectionException: |
|
93 |
log.error("Server went away, reconnecting...") |
|
94 |
self._init() |
|
95 |
except socket.error: |
|
96 |
log.error("Server went away, reconnecting...") |
|
97 |
self._init() |
|
98 |
except Exception, e: |
|
99 |
log.exception("Caught unexpected exception") |
|
100 |
|
|
101 |
[self.chan.basic_cancel(clienttag) for clienttag in self.clienttags] |
|
102 |
self.chan.connection.close() |
|
103 |
self.chan.close() |
|
94 |
except Exception as e: |
|
95 |
log.exception("Caught unexpected exception: %s", e) |
|
96 |
|
|
97 |
self.client.basic_cancel() |
|
98 |
self.client.close() |
|
104 | 99 |
|
105 | 100 |
def _init(self): |
106 | 101 |
global QUEUES, BINDINGS |
107 | 102 |
log.info("Initializing") |
108 | 103 |
|
109 |
# Connect to RabbitMQ |
|
110 |
conn = None |
|
111 |
while conn == None: |
|
112 |
log.info("Attempting to connect to %s", settings.RABBIT_HOST) |
|
113 |
try: |
|
114 |
conn = amqp.Connection(host=settings.RABBIT_HOST, |
|
115 |
userid=settings.RABBIT_USERNAME, |
|
116 |
password=settings.RABBIT_PASSWORD, |
|
117 |
virtual_host=settings.RABBIT_VHOST) |
|
118 |
except socket.error: |
|
119 |
log.error("Failed to connect to %s, retrying in 10s", |
|
120 |
settings.RABBIT_HOST) |
|
121 |
time.sleep(10) |
|
122 |
|
|
123 |
log.info("Connection succesful, opening channel") |
|
124 |
self.chan = conn.channel() |
|
104 |
self.client = AMQPClient() |
|
105 |
# Connect to AMQP host |
|
106 |
self.client.connect() |
|
125 | 107 |
|
126 | 108 |
# Declare queues and exchanges |
127 | 109 |
for exchange in settings.EXCHANGES: |
128 |
self.chan.exchange_declare(exchange=exchange, type="topic",
|
|
129 |
durable=True, auto_delete=False)
|
|
110 |
self.client.exchange_declare(exchange_name=exchange,
|
|
111 |
exchange_type="topic")
|
|
130 | 112 |
|
131 | 113 |
for queue in QUEUES: |
132 |
self.chan.queue_declare(queue=queue, durable=True,
|
|
133 |
exclusive=False, auto_delete=False)
|
|
114 |
# Queues are mirrored to all RabbitMQ brokers
|
|
115 |
self.client.queue_declare(queue=queue,mirrored=True)
|
|
134 | 116 |
|
135 | 117 |
bindings = BINDINGS |
136 | 118 |
|
... | ... | |
142 | 124 |
log.error("Cannot find callback %s", binding[3]) |
143 | 125 |
raise SystemExit(1) |
144 | 126 |
|
145 |
self.chan.queue_bind(queue=binding[0], exchange=binding[1], |
|
146 |
routing_key=binding[2]) |
|
147 |
tag = self.chan.basic_consume(queue=binding[0], callback=callback) |
|
127 |
self.client.queue_bind(queue=binding[0], exchange=binding[1], |
|
128 |
routing_key=binding[2]) |
|
129 |
|
|
130 |
consume_promise = self.client.basic_consume(queue=binding[0], |
|
131 |
callback=callback) |
|
132 |
|
|
148 | 133 |
log.debug("Binding %s(%s) to queue %s with handler %s", |
149 |
binding[1], binding[2], binding[0], binding[3])
|
|
150 |
self.clienttags.append(tag)
|
|
134 |
binding[1], binding[2], binding[0], binding[3]) |
|
135 |
self.client_promises.append(consume_promise)
|
|
151 | 136 |
|
152 | 137 |
|
153 | 138 |
def _init_queues(): |
... | ... | |
180 | 165 |
(QUEUE_GANETI_EVENTS_OP, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP, 'update_db'), |
181 | 166 |
(QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET, 'update_net'), |
182 | 167 |
(QUEUE_GANETI_BUILD_PROGR,settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER, 'update_build_progress'), |
183 |
(QUEUE_RECONC, settings.EXCHANGE_CRON, RECONC_HANDLER, 'trigger_status_update'), |
|
184 | 168 |
] |
185 | 169 |
|
186 | 170 |
if settings.DEBUG is True: |
... | ... | |
248 | 232 |
Delete declared queues from RabbitMQ. Use with care! |
249 | 233 |
""" |
250 | 234 |
global QUEUES, BINDINGS |
251 |
conn = get_connection()
|
|
252 |
chan = conn.channel()
|
|
235 |
client = AMQPClient()
|
|
236 |
client.connect()
|
|
253 | 237 |
|
254 | 238 |
print "Queues to be deleted: ", QUEUES |
255 | 239 |
|
... | ... | |
257 | 241 |
return |
258 | 242 |
|
259 | 243 |
for queue in QUEUES: |
260 |
try: |
|
261 |
chan.queue_delete(queue=queue) |
|
262 |
print "Deleting queue %s" % queue |
|
263 |
except amqp.exceptions.AMQPChannelException as e: |
|
264 |
print e.amqp_reply_code, " ", e.amqp_reply_text |
|
265 |
chan = conn.channel() |
|
244 |
result = client.queue_delete(queue=queue) |
|
245 |
print "Deleting queue %s. Result: %s" % (queue, result) |
|
266 | 246 |
|
267 |
chan.connection.close()
|
|
247 |
client.close()
|
|
268 | 248 |
|
269 | 249 |
|
270 | 250 |
def purge_exchanges(): |
... | ... | |
272 | 252 |
global QUEUES, BINDINGS |
273 | 253 |
purge_queues() |
274 | 254 |
|
275 |
conn = get_connection()
|
|
276 |
chan = conn.channel()
|
|
255 |
client = AMQPClient()
|
|
256 |
client.connect()
|
|
277 | 257 |
|
278 | 258 |
print "Exchanges to be deleted: ", settings.EXCHANGES |
279 | 259 |
|
... | ... | |
281 | 261 |
return |
282 | 262 |
|
283 | 263 |
for exchange in settings.EXCHANGES: |
284 |
try: |
|
285 |
chan.exchange_delete(exchange=exchange) |
|
286 |
except amqp.exceptions.AMQPChannelException as e: |
|
287 |
print e.amqp_reply_code, " ", e.amqp_reply_text |
|
264 |
result = client.exchange_delete(exchange=exchange) |
|
265 |
print "Deleting exchange %s. Result: %s" % (exchange, result) |
|
288 | 266 |
|
289 |
chan.connection.close()
|
|
267 |
client.close()
|
|
290 | 268 |
|
291 | 269 |
|
292 | 270 |
def drain_queue(queue): |
... | ... | |
303 | 281 |
|
304 | 282 |
if not get_user_confirmation(): |
305 | 283 |
return |
306 |
conn = get_connection() |
|
307 |
chan = conn.channel() |
|
284 |
|
|
285 |
client = AMQPClient() |
|
286 |
client.connect() |
|
308 | 287 |
|
309 | 288 |
# Register a temporary queue binding |
310 | 289 |
for binding in BINDINGS: |
311 | 290 |
if binding[0] == queue: |
312 | 291 |
exch = binding[1] |
313 | 292 |
|
314 |
if not exch: |
|
315 |
print "Queue not bound to any exchange: %s" % queue |
|
316 |
return |
|
317 |
|
|
318 |
chan.queue_bind(queue=queue, exchange=exch, routing_key='#') |
|
319 |
tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc) |
|
293 |
tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc) |
|
320 | 294 |
|
321 | 295 |
print "Queue draining about to start, hit Ctrl+c when done" |
322 | 296 |
time.sleep(2) |
... | ... | |
327 | 301 |
|
328 | 302 |
num_processed = 0 |
329 | 303 |
while True: |
330 |
chan.wait()
|
|
304 |
client.basic_wait()
|
|
331 | 305 |
num_processed += 1 |
332 | 306 |
sys.stderr.write("Ignored %d messages\r" % num_processed) |
333 | 307 |
|
334 |
chan.basic_cancel(tag) |
|
335 |
chan.connection.close() |
|
336 |
|
|
308 |
client.basic_cancel(tag) |
|
309 |
client.close() |
|
337 | 310 |
|
338 |
def get_connection(): |
|
339 |
conn = amqp.Connection(host=settings.RABBIT_HOST, |
|
340 |
userid=settings.RABBIT_USERNAME, |
|
341 |
password=settings.RABBIT_PASSWORD, |
|
342 |
virtual_host=settings.RABBIT_VHOST) |
|
343 |
return conn |
|
344 | 311 |
|
345 | 312 |
|
346 | 313 |
def get_user_confirmation(): |
b/snf-cyclades-app/synnefo/logic/management/commands/reconcile.py | ||
---|---|---|
35 | 35 |
|
36 | 36 |
""" |
37 | 37 |
import sys |
38 |
import datetime |
|
38 | 39 |
|
39 | 40 |
from optparse import make_option |
40 | 41 |
|
... | ... | |
144 | 145 |
"Simulating successful Ganeti removal for %d " \ |
145 | 146 |
"servers in the DB:" % len(stale) |
146 | 147 |
for vm in VirtualMachine.objects.filter(pk__in=stale): |
147 |
backend.process_op_status(vm=vm, jobid=-0, |
|
148 |
event_time = datetime.datetime.now() |
|
149 |
backend.process_op_status(vm=vm, etime=event_time, jobid=-0, |
|
148 | 150 |
opcode='OP_INSTANCE_REMOVE', status='success', |
149 | 151 |
logmsg='Reconciliation: simulated Ganeti event') |
150 | 152 |
print >> sys.stderr, " ...done" |
... | ... | |
166 | 168 |
vm = VirtualMachine.objects.get(pk=id) |
167 | 169 |
opcode = "OP_INSTANCE_REBOOT" if ganeti_up \ |
168 | 170 |
else "OP_INSTANCE_SHUTDOWN" |
169 |
backend.process_op_status(vm=vm, jobid=-0, |
|
171 |
event_time = datetime.datetime.now() |
|
172 |
backend.process_op_status(vm=vm, etime=event_time ,jobid=-0, |
|
170 | 173 |
opcode=opcode, status='success', |
171 | 174 |
logmsg='Reconciliation: simulated Ganeti event') |
172 | 175 |
print >> sys.stderr, " ...done" |
b/snf-cyclades-gtools/synnefo/ganeti/eventd.py | ||
---|---|---|
46 | 46 |
path = os.path.normpath(os.path.join(os.getcwd(), '..')) |
47 | 47 |
sys.path.append(path) |
48 | 48 |
|
49 |
import time |
|
50 | 49 |
import json |
51 | 50 |
import logging |
52 | 51 |
import pyinotify |
... | ... | |
55 | 54 |
import socket |
56 | 55 |
from signal import signal, SIGINT, SIGTERM |
57 | 56 |
|
58 |
from amqplib import client_0_8 as amqp |
|
59 |
|
|
60 | 57 |
from ganeti import utils |
61 | 58 |
from ganeti import jqueue |
62 | 59 |
from ganeti import constants |
63 | 60 |
from ganeti import serializer |
64 | 61 |
|
65 | 62 |
from synnefo import settings |
63 |
from synnefo.lib.amqp import AMQPClient |
|
64 |
|
|
65 |
def get_time_from_status(op, job): |
|
66 |
"""Generate a unique message identifier for a ganeti job. |
|
67 |
|
|
68 |
The identifier is based on the timestamp of the job. Since a ganeti |
|
69 |
job passes from multiple states, we need to pick the timestamp that |
|
70 |
corresponds to each state. |
|
71 |
|
|
72 |
""" |
|
73 |
status = op.status |
|
74 |
if status == constants.JOB_STATUS_QUEUED: |
|
75 |
return job.received_timestamp |
|
76 |
if status == constants.JOB_STATUS_WAITLOCK: |
|
77 |
#if status == constants.JOB_STATUS_WAITING: |
|
78 |
return op.start_timestamp |
|
79 |
if status == constants.JOB_STATUS_CANCELING: |
|
80 |
return op.start_timestamp |
|
81 |
if status == constants.JOB_STATUS_RUNNING: |
|
82 |
return op.exec_timestamp |
|
83 |
if status in constants.JOBS_FINALIZED: |
|
84 |
# success, canceled, error |
|
85 |
return op.end_timestamp |
|
86 |
|
|
87 |
raise InvalidBackendState(status, job) |
|
88 |
|
|
89 |
|
|
90 |
class InvalidBackendStatus(Exception): |
|
91 |
def __init__(self, status, job): |
|
92 |
self.status = status |
|
93 |
self.job = job |
|
94 |
|
|
95 |
def __str__(self): |
|
96 |
return repr("Invalid backend status: %s in job %s" |
|
97 |
% (self.status, self.job)) |
|
98 |
|
|
66 | 99 |
|
67 | 100 |
class JobFileHandler(pyinotify.ProcessEvent): |
68 | 101 |
def __init__(self, logger): |
69 | 102 |
pyinotify.ProcessEvent.__init__(self) |
70 | 103 |
self.logger = logger |
71 |
self.chan = None |
|
72 |
|
|
73 |
def open_channel(self): |
|
74 |
conn = None |
|
75 |
while conn == None: |
|
76 |
handler_logger.info("Attempting to connect to %s", |
|
77 |
settings.RABBIT_HOST) |
|
78 |
try: |
|
79 |
conn = amqp.Connection(host=settings.RABBIT_HOST, |
|
80 |
userid=settings.RABBIT_USERNAME, |
|
81 |
password=settings.RABBIT_PASSWORD, |
|
82 |
virtual_host=settings.RABBIT_VHOST) |
|
83 |
except socket.error: |
|
84 |
time.sleep(1) |
|
85 |
|
|
86 |
handler_logger.info("Connection succesful, opening channel") |
|
87 |
return conn.channel() |
|
104 |
self.client = AMQPClient() |
|
105 |
handler_logger.info("Attempting to connect to RabbitMQ hosts") |
|
106 |
self.client.connect() |
|
107 |
handler_logger.info("Connected succesfully") |
|
88 | 108 |
|
89 | 109 |
def process_IN_CLOSE_WRITE(self, event): |
90 | 110 |
self.process_IN_MOVED_TO(event) |
91 | 111 |
|
92 | 112 |
def process_IN_MOVED_TO(self, event): |
93 |
if self.chan == None: |
|
94 |
self.chan = self.open_channel() |
|
95 |
|
|
96 | 113 |
jobfile = os.path.join(event.path, event.name) |
97 | 114 |
if not event.name.startswith("job-"): |
98 | 115 |
self.logger.debug("Not a job file: %s" % event.path) |
... | ... | |
128 | 145 |
except IndexError: |
129 | 146 |
logmsg = None |
130 | 147 |
|
148 |
# Generate a unique message identifier |
|
149 |
event_time = get_time_from_status(op, job) |
|
150 |
|
|
131 | 151 |
self.logger.debug("Job: %d: %s(%s) %s %s", |
132 | 152 |
int(job.id), op.input.OP_ID, instances, op.status, logmsg) |
133 | 153 |
|
134 | 154 |
# Construct message |
135 | 155 |
msg = { |
156 |
"event_time" : event_time, |
|
136 | 157 |
"type": "ganeti-op-status", |
137 | 158 |
"instance": instances, |
138 | 159 |
"operation": op.input.OP_ID, |
... | ... | |
140 | 161 |
"status": op.status, |
141 | 162 |
"logmsg": logmsg |
142 | 163 |
} |
143 |
if logmsg: |
|
144 |
msg["message"] = logmsg |
|
145 |
|
|
146 |
instance = instances.split('-')[0] |
|
147 |
routekey = "ganeti.%s.event.op" % instance |
|
148 |
|
|
149 |
self.logger.debug("Delivering msg: %s (key=%s)", |
|
150 |
json.dumps(msg), routekey) |
|
151 |
msg = amqp.Message(json.dumps(msg)) |
|
152 |
msg.properties["delivery_mode"] = 2 # Persistent |
|
153 |
|
|
154 |
while True: |
|
155 |
try: |
|
156 |
self.chan.basic_publish(msg, |
|
157 |
exchange=settings.EXCHANGE_GANETI, |
|
158 |
routing_key=routekey) |
|
159 |
self.logger.debug("Message published to AMQP successfully") |
|
160 |
return |
|
161 |
except socket.error: |
|
162 |
self.logger.exception("Server went away, reconnecting...") |
|
163 |
self.chan = self.open_channel() |
|
164 |
except Exception: |
|
165 |
self.logger.exception("Caught unexpected exception, msg: ", |
|
166 |
msg) |
|
167 |
raise |
|
164 |
|
|
165 |
instance_prefix = instances.split('-')[0] |
|
166 |
routekey = "ganeti.%s.event.op" % instance_prefix |
|
167 |
|
|
168 |
self.logger.debug("Delivering msg: %s (key=%s)", json.dumps(msg), |
|
169 |
routekey) |
|
170 |
msg = json.dumps(msg) |
|
171 |
|
|
172 |
# Send the message to RabbitMQ |
|
173 |
self.client.basic_publish(exchange=settings.EXCHANGE_GANETI, |
|
174 |
routing_key=routekey, |
|
175 |
body=msg) |
|
168 | 176 |
|
169 | 177 |
handler_logger = None |
170 | 178 |
def fatal_signal_handler(signum, frame): |
b/snf-cyclades-gtools/synnefo/ganeti/hook.py | ||
---|---|---|
45 | 45 |
import os |
46 | 46 |
import subprocess |
47 | 47 |
|
48 |
import time |
|
49 | 48 |
import json |
50 | 49 |
import socket |
51 | 50 |
import logging |
52 | 51 |
|
53 |
from amqplib import client_0_8 as amqp
|
|
52 |
from time import time
|
|
54 | 53 |
|
55 | 54 |
from synnefo import settings |
55 |
from synnefo.lib.amqp import AMQPClient |
|
56 |
from synnefo.lib.utils import split_time |
|
56 | 57 |
|
57 | 58 |
|
58 | 59 |
def mac2eui64(mac, prefixstr): |
... | ... | |
135 | 136 |
nics_list.append(nics[i]) |
136 | 137 |
|
137 | 138 |
msg = { |
139 |
"event_time": split_time(time()), |
|
138 | 140 |
"type": "ganeti-net-status", |
139 | 141 |
"instance": instance, |
140 | 142 |
"nics": nics_list |
... | ... | |
149 | 151 |
self.environ = environ |
150 | 152 |
self.instance = instance |
151 | 153 |
self.prefix = prefix |
154 |
# Retry up to two times(per host) to open a channel to RabbitMQ. |
|
155 |
# The hook needs to abort if this count is exceeded, because it |
|
156 |
# runs synchronously with VM creation inside Ganeti, and may only |
|
157 |
# run for a finite amount of time. |
|
158 |
|
|
159 |
# FIXME: We need a reconciliation mechanism between the DB and |
|
160 |
# Ganeti, for cases exactly like this. |
|
161 |
self.client = AMQPClient(max_retries= 2*len(settings.AMQP_HOSTS)) |
|
162 |
self.client.connect() |
|
152 | 163 |
|
153 | 164 |
def on_master(self): |
154 | 165 |
"""Return True if running on the Ganeti master""" |
... | ... | |
158 | 169 |
for (msgtype, msg) in msgs: |
159 | 170 |
routekey = "ganeti.%s.event.%s" % (self.prefix, msgtype) |
160 | 171 |
self.logger.debug("Pushing message to RabbitMQ: %s (key = %s)", |
161 |
json.dumps(msg), routekey) |
|
162 |
msg = amqp.Message(json.dumps(msg)) |
|
163 |
msg.properties["delivery_mode"] = 2 # Persistent |
|
164 |
|
|
165 |
# Retry up to five times to open a channel to RabbitMQ. |
|
166 |
# The hook needs to abort if this count is exceeded, because it |
|
167 |
# runs synchronously with VM creation inside Ganeti, and may only |
|
168 |
# run for a finite amount of time. |
|
169 |
# |
|
170 |
# FIXME: We need a reconciliation mechanism between the DB and |
|
171 |
# Ganeti, for cases exactly like this. |
|
172 |
conn = None |
|
173 |
sent = False |
|
174 |
retry = 0 |
|
175 |
while not sent and retry < 5: |
|
176 |
self.logger.debug("Attempting to publish to RabbitMQ at %s", |
|
177 |
settings.RABBIT_HOST) |
|
178 |
try: |
|
179 |
if not conn: |
|
180 |
conn = amqp.Connection(host=settings.RABBIT_HOST, |
|
181 |
userid=settings.RABBIT_USERNAME, |
|
182 |
password=settings.RABBIT_PASSWORD, |
|
183 |
virtual_host=settings.RABBIT_VHOST) |
|
184 |
chann = conn.channel() |
|
185 |
self.logger.debug("Successfully connected to RabbitMQ at %s", |
|
186 |
settings.RABBIT_HOST) |
|
187 |
|
|
188 |
chann.basic_publish(msg, |
|
189 |
exchange=settings.EXCHANGE_GANETI, |
|
190 |
routing_key=routekey) |
|
191 |
sent = True |
|
192 |
self.logger.debug("Successfully sent message to RabbitMQ") |
|
193 |
except socket.error: |
|
194 |
conn = False |
|
195 |
retry += 1 |
|
196 |
self.logger.exception("Publish to RabbitMQ failed, retry=%d in 1s", |
|
197 |
retry) |
|
198 |
time.sleep(1) |
|
199 |
|
|
200 |
if not sent: |
|
201 |
raise Exception("Publish to RabbitMQ failed after %d tries, aborting" % retry) |
|
202 |
|
|
172 |
json.dumps(msg), routekey) |
|
173 |
msg = json.dumps(msg) |
|
174 |
self.client.basic_publish(exchange=settings.EXCHANGE_GANETI, |
|
175 |
routing_key=routekey, |
|
176 |
body=msg) |
|
177 |
self.client.close() |
|
203 | 178 |
|
204 | 179 |
class PostStartHook(GanetiHook): |
205 | 180 |
"""Post-instance-startup Ganeti Hook. |
b/snf-cyclades-gtools/synnefo/ganeti/progress_monitor.py | ||
---|---|---|
51 | 51 |
import signal |
52 | 52 |
import socket |
53 | 53 |
|
54 |
from amqplib import client_0_8 as amqp |
|
55 |
|
|
56 | 54 |
from synnefo import settings |
57 |
|
|
58 |
|
|
59 |
class AMQPClient(object): |
|
60 |
def __init__(self, routekey): |
|
61 |
self.conn = None |
|
62 |
self.chan = None |
|
63 |
self.routekey = routekey |
|
64 |
|
|
65 |
def open_channel(self): |
|
66 |
if not self.conn: |
|
67 |
try: |
|
68 |
sys.stderr.write("Attempting to connect to %s\n" % |
|
69 |
settings.RABBIT_HOST) |
|
70 |
self.conn = amqp.Connection(host=settings.RABBIT_HOST, |
|
71 |
userid=settings.RABBIT_USERNAME, |
|
72 |
password=settings.RABBIT_PASSWORD, |
|
73 |
virtual_host=settings.RABBIT_VHOST) |
|
74 |
except socket.error: |
|
75 |
sys.stderr.write("Connection failed, will retry in 1s\n") |
|
76 |
time.sleep(1) |
|
77 |
|
|
78 |
if self.conn: |
|
79 |
sys.stderr.write("Connection succesful, opening channel\n") |
|
80 |
self.chan = self.conn.channel() |
|
81 |
|
|
82 |
def send_message(self, msg): |
|
83 |
sys.stderr.write("Delivering msg with key=%s:\n%s\n" % |
|
84 |
(self.routekey, json.dumps(msg))) |
|
85 |
msg = amqp.Message(json.dumps(msg)) |
|
86 |
msg.properties["delivery_mode"] = 2 # Persistent |
|
87 |
|
|
88 |
if not self.chan: |
|
89 |
self.open_channel() |
|
90 |
if not self.chan: |
|
91 |
return |
|
92 |
|
|
93 |
try: |
|
94 |
self.chan.basic_publish(msg, |
|
95 |
exchange=settings.EXCHANGE_GANETI, |
|
96 |
routing_key=self.routekey) |
|
97 |
except socket.error: |
|
98 |
sys.stderr.write("Server went away, reconnecting...\n") |
|
99 |
self.conn = None |
|
100 |
self.chan = None |
|
55 |
from synnefo.lib.amqp import AMQPClient |
|
56 |
from synnefo.lib.utils import split_time |
|
101 | 57 |
|
102 | 58 |
|
103 | 59 |
def parse_arguments(args): |
... | ... | |
174 | 130 |
# determine the routekey for AMPQ |
175 | 131 |
prefix = opts.instance_name.split('-')[0] |
176 | 132 |
routekey = "ganeti.%s.event.progress" % prefix |
177 |
amqp = AMQPClient(routekey) |
|
133 |
amqp_client = AMQPClient() |
|
134 |
amqp_client.connect() |
|
178 | 135 |
|
179 | 136 |
pid = os.fork() |
180 | 137 |
if pid == 0: |
... | ... | |
208 | 165 |
# send a final notification |
209 | 166 |
final_msg = dict(type="ganeti-create-progress", |
210 | 167 |
instance=opts.instance_name) |
168 |
final_msg['event_time'] = split_time(time.time()) |
|
211 | 169 |
if opts.read_bytes: |
212 | 170 |
final_msg['rprogress'] = float(100) |
213 | 171 |
if opts.write_bytes: |
214 | 172 |
final_msg['wprogress'] = float(100) |
215 |
amqp.send_message(final_msg) |
|
173 |
amqp_client.basic_publish(exchange=settings.EXCHANGE_GANETI, |
|
174 |
routing_key=routekey, |
|
175 |
body=json.dumps(final_msg)) |
|
216 | 176 |
return 0 |
217 | 177 |
|
218 | 178 |
# retrieve the current values of the read/write byte counters |
... | ... | |
226 | 186 |
# Construct notification of type 'ganeti-create-progress' |
227 | 187 |
msg = dict(type="ganeti-create-progress", |
228 | 188 |
instance=opts.instance_name) |
189 |
msg['event_time'] = split_time(time.time()) |
|
229 | 190 |
if opts.read_bytes: |
230 | 191 |
msg['rprogress'] = float("%2.2f" % |
231 | 192 |
(rchar * 100.0 / opts.read_bytes)) |
... | ... | |
234 | 195 |
(wchar * 100.0 / opts.write_bytes)) |
235 | 196 |
|
236 | 197 |
# and send it over AMQP |
237 |
amqp.send_message(msg) |
|
198 |
amqp_client.basic_publish(exchange=settings.EXCHANGE_GANETI, |
|
199 |
routing_key=routekey, |
|
200 |
body=json.dumps(msg)) |
|
238 | 201 |
|
239 | 202 |
# Sleep for a while |
240 | 203 |
time.sleep(3) |
Also available in: Unified diff