Revision a4eae71f
b/lib/bootstrap.py | ||
---|---|---|
289 | 289 |
InitConfig(constants.CONFIG_VERSION, cluster_config, master_node_config) |
290 | 290 |
cfg = config.ConfigWriter() |
291 | 291 |
ssh.WriteKnownHostsFile(cfg, constants.SSH_KNOWN_HOSTS_FILE) |
292 |
cfg.Update(cfg.GetClusterInfo()) |
|
292 |
cfg.Update(cfg.GetClusterInfo(), logging.error)
|
|
293 | 293 |
|
294 | 294 |
# start the master ip |
295 | 295 |
# TODO: Review rpc call from bootstrap |
... | ... | |
482 | 482 |
cluster_info.master_node = new_master |
483 | 483 |
# this will also regenerate the ssconf files, since we updated the |
484 | 484 |
# cluster info |
485 |
cfg.Update(cluster_info) |
|
485 |
cfg.Update(cluster_info, logging.error)
|
|
486 | 486 |
|
487 | 487 |
result = rpc.RpcRunner.call_node_start_master(new_master, True, no_voting) |
488 | 488 |
msg = result.fail_msg |
b/lib/cmdlib.py | ||
---|---|---|
1730 | 1730 |
" correcting: recorded %d, actual %d", idx, |
1731 | 1731 |
instance.name, disk.size, size) |
1732 | 1732 |
disk.size = size |
1733 |
self.cfg.Update(instance) |
|
1733 |
self.cfg.Update(instance, feedback_fn)
|
|
1734 | 1734 |
changed.append((instance.name, idx, size)) |
1735 | 1735 |
if self._EnsureChildSizes(disk): |
1736 |
self.cfg.Update(instance) |
|
1736 |
self.cfg.Update(instance, feedback_fn)
|
|
1737 | 1737 |
changed.append((instance.name, idx, disk.size)) |
1738 | 1738 |
return changed |
1739 | 1739 |
|
... | ... | |
1794 | 1794 |
cluster = self.cfg.GetClusterInfo() |
1795 | 1795 |
cluster.cluster_name = clustername |
1796 | 1796 |
cluster.master_ip = ip |
1797 |
self.cfg.Update(cluster) |
|
1797 |
self.cfg.Update(cluster, feedback_fn)
|
|
1798 | 1798 |
|
1799 | 1799 |
# update the known hosts file |
1800 | 1800 |
ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE) |
... | ... | |
1988 | 1988 |
# we need to update the pool size here, otherwise the save will fail |
1989 | 1989 |
_AdjustCandidatePool(self, []) |
1990 | 1990 |
|
1991 |
self.cfg.Update(self.cluster) |
|
1991 |
self.cfg.Update(self.cluster, feedback_fn)
|
|
1992 | 1992 |
|
1993 | 1993 |
|
1994 | 1994 |
def _RedistributeAncillaryFiles(lu, additional_nodes=None): |
... | ... | |
2009 | 2009 |
dist_nodes.extend(additional_nodes) |
2010 | 2010 |
if myself.name in dist_nodes: |
2011 | 2011 |
dist_nodes.remove(myself.name) |
2012 |
|
|
2012 | 2013 |
# 2. Gather files to distribute |
2013 | 2014 |
dist_files = set([constants.ETC_HOSTS, |
2014 | 2015 |
constants.SSH_KNOWN_HOSTS_FILE, |
... | ... | |
2058 | 2059 |
"""Redistribute the configuration. |
2059 | 2060 |
|
2060 | 2061 |
""" |
2061 |
self.cfg.Update(self.cfg.GetClusterInfo()) |
|
2062 |
self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
|
|
2062 | 2063 |
_RedistributeAncillaryFiles(self) |
2063 | 2064 |
|
2064 | 2065 |
|
... | ... | |
2958 | 2959 |
_RedistributeAncillaryFiles(self) |
2959 | 2960 |
self.context.ReaddNode(new_node) |
2960 | 2961 |
# make sure we redistribute the config |
2961 |
self.cfg.Update(new_node) |
|
2962 |
self.cfg.Update(new_node, feedback_fn)
|
|
2962 | 2963 |
# and make sure the new node will not have old files around |
2963 | 2964 |
if not new_node.master_candidate: |
2964 | 2965 |
result = self.rpc.call_node_demote_from_mc(new_node.name) |
... | ... | |
3113 | 3114 |
result.append(("offline", "clear offline status due to drain")) |
3114 | 3115 |
|
3115 | 3116 |
# this will trigger configuration file update, if needed |
3116 |
self.cfg.Update(node) |
|
3117 |
self.cfg.Update(node, feedback_fn)
|
|
3117 | 3118 |
# this will trigger job queue propagation or cleanup |
3118 | 3119 |
if changed_mc: |
3119 | 3120 |
self.context.ReaddNode(node) |
... | ... | |
3828 | 3829 |
if self.op.os_type is not None: |
3829 | 3830 |
feedback_fn("Changing OS to '%s'..." % self.op.os_type) |
3830 | 3831 |
inst.os = self.op.os_type |
3831 |
self.cfg.Update(inst) |
|
3832 |
self.cfg.Update(inst, feedback_fn)
|
|
3832 | 3833 |
|
3833 | 3834 |
_StartInstanceDisks(self, inst, None) |
3834 | 3835 |
try: |
... | ... | |
4501 | 4502 |
|
4502 | 4503 |
instance.primary_node = target_node |
4503 | 4504 |
# distribute new instance config to the other nodes |
4504 |
self.cfg.Update(instance) |
|
4505 |
self.cfg.Update(instance, feedback_fn)
|
|
4505 | 4506 |
|
4506 | 4507 |
# Only start the instance if it's marked as up |
4507 | 4508 |
if instance.admin_up: |
... | ... | |
4725 | 4726 |
(",".join(errs),)) |
4726 | 4727 |
|
4727 | 4728 |
instance.primary_node = target_node |
4728 |
self.cfg.Update(instance) |
|
4729 |
self.cfg.Update(instance, feedback_fn)
|
|
4729 | 4730 |
|
4730 | 4731 |
self.LogInfo("Removing the disks on the original node") |
4731 | 4732 |
_RemoveDisks(self, instance, target_node=source_node) |
... | ... | |
4963 | 4964 |
self.feedback_fn("* instance running on secondary node (%s)," |
4964 | 4965 |
" updating config" % target_node) |
4965 | 4966 |
instance.primary_node = target_node |
4966 |
self.cfg.Update(instance) |
|
4967 |
self.cfg.Update(instance, self.feedback_fn)
|
|
4967 | 4968 |
demoted_node = source_node |
4968 | 4969 |
else: |
4969 | 4970 |
self.feedback_fn("* instance confirmed to be running on its" |
... | ... | |
5090 | 5091 |
|
5091 | 5092 |
instance.primary_node = target_node |
5092 | 5093 |
# distribute new instance config to the other nodes |
5093 |
self.cfg.Update(instance) |
|
5094 |
self.cfg.Update(instance, self.feedback_fn)
|
|
5094 | 5095 |
|
5095 | 5096 |
result = self.rpc.call_finalize_migration(target_node, |
5096 | 5097 |
instance, |
... | ... | |
6040 | 6041 |
|
6041 | 6042 |
if self.op.start: |
6042 | 6043 |
iobj.admin_up = True |
6043 |
self.cfg.Update(iobj) |
|
6044 |
self.cfg.Update(iobj, feedback_fn)
|
|
6044 | 6045 |
logging.info("Starting instance %s on node %s", instance, pnode_name) |
6045 | 6046 |
feedback_fn("* starting instance...") |
6046 | 6047 |
result = self.rpc.call_instance_start(pnode_name, iobj, None, None) |
... | ... | |
6490 | 6491 |
try: |
6491 | 6492 |
# Should we replace the secondary node? |
6492 | 6493 |
if self.new_node is not None: |
6493 |
return self._ExecDrbd8Secondary()
|
|
6494 |
fn = self._ExecDrbd8Secondary
|
|
6494 | 6495 |
else: |
6495 |
return self._ExecDrbd8DiskOnly() |
|
6496 |
fn = self._ExecDrbd8DiskOnly |
|
6497 |
|
|
6498 |
return fn(feedback_fn) |
|
6496 | 6499 |
|
6497 | 6500 |
finally: |
6498 | 6501 |
# Deactivate the instance disks if we're replacing them on a down instance |
... | ... | |
6608 | 6611 |
self.lu.LogWarning("Can't remove old LV: %s" % msg, |
6609 | 6612 |
hint="remove unused LVs manually") |
6610 | 6613 |
|
6611 |
def _ExecDrbd8DiskOnly(self): |
|
6614 |
def _ExecDrbd8DiskOnly(self, feedback_fn):
|
|
6612 | 6615 |
"""Replace a disk on the primary or secondary for DRBD 8. |
6613 | 6616 |
|
6614 | 6617 |
The algorithm for replace is quite complicated: |
... | ... | |
6716 | 6719 |
|
6717 | 6720 |
dev.children = new_lvs |
6718 | 6721 |
|
6719 |
self.cfg.Update(self.instance) |
|
6722 |
self.cfg.Update(self.instance, feedback_fn)
|
|
6720 | 6723 |
|
6721 | 6724 |
# Wait for sync |
6722 | 6725 |
# This can fail as the old devices are degraded and _WaitForSync |
... | ... | |
6731 | 6734 |
self.lu.LogStep(6, steps_total, "Removing old storage") |
6732 | 6735 |
self._RemoveOldStorage(self.target_node, iv_names) |
6733 | 6736 |
|
6734 |
def _ExecDrbd8Secondary(self): |
|
6737 |
def _ExecDrbd8Secondary(self, feedback_fn):
|
|
6735 | 6738 |
"""Replace the secondary node for DRBD 8. |
6736 | 6739 |
|
6737 | 6740 |
The algorithm for replace is quite complicated: |
... | ... | |
6844 | 6847 |
dev.logical_id = new_logical_id |
6845 | 6848 |
self.cfg.SetDiskID(dev, self.instance.primary_node) |
6846 | 6849 |
|
6847 |
self.cfg.Update(self.instance) |
|
6850 |
self.cfg.Update(self.instance, feedback_fn)
|
|
6848 | 6851 |
|
6849 | 6852 |
# and now perform the drbd attach |
6850 | 6853 |
self.lu.LogInfo("Attaching primary drbds to new secondary" |
... | ... | |
7015 | 7018 |
result = self.rpc.call_blockdev_grow(node, disk, self.op.amount) |
7016 | 7019 |
result.Raise("Grow request failed to node %s" % node) |
7017 | 7020 |
disk.RecordGrow(self.op.amount) |
7018 |
self.cfg.Update(instance) |
|
7021 |
self.cfg.Update(instance, feedback_fn)
|
|
7019 | 7022 |
if self.op.wait_for_sync: |
7020 | 7023 |
disk_abort = not _WaitForSync(self, instance) |
7021 | 7024 |
if disk_abort: |
... | ... | |
7685 | 7688 |
for key, val in self.op.beparams.iteritems(): |
7686 | 7689 |
result.append(("be/%s" % key, val)) |
7687 | 7690 |
|
7688 |
self.cfg.Update(instance) |
|
7691 |
self.cfg.Update(instance, feedback_fn)
|
|
7689 | 7692 |
|
7690 | 7693 |
return result |
7691 | 7694 |
|
... | ... | |
8088 | 8091 |
except errors.TagError, err: |
8089 | 8092 |
raise errors.OpExecError("Error while setting tag: %s" % str(err)) |
8090 | 8093 |
try: |
8091 |
self.cfg.Update(self.target) |
|
8094 |
self.cfg.Update(self.target, feedback_fn)
|
|
8092 | 8095 |
except errors.ConfigurationError: |
8093 | 8096 |
raise errors.OpRetryError("There has been a modification to the" |
8094 | 8097 |
" config file and the operation has been" |
... | ... | |
8127 | 8130 |
for tag in self.op.tags: |
8128 | 8131 |
self.target.RemoveTag(tag) |
8129 | 8132 |
try: |
8130 |
self.cfg.Update(self.target) |
|
8133 |
self.cfg.Update(self.target, feedback_fn)
|
|
8131 | 8134 |
except errors.ConfigurationError: |
8132 | 8135 |
raise errors.OpRetryError("There has been a modification to the" |
8133 | 8136 |
" config file and the operation has been" |
b/lib/config.py | ||
---|---|---|
1145 | 1145 |
if modified: |
1146 | 1146 |
self._WriteConfig() |
1147 | 1147 |
|
1148 |
def _DistributeConfig(self): |
|
1148 |
def _DistributeConfig(self, feedback_fn):
|
|
1149 | 1149 |
"""Distribute the configuration to the other nodes. |
1150 | 1150 |
|
1151 | 1151 |
Currently, this only copies the configuration file. In the future, |
... | ... | |
1154 | 1154 |
""" |
1155 | 1155 |
if self._offline: |
1156 | 1156 |
return True |
1157 |
|
|
1157 | 1158 |
bad = False |
1158 | 1159 |
|
1159 | 1160 |
node_list = [] |
... | ... | |
1180 | 1181 |
msg = ("Copy of file %s to node %s failed: %s" % |
1181 | 1182 |
(self._cfg_file, to_node, msg)) |
1182 | 1183 |
logging.error(msg) |
1184 |
|
|
1185 |
if feedback_fn: |
|
1186 |
feedback_fn(msg) |
|
1187 |
|
|
1183 | 1188 |
bad = True |
1189 |
|
|
1184 | 1190 |
return not bad |
1185 | 1191 |
|
1186 |
def _WriteConfig(self, destination=None): |
|
1192 |
def _WriteConfig(self, destination=None, feedback_fn=None):
|
|
1187 | 1193 |
"""Write the configuration data to persistent storage. |
1188 | 1194 |
|
1189 | 1195 |
""" |
1196 |
assert feedback_fn is None or callable(feedback_fn) |
|
1197 |
|
|
1190 | 1198 |
# first, cleanup the _temporary_ids set, if an ID is now in the |
1191 | 1199 |
# other objects it should be discarded to prevent unbounded growth |
1192 | 1200 |
# of that structure |
... | ... | |
1206 | 1214 |
self.write_count += 1 |
1207 | 1215 |
|
1208 | 1216 |
# and redistribute the config file to master candidates |
1209 |
self._DistributeConfig() |
|
1217 |
self._DistributeConfig(feedback_fn)
|
|
1210 | 1218 |
|
1211 | 1219 |
# Write ssconf files on all nodes (including locally) |
1212 | 1220 |
if self._last_cluster_serial < self._config_data.cluster.serial_no: |
... | ... | |
1214 | 1222 |
result = rpc.RpcRunner.call_write_ssconf_files( |
1215 | 1223 |
self._UnlockedGetNodeList(), |
1216 | 1224 |
self._UnlockedGetSsconfValues()) |
1225 |
|
|
1217 | 1226 |
for nname, nresu in result.items(): |
1218 | 1227 |
msg = nresu.fail_msg |
1219 | 1228 |
if msg: |
1220 |
logging.warning("Error while uploading ssconf files to" |
|
1221 |
" node %s: %s", nname, msg) |
|
1229 |
errmsg = ("Error while uploading ssconf files to" |
|
1230 |
" node %s: %s" % (nname, msg)) |
|
1231 |
logging.warning(errmsg) |
|
1232 |
|
|
1233 |
if feedback_fn: |
|
1234 |
feedback_fn(errmsg) |
|
1235 |
|
|
1222 | 1236 |
self._last_cluster_serial = self._config_data.cluster.serial_no |
1223 | 1237 |
|
1224 | 1238 |
def _UnlockedGetSsconfValues(self): |
... | ... | |
1302 | 1316 |
return self._config_data.cluster |
1303 | 1317 |
|
1304 | 1318 |
@locking.ssynchronized(_config_lock) |
1305 |
def Update(self, target): |
|
1319 |
def Update(self, target, feedback_fn):
|
|
1306 | 1320 |
"""Notify function to be called after updates. |
1307 | 1321 |
|
1308 | 1322 |
This function must be called when an object (as returned by |
... | ... | |
1314 | 1328 |
@param target: an instance of either L{objects.Cluster}, |
1315 | 1329 |
L{objects.Node} or L{objects.Instance} which is existing in |
1316 | 1330 |
the cluster |
1331 |
@param feedback_fn: Callable feedback function |
|
1317 | 1332 |
|
1318 | 1333 |
""" |
1319 | 1334 |
if self._config_data is None: |
... | ... | |
1346 | 1361 |
for nic in target.nics: |
1347 | 1362 |
self._temporary_macs.discard(nic.mac) |
1348 | 1363 |
|
1349 |
self._WriteConfig() |
|
1364 |
self._WriteConfig(feedback_fn=feedback_fn) |
b/test/ganeti.config_unittest.py | ||
---|---|---|
108 | 108 |
# construct a fake cluster object |
109 | 109 |
fake_cl = objects.Cluster() |
110 | 110 |
# fail if we didn't read the config |
111 |
self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl) |
|
111 |
self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl, None)
|
|
112 | 112 |
|
113 | 113 |
cl = cfg.GetClusterInfo() |
114 | 114 |
# first pass, must not fail |
115 |
cfg.Update(cl) |
|
115 |
cfg.Update(cl, None)
|
|
116 | 116 |
# second pass, also must not fail (after the config has been written) |
117 |
cfg.Update(cl) |
|
117 |
cfg.Update(cl, None)
|
|
118 | 118 |
# but the fake_cl update should still fail |
119 |
self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl) |
|
119 |
self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl, None)
|
|
120 | 120 |
|
121 | 121 |
def testUpdateNode(self): |
122 | 122 |
"""Test updates on one node object""" |
... | ... | |
124 | 124 |
# construct a fake node |
125 | 125 |
fake_node = objects.Node() |
126 | 126 |
# fail if we didn't read the config |
127 |
self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node) |
|
127 |
self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node, |
|
128 |
None) |
|
128 | 129 |
|
129 | 130 |
node = cfg.GetNodeInfo(cfg.GetNodeList()[0]) |
130 | 131 |
# first pass, must not fail |
131 |
cfg.Update(node) |
|
132 |
cfg.Update(node, None)
|
|
132 | 133 |
# second pass, also must not fail (after the config has been written) |
133 |
cfg.Update(node) |
|
134 |
cfg.Update(node, None)
|
|
134 | 135 |
# but the fake_node update should still fail |
135 |
self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node) |
|
136 |
self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node, |
|
137 |
None) |
|
136 | 138 |
|
137 | 139 |
def testUpdateInstance(self): |
138 | 140 |
"""Test updates on one instance object""" |
... | ... | |
141 | 143 |
inst = self._create_instance() |
142 | 144 |
fake_instance = objects.Instance() |
143 | 145 |
# fail if we didn't read the config |
144 |
self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance) |
|
146 |
self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance, |
|
147 |
None) |
|
145 | 148 |
|
146 | 149 |
cfg.AddInstance(inst) |
147 | 150 |
instance = cfg.GetInstanceInfo(cfg.GetInstanceList()[0]) |
148 | 151 |
# first pass, must not fail |
149 |
cfg.Update(instance) |
|
152 |
cfg.Update(instance, None)
|
|
150 | 153 |
# second pass, also must not fail (after the config has been written) |
151 |
cfg.Update(instance) |
|
154 |
cfg.Update(instance, None)
|
|
152 | 155 |
# but the fake_instance update should still fail |
153 |
self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance) |
|
156 |
self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance, |
|
157 |
None) |
|
154 | 158 |
|
155 | 159 |
def testNICParameterSyntaxCheck(self): |
156 | 160 |
"""Test the NIC's CheckParameterSyntax function""" |
Also available in: Unified diff