Revision c4e55622

b/snf-common/synnefo/lib/utils.py
+# Copyright 2012 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+#
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+import datetime
+
+def split_time(value):
+  """Splits time as floating point number into a tuple.
+
+  @param value: Time in seconds
+  @type value: int or float
+  @return: Tuple containing (seconds, microseconds)
+
+  """
+  (seconds, microseconds) = divmod(int(value * 1000000), 1000000)
+
+  assert 0 <= seconds, \
+    "Seconds must be larger than or equal to 0, but are %s" % seconds
+  assert 0 <= microseconds <= 999999, \
+    "Microseconds must be 0-999999, but are %s" % microseconds
+
+  return (int(seconds), int(microseconds))
+
+
+def merge_time(timetuple):
+  """Merges a tuple into a datetime object
+
+  @param timetuple: Time as tuple, (seconds, microseconds)
+  @type timetuple: tuple
+  @return: Time as a datetime object
+
+  """
+  (seconds, microseconds) = timetuple
+
+  assert 0 <= seconds, \
+    "Seconds must be larger than or equal to 0, but are %s" % seconds
+  assert 0 <= microseconds <= 999999, \
+    "Microseconds must be 0-999999, but are %s" % microseconds
+
+  t1 = float(seconds) + (float(microseconds) * 0.000001)
+  return datetime.datetime.fromtimestamp(t1)
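split_time and merge_time round-trip a wall-clock timestamp through a JSON-serializable (seconds, microseconds) tuple. A minimal usage sketch (values illustrative, not part of the revision):

    from time import time
    from synnefo.lib.utils import split_time, merge_time

    event_time = split_time(time())   # e.g. (1332151018, 250000), safe to embed in a JSON message
    when = merge_time(event_time)     # back to a datetime.datetime with microsecond precision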
b/snf-cyclades-app/synnefo/db/migrations/0033_auto__chg_field_networklink_available__add_field_virtualmachine_backen.py
+# encoding: utf-8
+import datetime
+from south.db import db
+from south.v2 import SchemaMigration
+from django.db import models
+
+class Migration(SchemaMigration):
+
+    def forwards(self, orm):
+
+        # Changing field 'NetworkLink.available'
+        db.alter_column('db_networklink', 'available', self.gf('django.db.models.fields.BooleanField')(blank=True))
+
+        # Adding field 'VirtualMachine.backendtime'
+        db.add_column('db_virtualmachine', 'backendtime', self.gf('django.db.models.fields.DateTimeField')(default=datetime.datetime(1, 1, 1, 0, 0)), keep_default=False)
+
+        # Changing field 'VirtualMachine.deleted'
+        db.alter_column('db_virtualmachine', 'deleted', self.gf('django.db.models.fields.BooleanField')(blank=True))
+
+        # Changing field 'VirtualMachine.suspended'
+        db.alter_column('db_virtualmachine', 'suspended', self.gf('django.db.models.fields.BooleanField')(blank=True))
+
+        # Changing field 'Flavor.deleted'
+        db.alter_column('db_flavor', 'deleted', self.gf('django.db.models.fields.BooleanField')(blank=True))
+
+        # Changing field 'Network.public'
+        db.alter_column('db_network', 'public', self.gf('django.db.models.fields.BooleanField')(blank=True))
+
+
+    def backwards(self, orm):
+
+        # Changing field 'NetworkLink.available'
+        db.alter_column('db_networklink', 'available', self.gf('django.db.models.fields.BooleanField')())
+
+        # Deleting field 'VirtualMachine.backendtime'
+        db.delete_column('db_virtualmachine', 'backendtime')
+
+        # Changing field 'VirtualMachine.deleted'
+        db.alter_column('db_virtualmachine', 'deleted', self.gf('django.db.models.fields.BooleanField')())
+
+        # Changing field 'VirtualMachine.suspended'
+        db.alter_column('db_virtualmachine', 'suspended', self.gf('django.db.models.fields.BooleanField')())
+
+        # Changing field 'Flavor.deleted'
+        db.alter_column('db_flavor', 'deleted', self.gf('django.db.models.fields.BooleanField')())
+
+        # Changing field 'Network.public'
+        db.alter_column('db_network', 'public', self.gf('django.db.models.fields.BooleanField')())
+
+
+    models = {
+        'db.flavor': {
+            'Meta': {'unique_together': "(('cpu', 'ram', 'disk', 'disk_template'),)", 'object_name': 'Flavor'},
+            'cpu': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
+            'deleted': ('django.db.models.fields.BooleanField', [], {'default': 'False', 'blank': 'True'}),
+            'disk': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
+            'disk_template': ('django.db.models.fields.CharField', [], {'default': "'drbd'", 'max_length': '32'}),
+            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'ram': ('django.db.models.fields.IntegerField', [], {'default': '0'})
+        },
+        'db.network': {
+            'Meta': {'object_name': 'Network'},
+            'created': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
+            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'link': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'to': "orm['db.NetworkLink']"}),
+            'machines': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['db.VirtualMachine']", 'through': "orm['db.NetworkInterface']", 'symmetrical': 'False'}),
+            'name': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
+            'public': ('django.db.models.fields.BooleanField', [], {'default': 'False', 'blank': 'True'}),
+            'state': ('django.db.models.fields.CharField', [], {'max_length': '30'}),
+            'updated': ('django.db.models.fields.DateTimeField', [], {'auto_now': 'True', 'blank': 'True'}),
+            'userid': ('django.db.models.fields.CharField', [], {'max_length': '100', 'null': 'True'})
+        },
+        'db.networkinterface': {
+            'Meta': {'object_name': 'NetworkInterface'},
+            'created': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
+            'firewall_profile': ('django.db.models.fields.CharField', [], {'max_length': '30', 'null': 'True'}),
+            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'index': ('django.db.models.fields.IntegerField', [], {'null': 'True'}),
+            'ipv4': ('django.db.models.fields.CharField', [], {'max_length': '15', 'null': 'True'}),
+            'ipv6': ('django.db.models.fields.CharField', [], {'max_length': '100', 'null': 'True'}),
+            'mac': ('django.db.models.fields.CharField', [], {'max_length': '17', 'null': 'True'}),
+            'machine': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'nics'", 'to': "orm['db.VirtualMachine']"}),
+            'network': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'nics'", 'to': "orm['db.Network']"}),
+            'updated': ('django.db.models.fields.DateTimeField', [], {'auto_now': 'True', 'blank': 'True'})
+        },
+        'db.networklink': {
+            'Meta': {'object_name': 'NetworkLink'},
+            'available': ('django.db.models.fields.BooleanField', [], {'default': 'True', 'blank': 'True'}),
+            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'index': ('django.db.models.fields.IntegerField', [], {}),
+            'name': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
+            'network': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'null': 'True', 'to': "orm['db.Network']"})
+        },
+        'db.virtualmachine': {
+            'Meta': {'object_name': 'VirtualMachine'},
+            'action': ('django.db.models.fields.CharField', [], {'max_length': '30', 'null': 'True'}),
+            'backendjobid': ('django.db.models.fields.PositiveIntegerField', [], {'null': 'True'}),
+            'backendjobstatus': ('django.db.models.fields.CharField', [], {'max_length': '30', 'null': 'True'}),
+            'backendlogmsg': ('django.db.models.fields.TextField', [], {'null': 'True'}),
+            'backendopcode': ('django.db.models.fields.CharField', [], {'max_length': '30', 'null': 'True'}),
+            'backendtime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(1, 1, 1, 0, 0)'}),
+            'buildpercentage': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
+            'created': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
+            'deleted': ('django.db.models.fields.BooleanField', [], {'default': 'False', 'blank': 'True'}),
+            'flavor': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['db.Flavor']"}),
+            'hostid': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
+            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'imageid': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
+            'name': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
+            'operstate': ('django.db.models.fields.CharField', [], {'max_length': '30', 'null': 'True'}),
+            'suspended': ('django.db.models.fields.BooleanField', [], {'default': 'False', 'blank': 'True'}),
+            'updated': ('django.db.models.fields.DateTimeField', [], {'auto_now': 'True', 'blank': 'True'}),
+            'userid': ('django.db.models.fields.CharField', [], {'max_length': '100'})
+        },
+        'db.virtualmachinemetadata': {
+            'Meta': {'unique_together': "(('meta_key', 'vm'),)", 'object_name': 'VirtualMachineMetadata'},
+            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'meta_key': ('django.db.models.fields.CharField', [], {'max_length': '50'}),
+            'meta_value': ('django.db.models.fields.CharField', [], {'max_length': '500'}),
+            'vm': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'metadata'", 'to': "orm['db.VirtualMachine']"})
+        }
+    }
+
+    complete_apps = ['db']
b/snf-cyclades-app/synnefo/db/models.py
 # those of the authors and should not be interpreted as representing official
 # policies, either expressed or implied, of GRNET S.A.
 
+import datetime
+
 from django.conf import settings
 from django.db import models
 
......
             max_length=30, null=True)
     backendlogmsg = models.TextField(null=True)
     buildpercentage = models.IntegerField(default=0)
+    backendtime = models.DateTimeField(default=datetime.datetime.min)
 
     # Error classes
     class InvalidBackendIdError(Exception):
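The migration above freezes this default as its literal value, since datetime.datetime.min is datetime.datetime(1, 1, 1, 0, 0); a quick check of this standard-library fact:

    import datetime
    assert datetime.datetime.min == datetime.datetime(1, 1, 1, 0, 0)

Any real event time therefore compares greater than the initial backendtime of a freshly migrated VM.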
b/snf-cyclades-app/synnefo/logic/backend.py
 
 
 @transaction.commit_on_success
-def process_op_status(vm, jobid, opcode, status, logmsg):
+def process_op_status(vm, etime, jobid, opcode, status, logmsg):
     """Process a job progress notification from the backend
 
     Process an incoming message from the backend (currently Ganeti).
......
         vm.deleted = True
         vm.nics.all().delete()
 
+    vm.backendtime = etime
     # Any other notification of failure leaves the operating state unchanged
 
     vm.save()
 
 
 @transaction.commit_on_success
-def process_net_status(vm, nics):
+def process_net_status(vm, etime, nics):
     """Process a net status notification from the backend
 
     Process an incoming message from the Ganeti backend,
......
         # network nics modified, update network object
         net.save()
 
+    vm.backendtime = etime
     vm.save()
 
 
 @transaction.commit_on_success
-def process_create_progress(vm, rprogress, wprogress):
+def process_create_progress(vm, etime, rprogress, wprogress):
 
     # XXX: This only uses the read progress for now.
     #      Explore whether it would make sense to use the value of wprogress
......
     #    raise VirtualMachine.IllegalState("VM is not in building state")
 
     vm.buildpercentage = percentage
+    vm.backendtime = etime
     vm.save()
 
 
b/snf-cyclades-app/synnefo/logic/callbacks.py
 # from AMQP queues.
 
 import logging
-import socket
-import traceback
 import json
-import sys
+from functools import wraps
+from datetime import datetime
 
 from synnefo.db.models import VirtualMachine
 from synnefo.logic import utils, backend
 
+from synnefo.lib.utils import merge_time
+
 
 log = logging.getLogger()
 
 
-def update_db(message):
+def is_update_required(func):
+    """
+    Decorator for checking if an incoming message needs to update the db.
+
+    The database will not be updated in the following cases:
+    - The message has been redelivered and the action has already been
+      completed. In this case the event_time will be equal with the one
+      in the database.
+    - The message describes a previous state in the ganeti, from the one that is
+      described in the db. In this case the event_time will be smaller from the
+      one in the database.
+
+    This decorator is also acknowledging the messages to the AMQP broker.
+
+    """
+    @wraps(func)
+    def wrapper(client, message, *args, **kwargs):
+        log.debug("Checking if action is required for msg %s",  message)
+
+        try:
+            msg = json.loads(message['body'])
+
+            event_time = merge_time(msg['event_time'])
+
+            vm_id = utils.id_from_instance_name(msg["instance"])
+            vm = VirtualMachine.objects.get(id=vm_id)
+
+            db_time = vm.backendtime
+            if event_time <= db_time:
+                format_ = "%d/%m/%y %H:%M:%S:%f"
+                log.debug("Ignoring message. event_timestamp: %s db_timestamp: %s",
+                          event_time.strftime(format_),
+                          db_time.strftime(format_))
+                client.basic_ack(message)
+                return
+
+            # New message. Update the database!
+            func(client, message)
+
+        except ValueError:
+            log.error("Incoming message not in JSON format: %s", message)
+            client.basic_ack(message)
+        except KeyError:
+            log.error("Malformed incoming JSON, missing attributes: %s",
+                      message)
+            client.basic_ack(message)
+        except VirtualMachine.InvalidBackendIdError:
+            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
+            client.basic_ack(message)
+        except VirtualMachine.DoesNotExist:
+            log.error("VM for instance %s with id %d not found in DB.",
+                      msg['instance'], vm_id)
+            client.basic_ack(message)
+        except Exception as e:
+            log.exception("Unexpected error: %s, msg: %s", e, msg)
+        else:
+            # Acknowledge the message
+            client.basic_ack(message)
+
+    return wrapper
+
+
+@is_update_required
+def update_db(client, message):
     """Process a notification of type 'ganeti-op-status'"""
-    log.debug("Processing ganeti-op-status msg: %s", message.body)
-    msg = None
-    try:
-        msg = json.loads(message.body)
-
-        if msg["type"] != "ganeti-op-status":
-            log.error("Message is of unknown type %s.", msg["type"])
-            return
-
-        if msg["operation"] == "OP_INSTANCE_QUERY_DATA":
-            return status_job_finished(message)
-
-        vmid = utils.id_from_instance_name(msg["instance"])
-        vm = VirtualMachine.objects.get(id=vmid)
-
-        backend.process_op_status(vm, msg["jobId"], msg["operation"],
-                                  msg["status"], msg["logmsg"])
-        log.debug("Done processing ganeti-op-status msg for vm %s.",
-                      msg["instance"])
-        message.channel.basic_ack(message.delivery_tag)
-    except KeyError:
-        log.error("Malformed incoming JSON, missing attributes: %s",
-                      message.body)
-    except VirtualMachine.InvalidBackendIdError:
-        log.debug("Ignoring msg for unknown instance %s.", msg["instance"])
-    except VirtualMachine.InvalidBackendMsgError, e:
-        log.debug("Ignoring msg of unknown type: %s.", e)
-    except VirtualMachine.DoesNotExist:
-        log.error("VM for instance %s with id %d not found in DB.",
-                      msg["instance"], vmid)
-    except Exception as e:
-        log.exception("Unexpected error, msg: %s", msg)
+    log.debug("Processing ganeti-op-status msg: %s", message['body'])
+
+    msg = json.loads(message['body'])
 
+    if msg['type'] != "ganeti-op-status":
+        log.error("Message is of unknown type %s.", msg['type'])
+        return
 
-def update_net(message):
+    if msg['operation'] == "OP_INSTANCE_QUERY_DATA":
+        return status_job_finished(client, message)
+
+    vm_id = utils.id_from_instance_name(msg['instance'])
+    vm = VirtualMachine.objects.get(id=vm_id)
+
+    event_time = merge_time(msg['event_time'])
+    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
+                              msg['status'], msg['logmsg'])
+
+    log.debug("Done processing ganeti-op-status msg for vm %s.",
+              msg['instance'])
+
+
+@is_update_required
+def update_net(client, message):
     """Process a notification of type 'ganeti-net-status'"""
-    log.debug("Processing ganeti-net-status msg: %s", message.body)
-    msg = None
-    try:
-        msg = json.loads(message.body)
-
-        if msg["type"] != "ganeti-net-status":
-            log.error("Message is of unknown type %s", msg["type"])
-            return
-
-        vmid = utils.id_from_instance_name(msg["instance"])
-        vm = VirtualMachine.objects.get(id=vmid)
-
-        backend.process_net_status(vm, msg["nics"])
-        log.debug("Done processing ganeti-net-status msg for vm %s.",
-                      msg["instance"])
-        message.channel.basic_ack(message.delivery_tag)
-    except KeyError:
-        log.error("Malformed incoming JSON, missing attributes: %s",
-                      message.body)
-    except VirtualMachine.InvalidBackendIdError:
-        log.debug("Ignoring msg for unknown instance %s.", msg["instance"])
-    except VirtualMachine.DoesNotExist:
-        log.error("VM for instance %s with id %d not found in DB.",
-                      msg["instance"], vmid)
-    except Exception as e:
-        log.exception("Unexpected error, msg: %s", msg)
+    log.debug("Processing ganeti-net-status msg: %s", message['body'])
 
+    msg = json.loads(message['body'])
 
-def update_build_progress(message):
-    """Process a create progress message"""
-    log.debug("Processing ganeti-create-progress msg: %s", message.body)
-    msg = None
-    try:
-        msg = json.loads(message.body)
-
-        if msg['type'] != "ganeti-create-progress":
-            log.error("Message is of unknown type %s", msg["type"])
-            return
-
-        # XXX: The following assumes names like snf-12
-        vmid = msg['instance'].split('-')[1]
-        vm = VirtualMachine.objects.get(id=vmid)
-
-        backend.process_create_progress(vm, msg['rprogress'], None)
-        log.debug("Done processing ganeti-create-progress msg for vm %s.",
-                      msg["instance"])
-        message.channel.basic_ack(message.delivery_tag)
-    except KeyError:
-        log.error("Malformed incoming JSON, missing attributes: %s",
-                      message.body)
-    except Exception as e:
-        log.exception("Unexpected error, msg: %s", msg)
-        raise
+    if msg['type'] != "ganeti-net-status":
+        log.error("Message is of unknown type %s", msg['type'])
+        return
 
+    vm_id = utils.id_from_instance_name(msg['instance'])
+    vm = VirtualMachine.objects.get(id=vm_id)
 
-def trigger_status_update(message):
-    """Triggers a status update job for a specific VM id"""
-    log.debug("Request to trigger status update: %s", message.body)
-    msg = None
-    try:
-        msg = json.loads(message.body)
+    event_time = merge_time(msg['event_time'])
+    backend.process_net_status(vm, event_time, msg['nics'])
 
-        if msg["type"] != "reconcile":
-             log.error("Message is of unknown type %s", msg["type"])
-             return
+    log.debug("Done processing ganeti-net-status msg for vm %s.",
+              msg["instance"])
 
-        if msg["vmid"] == "":
-            log.error("Reconciliation message does not specify a VM id")
-            return
 
-        vm = VirtualMachine.objects.get(id=msg["vmid"])
-        backend.request_status_update(vm)
+@is_update_required
+def update_build_progress(client, message):
+    """Process a create progress message"""
+    log.debug("Processing ganeti-create-progress msg: %s", message['body'])
 
-        message.channel.basic_ack(message.delivery_tag)
-    except KeyError as k:
-        log.error("Malformed incoming JSON, missing attributes: %s", k)
-    except Exception as e:
-        log.exception("Unexpected error, msg: %s", msg)
+    msg = json.loads(message['body'])
 
+    if msg['type'] != "ganeti-create-progress":
+        log.error("Message is of unknown type %s", msg['type'])
+        return
 
-def status_job_finished(message):
-    """Updates VM status based on a previously sent status update request"""
-    msg = None
-    try:
-        msg = json.loads(message.body)
+    vm_id = utils.id_from_instance_name(msg['instance'])
+    vm = VirtualMachine.objects.get(id=vm_id)
 
-        if msg["operation"] != 'OP_INSTANCE_QUERY_DATA':
-            log.error("Message is of unknown type %s", msg["operation"])
-            return
+    event_time = merge_time(msg['event_time'])
+    backend.process_create_progress(vm, event_time, msg['rprogress'], None)
 
-        if msg["status"] != "success":
-            log.warn("Ignoring non-success status update from job %d on VM %s",
-                          msg['jobId'], msg['instance'])
-            message.channel.basic_ack(message.delivery_tag)
-            return
+    log.debug("Done processing ganeti-create-progress msg for vm %s.",
+              msg['instance'])
 
-        status = backend.get_job_status(msg['jobId'])
 
-        log.debug("Node status job result: %s", status)
+def status_job_finished(client, message):
+    """Updates VM status based on a previously sent status update request"""
 
-        if status['summary'][0] != u'INSTANCE_QUERY_DATA':
-             log.error("Status update is of unknown type %s",
-                        status['summary'])
-             return
+    msg = json.loads(message['body'])
 
-        conf_state = status['opresult'][0][msg['instance']]['config_state']
-        run_state = status['opresult'][0][msg['instance']]['run_state']
+    if msg['operation'] != 'OP_INSTANCE_QUERY_DATA':
+        log.error("Message is of unknown type %s", msg['operation'])
+        return
 
-        # XXX: The following assumes names like snf-12
-        instid = msg['instance'].split('-')[1]
+    if msg['status'] != 'success':
+        log.warn("Ignoring non-success status update from job %d on VM %s",
+                 msg['jobId'], msg['instance'])
+        client.basic_ack(message.delivery_tag)
+        return
 
-        vm = VirtualMachine.objects.get(id = instid)
+    status = backend.get_job_status(msg['jobId'])
 
-        if run_state == "up":
-            opcode = "OP_INSTANCE_REBOOT"
-        else:
-            opcode = "OP_INSTANCE_SHUTDOWN"
+    log.debug("Node status job result: %s", status)
 
-        backend.process_op_status(vm=vm, jobid=msg['jobId'],opcode=opcode,
-                                  status="success",
-                                  logmsg="Reconciliation: simulated event")
+    if status['summary'][0] != u'INSTANCE_QUERY_DATA':
+         log.error("Status update is of unknown type %s",
+                    status['summary'])
+         return
 
-        message.channel.basic_ack(message.delivery_tag)
-    except KeyError as k:
-        log.error("Malformed incoming JSON, missing attributes: %s", k)
-    except Exception as e:
-        log.exception("Unexpected error, msg: %s", msg)
+    conf_state = status['opresult'][0][msg['instance']]['config_state']
+    run_state = status['opresult'][0][msg['instance']]['run_state']
+
+    vm_id = utils.id_from_instance_name(msg['instance'])
+    vm = VirtualMachine.objects.get(id = vm_id)
 
+    if run_state == "up":
+        opcode = "OP_INSTANCE_REBOOT"
+    else:
+        opcode = "OP_INSTANCE_SHUTDOWN"
 
-def dummy_proc(message):
+    event_time = merge_time(msg['event_time'])
+    backend.process_op_status(vm=vm, etime=event_time,
+                              jobid=msg['jobId'], opcode=opcode,
+                              status='success',
+                              logmsg="Reconciliation: simulated event")
+
+@is_update_required
+def dummy_proc(client, message):
     try:
-        log.debug("Msg: %s", message.body)
-        message.channel.basic_ack(message.delivery_tag)
+        log.debug("Msg: %s", message['body'])
+        pass
     except Exception as e:
         log.exception("Could not receive message")
-        pass
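The guard in is_update_required compares the merged event timestamp against the backendtime already stored on the VM. A minimal sketch of that comparison in isolation (values illustrative, not part of the revision):

    from synnefo.lib.utils import merge_time

    db_time = merge_time((1332151018, 250000))     # what vm.backendtime holds after the last update
    event_time = merge_time((1332151018, 250000))  # msg['event_time'] of a redelivered message

    # Redelivered or out-of-order notifications satisfy event_time <= db_time,
    # so the wrapper acks them without calling the wrapped handler.
    assert event_time <= db_time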
b/snf-cyclades-app/synnefo/logic/dispatcher.py
 import socket
 from daemon import daemon
 
+from synnefo.lib.amqp import AMQPClient
+
 # Take care of differences between python-daemon versions.
 try:
     from daemon import pidfile
......
 
 
 class Dispatcher:
-    chan = None
     debug = False
-    clienttags = []
+    client_promises = []
 
     def __init__(self, debug=False):
         self.debug = debug
         self._init()
 
     def wait(self):
+        log.info("Waiting for messages..")
         while True:
             try:
-                self.chan.wait()
+                self.client.basic_wait()
             except SystemExit:
                 break
-            except amqp.exceptions.AMQPConnectionException:
-                log.error("Server went away, reconnecting...")
-                self._init()
-            except socket.error:
-                log.error("Server went away, reconnecting...")
-                self._init()
-            except Exception, e:
-                log.exception("Caught unexpected exception")
-
-        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
-        self.chan.connection.close()
-        self.chan.close()
+            except Exception as e:
+                log.exception("Caught unexpected exception: %s", e)
+
+        self.client.basic_cancel()
+        self.client.close()
 
     def _init(self):
         global QUEUES, BINDINGS
         log.info("Initializing")
 
-        # Connect to RabbitMQ
-        conn = None
-        while conn == None:
-            log.info("Attempting to connect to %s", settings.RABBIT_HOST)
-            try:
-                conn = amqp.Connection(host=settings.RABBIT_HOST,
-                                       userid=settings.RABBIT_USERNAME,
-                                       password=settings.RABBIT_PASSWORD,
-                                       virtual_host=settings.RABBIT_VHOST)
-            except socket.error:
-                log.error("Failed to connect to %s, retrying in 10s",
-                                  settings.RABBIT_HOST)
-                time.sleep(10)
-
-        log.info("Connection succesful, opening channel")
-        self.chan = conn.channel()
+        self.client = AMQPClient()
+        # Connect to AMQP host
+        self.client.connect()
 
         # Declare queues and exchanges
         for exchange in settings.EXCHANGES:
-            self.chan.exchange_declare(exchange=exchange, type="topic",
-                                       durable=True, auto_delete=False)
+            self.client.exchange_declare(exchange_name=exchange,
+                                         exchange_type="topic")
 
         for queue in QUEUES:
-            self.chan.queue_declare(queue=queue, durable=True,
-                                    exclusive=False, auto_delete=False)
+            # Queues are mirrored to all RabbitMQ brokers
+            self.client.queue_declare(queue=queue,mirrored=True)
 
         bindings = BINDINGS
 
......
                 log.error("Cannot find callback %s", binding[3])
                 raise SystemExit(1)
 
-            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
-                                 routing_key=binding[2])
-            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
+            self.client.queue_bind(queue=binding[0], exchange=binding[1],
+                                   routing_key=binding[2])
+
+            consume_promise = self.client.basic_consume(queue=binding[0],
+                                                        callback=callback)
+
             log.debug("Binding %s(%s) to queue %s with handler %s",
-                              binding[1], binding[2], binding[0], binding[3])
-            self.clienttags.append(tag)
+                      binding[1], binding[2], binding[0], binding[3])
+            self.client_promises.append(consume_promise)
 
 
 def _init_queues():
......
     (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
     (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
     (QUEUE_GANETI_BUILD_PROGR,settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
-    (QUEUE_RECONC,            settings.EXCHANGE_CRON,   RECONC_HANDLER,         'trigger_status_update'),
     ]
 
     if settings.DEBUG is True:
......
         Delete declared queues from RabbitMQ. Use with care!
     """
     global QUEUES, BINDINGS
-    conn = get_connection()
-    chan = conn.channel()
+    client = AMQPClient()
+    client.connect()
 
     print "Queues to be deleted: ", QUEUES
 
......
         return
 
     for queue in QUEUES:
-        try:
-            chan.queue_delete(queue=queue)
-            print "Deleting queue %s" % queue
-        except amqp.exceptions.AMQPChannelException as e:
-            print e.amqp_reply_code, " ", e.amqp_reply_text
-            chan = conn.channel()
+        result = client.queue_delete(queue=queue)
+        print "Deleting queue %s. Result: %s" % (queue, result)
 
-    chan.connection.close()
+    client.close()
 
 
 def purge_exchanges():
......
     global QUEUES, BINDINGS
     purge_queues()
 
-    conn = get_connection()
-    chan = conn.channel()
+    client = AMQPClient()
+    client.connect()
 
     print "Exchanges to be deleted: ", settings.EXCHANGES
 
......
         return
 
     for exchange in settings.EXCHANGES:
-        try:
-            chan.exchange_delete(exchange=exchange)
-        except amqp.exceptions.AMQPChannelException as e:
-            print e.amqp_reply_code, " ", e.amqp_reply_text
+        result = client.exchange_delete(exchange=exchange)
+        print "Deleting exchange %s. Result: %s" % (exchange, result)
 
-    chan.connection.close()
+    client.close()
 
 
 def drain_queue(queue):
......
 
     if not get_user_confirmation():
         return
-    conn = get_connection()
-    chan = conn.channel()
+
+    client = AMQPClient()
+    client.connect()
 
     # Register a temporary queue binding
     for binding in BINDINGS:
         if binding[0] == queue:
             exch = binding[1]
 
-    if not exch:
-        print "Queue not bound to any exchange: %s" % queue
-        return
-
-    chan.queue_bind(queue=queue, exchange=exch, routing_key='#')
-    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)
+    tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)
 
     print "Queue draining about to start, hit Ctrl+c when done"
     time.sleep(2)
......
 
     num_processed = 0
     while True:
-        chan.wait()
+        client.basic_wait()
         num_processed += 1
         sys.stderr.write("Ignored %d messages\r" % num_processed)
 
-    chan.basic_cancel(tag)
-    chan.connection.close()
-
+    client.basic_cancel(tag)
+    client.close()
 
-def get_connection():
-    conn = amqp.Connection(host=settings.RABBIT_HOST,
-                           userid=settings.RABBIT_USERNAME,
-                           password=settings.RABBIT_PASSWORD,
-                           virtual_host=settings.RABBIT_VHOST)
-    return conn
 
 
 def get_user_confirmation():
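The AMQPClient calls introduced above (connect, exchange_declare, queue_declare, queue_bind, basic_consume, basic_wait, basic_ack, close) compose into a consumer loop roughly as follows; a minimal sketch with hypothetical queue, exchange and routing-key names, not part of the revision:

    from synnefo.lib.amqp import AMQPClient

    def handle(client, message):
        # message is a dict carrying the raw JSON payload under 'body'
        print message['body']
        client.basic_ack(message)

    client = AMQPClient()
    client.connect()
    client.exchange_declare(exchange_name="ganeti", exchange_type="topic")
    client.queue_declare(queue="events-op", mirrored=True)   # mirrored across brokers
    client.queue_bind(queue="events-op", exchange="ganeti",
                      routing_key="ganeti.*.event.op")
    client.basic_consume(queue="events-op", callback=handle)
    while True:
        client.basic_wait()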
b/snf-cyclades-app/synnefo/logic/management/commands/reconcile.py
 
 """
 import sys
+import datetime
 
 from optparse import make_option
 
......
                 "Simulating successful Ganeti removal for %d " \
                 "servers in the DB:" % len(stale)
             for vm in VirtualMachine.objects.filter(pk__in=stale):
-                backend.process_op_status(vm=vm, jobid=-0,
+                event_time = datetime.datetime.now()
+                backend.process_op_status(vm=vm, etime=event_time, jobid=-0,
                     opcode='OP_INSTANCE_REMOVE', status='success',
                     logmsg='Reconciliation: simulated Ganeti event')
             print >> sys.stderr, "    ...done"
......
                 vm = VirtualMachine.objects.get(pk=id)
                 opcode = "OP_INSTANCE_REBOOT" if ganeti_up \
                          else "OP_INSTANCE_SHUTDOWN"
-                backend.process_op_status(vm=vm, jobid=-0,
+                event_time = datetime.datetime.now()
+                backend.process_op_status(vm=vm, etime=event_time ,jobid=-0,
                     opcode=opcode, status='success',
                     logmsg='Reconciliation: simulated Ganeti event')
             print >> sys.stderr, "    ...done"
b/snf-cyclades-gtools/synnefo/ganeti/eventd.py
 path = os.path.normpath(os.path.join(os.getcwd(), '..'))
 sys.path.append(path)
 
-import time
 import json
 import logging
 import pyinotify
......
 import socket
 from signal import signal, SIGINT, SIGTERM
 
-from amqplib import client_0_8 as amqp
-
 from ganeti import utils
 from ganeti import jqueue
 from ganeti import constants
 from ganeti import serializer
 
 from synnefo import settings
+from synnefo.lib.amqp import AMQPClient
+
+def get_time_from_status(op, job):
+    """Generate a unique message identifier for a ganeti job.
+
+    The identifier is based on the timestamp of the job. Since a ganeti
+    job passes from multiple states, we need to pick the timestamp that
+    corresponds to each state.
+
+    """
+    status = op.status
+    if status == constants.JOB_STATUS_QUEUED:
+        return job.received_timestamp
+    if status == constants.JOB_STATUS_WAITLOCK:
+    #if status == constants.JOB_STATUS_WAITING:
+        return op.start_timestamp
+    if status == constants.JOB_STATUS_CANCELING:
+        return op.start_timestamp
+    if status == constants.JOB_STATUS_RUNNING:
+        return op.exec_timestamp
+    if status in constants.JOBS_FINALIZED:
+        # success, canceled, error
+        return op.end_timestamp
+
+    raise InvalidBackendState(status, job)
+
+
+class InvalidBackendStatus(Exception):
+    def __init__(self, status, job):
+        self.status = status
+        self.job = job
+
+    def __str__(self):
+        return repr("Invalid backend status: %s in job %s"
+                    % (self.status, self.job))
+
 
 class JobFileHandler(pyinotify.ProcessEvent):
     def __init__(self, logger):
         pyinotify.ProcessEvent.__init__(self)
         self.logger = logger
-        self.chan = None
-
-    def open_channel(self):
-        conn = None
-        while conn == None:
-            handler_logger.info("Attempting to connect to %s",
-                settings.RABBIT_HOST)
-            try:
-                conn = amqp.Connection(host=settings.RABBIT_HOST,
-                     userid=settings.RABBIT_USERNAME,
-                     password=settings.RABBIT_PASSWORD,
-                     virtual_host=settings.RABBIT_VHOST)
-            except socket.error:
-                time.sleep(1)
-
-        handler_logger.info("Connection succesful, opening channel")
-        return conn.channel()
+        self.client = AMQPClient()
+        handler_logger.info("Attempting to connect to RabbitMQ hosts")
+        self.client.connect()
+        handler_logger.info("Connected succesfully")
 
     def process_IN_CLOSE_WRITE(self, event):
         self.process_IN_MOVED_TO(event)
 
     def process_IN_MOVED_TO(self, event):
-        if self.chan == None:
-            self.chan = self.open_channel()
-
         jobfile = os.path.join(event.path, event.name)
         if not event.name.startswith("job-"):
             self.logger.debug("Not a job file: %s" % event.path)
......
             except IndexError:
                 logmsg = None
 
+            # Generate a unique message identifier
+            event_time = get_time_from_status(op, job)
+
             self.logger.debug("Job: %d: %s(%s) %s %s",
                     int(job.id), op.input.OP_ID, instances, op.status, logmsg)
 
             # Construct message
             msg = {
+                    "event_time" : event_time,
                     "type": "ganeti-op-status",
                     "instance": instances,
                     "operation": op.input.OP_ID,
......
                     "status": op.status,
                     "logmsg": logmsg
                     }
-            if logmsg:
-                msg["message"] = logmsg
-
-            instance = instances.split('-')[0]
-            routekey = "ganeti.%s.event.op" % instance
-
-            self.logger.debug("Delivering msg: %s (key=%s)",
-                json.dumps(msg), routekey)
-            msg = amqp.Message(json.dumps(msg))
-            msg.properties["delivery_mode"] = 2  # Persistent
-
-            while True:
-                try:
-                    self.chan.basic_publish(msg,
-                            exchange=settings.EXCHANGE_GANETI,
-                            routing_key=routekey)
-                    self.logger.debug("Message published to AMQP successfully")
-                    return
-                except socket.error:
-                    self.logger.exception("Server went away, reconnecting...")
-                    self.chan = self.open_channel()
-                except Exception:
-                    self.logger.exception("Caught unexpected exception, msg: ",
-                                          msg)
-                    raise
+
+            instance_prefix = instances.split('-')[0]
+            routekey = "ganeti.%s.event.op" % instance_prefix
+
+            self.logger.debug("Delivering msg: %s (key=%s)", json.dumps(msg),
+                              routekey)
+            msg = json.dumps(msg)
+
+            # Send the message to RabbitMQ
+            self.client.basic_publish(exchange=settings.EXCHANGE_GANETI,
+                                      routing_key=routekey,
+                                      body=msg)
 
 handler_logger = None
 def fatal_signal_handler(signum, frame):
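An illustrative rendering of the resulting notification (field values hypothetical, elided fields omitted), published with routing key "ganeti.snf.event.op" since the prefix before the first '-' of the instance name is used:

    {
        "event_time": [1332151018, 250000],
        "type": "ganeti-op-status",
        "instance": "snf-42",
        "operation": "OP_INSTANCE_STARTUP",
        "status": "success",
        "logmsg": null
    }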
b/snf-cyclades-gtools/synnefo/ganeti/hook.py
 import os
 import subprocess
 
-import time
 import json
 import socket
 import logging
 
-from amqplib import client_0_8 as amqp
+from time import time
 
 from synnefo import settings
+from synnefo.lib.amqp import AMQPClient
+from synnefo.lib.utils import split_time
 
 
 def mac2eui64(mac, prefixstr):
......
         nics_list.append(nics[i])
 
     msg = {
+        "event_time": split_time(time()),
         "type": "ganeti-net-status",
         "instance": instance,
         "nics": nics_list
......
         self.environ = environ
         self.instance = instance
         self.prefix = prefix
+        # Retry up to two times(per host) to open a channel to RabbitMQ.
+        # The hook needs to abort if this count is exceeded, because it
+        # runs synchronously with VM creation inside Ganeti, and may only
+        # run for a finite amount of time.
+
+        # FIXME: We need a reconciliation mechanism between the DB and
+        #        Ganeti, for cases exactly like this.
+        self.client = AMQPClient(max_retries= 2*len(settings.AMQP_HOSTS))
+        self.client.connect()
 
     def on_master(self):
         """Return True if running on the Ganeti master"""
......
         for (msgtype, msg) in msgs:
             routekey = "ganeti.%s.event.%s" % (self.prefix, msgtype)
             self.logger.debug("Pushing message to RabbitMQ: %s (key = %s)",
-                json.dumps(msg), routekey)
-            msg = amqp.Message(json.dumps(msg))
-            msg.properties["delivery_mode"] = 2  # Persistent
-
-            # Retry up to five times to open a channel to RabbitMQ.
-            # The hook needs to abort if this count is exceeded, because it
-            # runs synchronously with VM creation inside Ganeti, and may only
-            # run for a finite amount of time.
-            #
-            # FIXME: We need a reconciliation mechanism between the DB and
-            #        Ganeti, for cases exactly like this.
-            conn = None
-            sent = False
-            retry = 0
-            while not sent and retry < 5:
-                self.logger.debug("Attempting to publish to RabbitMQ at %s",
-                    settings.RABBIT_HOST)
-                try:
-                    if not conn:
-                        conn = amqp.Connection(host=settings.RABBIT_HOST,
-                            userid=settings.RABBIT_USERNAME,
-                            password=settings.RABBIT_PASSWORD,
-                            virtual_host=settings.RABBIT_VHOST)
-                        chann = conn.channel()
-                        self.logger.debug("Successfully connected to RabbitMQ at %s",
-                            settings.RABBIT_HOST)
-
-                    chann.basic_publish(msg,
-                        exchange=settings.EXCHANGE_GANETI,
-                        routing_key=routekey)
-                    sent = True
-                    self.logger.debug("Successfully sent message to RabbitMQ")
-                except socket.error:
-                    conn = False
-                    retry += 1
-                    self.logger.exception("Publish to RabbitMQ failed, retry=%d in 1s",
-                        retry)
-                    time.sleep(1)
-
-            if not sent:
-                raise Exception("Publish to RabbitMQ failed after %d tries, aborting" % retry)
-
+                              json.dumps(msg), routekey)
+            msg = json.dumps(msg)
+            self.client.basic_publish(exchange=settings.EXCHANGE_GANETI,
+                                      routing_key=routekey,
+                                      body=msg)
+        self.client.close()
 
 class PostStartHook(GanetiHook):
     """Post-instance-startup Ganeti Hook.
b/snf-cyclades-gtools/synnefo/ganeti/progress_monitor.py
 import signal
 import socket
 
-from amqplib import client_0_8 as amqp
-
 from synnefo import settings
-
-
-class AMQPClient(object):
-    def __init__(self, routekey):
-        self.conn = None
-        self.chan = None
-        self.routekey = routekey
-
-    def open_channel(self):
-        if not self.conn:
-            try:
-                sys.stderr.write("Attempting to connect to %s\n" %
-                                 settings.RABBIT_HOST)
-                self.conn = amqp.Connection(host=settings.RABBIT_HOST,
-                                            userid=settings.RABBIT_USERNAME,
-                                            password=settings.RABBIT_PASSWORD,
-                                            virtual_host=settings.RABBIT_VHOST)
-            except socket.error:
-                sys.stderr.write("Connection failed, will retry in 1s\n")
-                time.sleep(1)
-
-        if self.conn:
-            sys.stderr.write("Connection succesful, opening channel\n")
-            self.chan = self.conn.channel()
-
-    def send_message(self, msg):
-        sys.stderr.write("Delivering msg with key=%s:\n%s\n" %
-                         (self.routekey, json.dumps(msg)))
-        msg = amqp.Message(json.dumps(msg))
-        msg.properties["delivery_mode"] = 2  # Persistent
-
-        if not self.chan:
-            self.open_channel()
-        if not self.chan:
-            return
-
-        try:
-            self.chan.basic_publish(msg,
-                                    exchange=settings.EXCHANGE_GANETI,
-                                    routing_key=self.routekey)
-        except socket.error:
-            sys.stderr.write("Server went away, reconnecting...\n")
-            self.conn = None
-            self.chan = None
+from synnefo.lib.amqp import AMQPClient
+from synnefo.lib.utils import split_time
 
 
 def parse_arguments(args):
......
     # determine the routekey for AMPQ
     prefix = opts.instance_name.split('-')[0]
     routekey = "ganeti.%s.event.progress" % prefix
-    amqp = AMQPClient(routekey)
+    amqp_client = AMQPClient()
+    amqp_client.connect()
 
     pid = os.fork()
     if pid == 0:
......
                     # send a final notification
                     final_msg = dict(type="ganeti-create-progress",
                                      instance=opts.instance_name)
+                    final_msg['event_time'] = split_time(time.time())
                     if opts.read_bytes:
                         final_msg['rprogress'] = float(100)
                     if opts.write_bytes:
                         final_msg['wprogress'] = float(100)
-                    amqp.send_message(final_msg)
+                    amqp_client.basic_publish(exchange=settings.EXCHANGE_GANETI,
+                                              routing_key=routekey,
+                                              body=json.dumps(final_msg))
                     return 0
 
         # retrieve the current values of the read/write byte counters
......
         # Construct notification of type 'ganeti-create-progress'
         msg = dict(type="ganeti-create-progress",
                    instance=opts.instance_name)
+        msg['event_time'] = split_time(time.time())
         if opts.read_bytes:
             msg['rprogress'] = float("%2.2f" %
                                      (rchar * 100.0 / opts.read_bytes))
......
                                      (wchar * 100.0 / opts.write_bytes))
 
         # and send it over AMQP
-        amqp.send_message(msg)
+        amqp_client.basic_publish(exchange=settings.EXCHANGE_GANETI,
+                                  routing_key=routekey,
+                                  body=json.dumps(msg))
 
         # Sleep for a while
         time.sleep(3)
