root / tools / cluster-merge @ 224ff0f7
History | View | Annotate | Download (30.6 kB)
1 |
#!/usr/bin/python |
---|---|
2 |
# |
3 |
|
4 |
# Copyright (C) 2010, 2012 Google Inc. |
5 |
# |
6 |
# This program is free software; you can redistribute it and/or modify |
7 |
# it under the terms of the GNU General Public License as published by |
8 |
# the Free Software Foundation; either version 2 of the License, or |
9 |
# (at your option) any later version. |
10 |
# |
11 |
# This program is distributed in the hope that it will be useful, but |
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of |
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
14 |
# General Public License for more details. |
15 |
# |
16 |
# You should have received a copy of the GNU General Public License |
17 |
# along with this program; if not, write to the Free Software |
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
19 |
# 02110-1301, USA. |
20 |
|
21 |
"""Tool to merge two or more clusters together. |
22 |
|
23 |
The clusters have to run the same version of Ganeti! |
24 |
|
25 |
""" |
26 |
|
27 |
# pylint: disable=C0103 |
28 |
# C0103: Invalid name cluster-merge |
29 |
|
30 |
import logging |
31 |
import os |
32 |
import optparse |
33 |
import shutil |
34 |
import sys |
35 |
import tempfile |
36 |
|
37 |
from ganeti import cli |
38 |
from ganeti import config |
39 |
from ganeti import constants |
40 |
from ganeti import errors |
41 |
from ganeti import ssh |
42 |
from ganeti import utils |
43 |
from ganeti import pathutils |
44 |
|
45 |
|
46 |
# Strategies for handling node groups that exist with the same name on
# both the local and a merging cluster (see the --groups option)
_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
# Expected-change-id prefix used when adding merged objects to our config
_CLUSTERMERGE_ECID = "clustermerge-ecid"
# Strategies for restarting instances once the merge is done
# (see the --restart option)
_RESTART_ALL = "all"
_RESTART_UP = "up"
_RESTART_NONE = "none"
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
# Strategies for handling conflicting cluster parameter values
# (see the --parameter-conflicts option)
_PARAMS_STRICT = "strict"
_PARAMS_WARN = "warn"
_PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
56 |
|
57 |
|
58 |
# How long (seconds) the watcher is paused on all involved clusters
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
# Strategy for node groups present on both clusters
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
# Strategy for cluster parameters whose values differ between clusters
PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
                            metavar="STRATEGY",
                            choices=_PARAMS_CHOICES,
                            dest="params",
                            help=("How to handle params that have"
                                  " different values (One of: %s/%s)" %
                                  _PARAMS_CHOICES))

# Strategy for restarting instances after the merge.  The help text was
# previously "How to handle restarting instances same name", a copy-paste
# leftover from GROUPS_OPT; this option is about restarting, not naming.
RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             help=("How to handle restarting instances"
                                   " (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))

# Passing the flag stores False into "stop_instances" (action="store_false"),
# i.e. instances are only checked, not stopped
SKIP_STOP_INSTANCES_OPT = \
  cli.cli_option("--skip-stop-instances", default=True, action="store_false",
                 dest="stop_instances",
                 help=("Don't stop the instances on the clusters, just check "
                       "that none is running"))
90 |
|
91 |
|
92 |
def Flatten(unflattened_list):
  """Flattens a list.

  Recursively expands nested lists so the result contains only non-list
  items, preserving their left-to-right order.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flat = []
  for element in unflattened_list:
    if not isinstance(element, list):
      flat.append(element)
    else:
      # Nested list: flatten it and splice the result in
      flat.extend(Flatten(element))
  return flat
107 |
|
108 |
|
109 |
class MergerData(object):
  """Container class to hold data used for merger.

  Plain data holder; all attributes are set in the constructor and
  C{config_path} may be filled in later, once the remote configuration
  has been fetched.

  """
  def __init__(self, cluster, key_path, nodes, instances, master_node,
               config_path=None):
    """Store the per-cluster merge data.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param master_node: Name of the master node
    @param config_path: Path to the merging cluster config

    """
    self.config_path = config_path
    self.master_node = master_node
    self.instances = instances
    self.nodes = nodes
    self.key_path = key_path
    self.cluster = cluster
131 |
|
132 |
|
133 |
class Merger(object):
  """Handling the merge.

  """
  # Instance states that count as "running" when checking the mergee
  # clusters for active instances
  RUNNING_STATUSES = frozenset([
    constants.INSTST_RUNNING,
    constants.INSTST_ERRORUP,
    ])
141 |
|
142 |
  def __init__(self, clusters, pause_period, groups, restart, params,
               stop_instances):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts
    @param restart: How to handle instance restart
    @param params: How to handle parameter conflicts (strict or warn)
    @param stop_instances: Indicates whether the instances must be stopped
                           (True) or if the Merger must only check if no
                           instances are running on the mergee clusters (False)

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    # Scratch directory for fetched keys and configs; removed in Cleanup()
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    # Name of the local (target) cluster, queried from the master daemon
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    self.groups = groups
    self.restart = restart
    self.params = params
    self.stop_instances = stop_instances
    # Restarting only previously-running instances is not implemented
    if self.restart == _RESTART_UP:
      raise NotImplementedError
167 |
|
168 |
  def Setup(self):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    # Merging a cluster into itself makes no sense
    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch remotes private key
    for cluster in self.clusters:
      # batch=False/ask_key=False: first contact with the mergee, the host
      # key may still need interactive confirmation
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      # Store the fetched key locally so later commands can use it
      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      # Collect the names of all online nodes ("N" == not offline)
      result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                            " --no-headers --separator=,", private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes_statuses = [line.split(",") for line in result.stdout.splitlines()]
      nodes = [node_status[0] for node_status in nodes_statuses
               if node_status[1] == "N"]

      # Collect the names of all instances on the mergee cluster
      result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      # The master node name is read from the remote ssconf copy
      path = utils.PathJoin(pathutils.DATA_DIR, "ssconf_%s" %
                            constants.SS_MASTER_NODE)
      result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve the master node name from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      master_node = result.stdout.strip()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
                                         master_node))
224 |
|
225 |
  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method add our public key to remotes authorized_key for further
    communication.

    @raise errors.RemoteError: If appending the key fails on a node

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    for data in self.merger_data:
      for node in data.nodes:
        # Append our public key via a quoted shell here-document; the
        # quoted '!EOF.' delimiter prevents expansion of the key material
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path, max_attempts=3)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))
246 |
|
247 |
def _RunCmd(self, hostname, command, user="root", use_cluster_key=False, |
248 |
strict_host_check=False, private_key=None, batch=True, |
249 |
ask_key=False, max_attempts=1): |
250 |
"""Wrapping SshRunner.Run with default parameters. |
251 |
|
252 |
For explanation of parameters see L{ganeti.ssh.SshRunner.Run}. |
253 |
|
254 |
""" |
255 |
for _ in range(max_attempts): |
256 |
result = self.ssh_runner.Run(hostname=hostname, command=command, |
257 |
user=user, use_cluster_key=use_cluster_key, |
258 |
strict_host_check=strict_host_check, |
259 |
private_key=private_key, batch=batch, |
260 |
ask_key=ask_key) |
261 |
if not result.failed: |
262 |
break |
263 |
|
264 |
return result |
265 |
|
266 |
def _CheckRunningInstances(self): |
267 |
"""Checks if on the clusters to be merged there are running instances |
268 |
|
269 |
@rtype: boolean |
270 |
@return: True if there are running instances, False otherwise |
271 |
|
272 |
""" |
273 |
for cluster in self.clusters: |
274 |
result = self._RunCmd(cluster, "gnt-instance list -o status") |
275 |
if self.RUNNING_STATUSES.intersection(result.output.splitlines()): |
276 |
return True |
277 |
|
278 |
return False |
279 |
|
280 |
  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    @raise errors.RemoteError: If shutting down the instances fails

    """
    for cluster in self.clusters:
      # Shut down every instance at once, without per-instance confirmation
      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
                            " --force-multiple")

      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
292 |
|
293 |
  def _DisableWatcher(self):
    """Disable watch on all merging clusters, including ourself.

    @raise errors.RemoteError: If pausing the watcher fails on a cluster

    """
    # "localhost" covers our own cluster; the watcher is paused everywhere
    # for self.pause_period seconds
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
                            self.pause_period)

      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
305 |
|
306 |
  def _RemoveMasterIps(self):
    """Removes the master IPs from the master nodes of each cluster.

    @raise errors.RemoteError: If deactivating a master IP fails

    """
    for data in self.merger_data:
      # Deactivate each mergee's master IP on its own master node;
      # presumably so it does not linger once that cluster is absorbed --
      # confirm against gnt-cluster documentation
      result = self._RunCmd(data.master_node,
                            "gnt-cluster deactivate-master-ip --yes")

      if result.failed:
        raise errors.RemoteError("Unable to remove master IP on %s."
                                 " Fail reason: %s; output: %s" %
                                 (data.master_node,
                                  result.fail_reason,
                                  result.output))
320 |
|
321 |
  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    @raise errors.RemoteError: If stopping a node's daemons fails

    """
    cmd = "%s stop-all" % pathutils.DAEMON_UTIL
    for data in self.merger_data:
      for node in data.nodes:
        # Retry up to three times; see _RunCmd's max_attempts handling
        result = self._RunCmd(node, cmd, max_attempts=3)

        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))
334 |
|
335 |
  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config.

    @raise errors.RemoteError: If fetching a remote config fails

    """
    for data in self.merger_data:
      result = self._RunCmd(data.cluster, "cat %s" %
                            pathutils.CLUSTER_CONF_FILE)

      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

      # Keep one local copy per cluster; _MergeConfig reads it later
      data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
                                        data.cluster)
      utils.WriteFile(data.config_path, data=result.stdout)
354 |
|
355 |
# R0201: Method could be a function |
356 |
def _KillMasterDaemon(self): # pylint: disable=R0201 |
357 |
"""Kills the local master daemon. |
358 |
|
359 |
@raise errors.CommandError: If unable to kill |
360 |
|
361 |
""" |
362 |
result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"]) |
363 |
if result.failed: |
364 |
raise errors.CommandError("Unable to stop master daemons." |
365 |
" Fail reason: %s; output: %s" % |
366 |
(result.fail_reason, result.output)) |
367 |
|
368 |
  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    Every node of a mergee is added offline (reonlined at readd time) and
    every instance gets fresh DRBD ports allocated from our own pool.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be uniq over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeClusterConfigs(my_config, other_config)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        # Offline the node, it will be reonlined later at node readd
        node_info.master_candidate = False
        node_info.drained = False
        node_info.offline = True
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            # Ports allocated by the mergee may clash with ours, so take a
            # fresh one from our own pool
            port = my_config.AllocatePort()

            # Index 2 of logical_id holds the DRBD port
            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            # physical_id carries the port at indices 1 and 3
            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1
409 |
|
410 |
  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    @param my_config: Our own L{config.ConfigWriter}
    @param other_config: The mergee's L{config.ConfigWriter}
    @raise errors.ConfigurationError: If any incompatible value was found;
        strict-only parameters always count as errors, the others only
        when running with --parameter-conflicts=strict

    """
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    err_count = 0

    #
    # Generic checks
    #
    # Parameters that are only fatal in strict mode
    check_params = [
      "beparams",
      "default_iallocator",
      "drbd_usermode_helper",
      "hidden_os",
      "maintain_node_health",
      "master_netdev",
      "ndparams",
      "nicparams",
      "primary_ip_family",
      "tags",
      "uid_pool",
      ]
    # Parameters that must match regardless of the conflict strategy
    check_params_strict = [
      "volume_group_name",
      ]
    if constants.ENABLE_FILE_STORAGE:
      check_params_strict.append("file_storage_dir")
    if constants.ENABLE_SHARED_FILE_STORAGE:
      check_params_strict.append("shared_file_storage_dir")
    check_params.extend(check_params_strict)

    if self.params == _PARAMS_STRICT:
      params_strict = True
    else:
      params_strict = False

    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs to this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
                      my_param)
        if params_strict or param_name in check_params_strict:
          err_count += 1

    #
    # Custom checks
    #

    # Check default hypervisor: a mismatch is only warned about, our own
    # default wins
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    # A differing *set* of enabled hypervisors is always an error
    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs to"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs to this cluster's parameter"
                        " (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          if params_strict:
            err_count += 1

    # Check os hypervisor params for hypervisors we care about
    for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differs to this cluster's parameters"
                        " (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          if params_strict:
            err_count += 1

    #
    # Warnings
    #
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    # Reserved LVs are simply unioned
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    # For prealloc_wipe_disks the stricter value (True) wins
    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                      " cluster's value (%s). The least permissive value (%s)"
                      " will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    # OS parameters unknown to us are taken over; conflicting ones are
    # reported (and fatal in strict mode)
    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        if params_strict:
          err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)
554 |
|
555 |
# R0201: Method could be a function |
556 |
def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201 |
557 |
if os_name in cluster.os_hvp: |
558 |
return cluster.os_hvp[os_name].get(hyp, None) |
559 |
else: |
560 |
return None |
561 |
|
562 |
  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.

    @param my_config: Our own L{config.ConfigWriter}
    @param other_config: The mergee's L{config.ConfigWriter}
    @raise errors.CommandError: If groups conflict and no --groups strategy
        was given
    """
    # pylint: disable=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    conflicts = []
    for other_grp in other_grps:
      for my_grp in my_grps:
        if other_grp.name == my_grp.name:
          conflicts.append(other_grp)

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          # Suffix the mergee's cluster name to make the group name unique
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          grp.name = new_name

      # User wants to merge conflicting groups
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)
          # Iterate over a copy: the membership list shrinks while nodes
          # are moved to our own group
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            # Access to a protected member of a client class
            # pylint: disable=W0212
            other_config._UnlockedRemoveNodeFromGroup(node)

            # Access to a protected member of a client class
            # pylint: disable=W0212
            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

            # Access to a protected member of a client class
            # pylint: disable=W0212
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

    for grp in other_grps:
      #TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
625 |
|
626 |
  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201
    """Starts the local master daemon.

    @param no_vote: Should the masterd started without voting? default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    env = {}
    if no_vote:
      # NOTE(review): used right after the config merge -- presumably the
      # other master candidates would still vote with the old config;
      # confirm against masterd documentation
      env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"

    result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))
643 |
|
644 |
  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and make sure their config is up-to-date.

    @raise errors.CommandError: If anything fails.

    """
    for data in self.merger_data:
      for node in data.nodes:
        logging.info("Readding node %s", node)
        result = utils.RunCmd(["gnt-node", "add", "--readd",
                               "--no-ssh-key-check", node])
        if result.failed:
          # A failed readd is only logged, not fatal: the remaining nodes
          # are still processed
          logging.error("%s failed to be readded. Reason: %s, output: %s",
                        node, result.fail_reason, result.output)

    # Push the merged configuration out to all nodes
    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                 result.output))
664 |
|
665 |
# R0201: Method could be a function |
666 |
def _StartupAllInstances(self): # pylint: disable=R0201 |
667 |
"""Starts up all instances (locally). |
668 |
|
669 |
@raise errors.CommandError: If unable to start clusters |
670 |
|
671 |
""" |
672 |
result = utils.RunCmd(["gnt-instance", "startup", "--all", |
673 |
"--force-multiple"]) |
674 |
if result.failed: |
675 |
raise errors.CommandError("Unable to start all instances." |
676 |
" Fail reason: %s; output: %s" % |
677 |
(result.fail_reason, result.output)) |
678 |
|
679 |
  # R0201: Method could be a function
  # TODO: make this overridable, for some verify errors
  def _VerifyCluster(self): # pylint: disable=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.CommandError: If cluster fails on verification

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    if result.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))
692 |
|
693 |
  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    """
    # Human-readable rollback instructions; emptied once the merge passes
    # the point of no return
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      if self.stop_instances:
        logging.info("Stopping merging instances (takes a while)")
        self._StopMergingInstances()
      logging.info("Checking that no instances are running on the mergees")
      instances_running = self._CheckRunningInstances()
      if instances_running:
        raise errors.CommandError("Some instances are still running on the"
                                  " mergees")
      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Merging config")
      self._FetchRemoteConfig()
      logging.info("Removing master IPs on mergee master nodes")
      self._RemoveMasterIps()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     pathutils.CLUSTER_CONF_FILE)
      self._MergeConfig()
      # Started without voting; see _StartMasterDaemon
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      if self.restart == _RESTART_ALL:
        logging.info("Starting instances again")
        self._StartupAllInstances()
      else:
        logging.info("Not starting instances again")
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      if rbsteps:
        # Fill in the %(nodes)s / %(clusters)s placeholders recorded above
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical(" * %s", step % info)
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?
773 |
|
774 |
  def Cleanup(self):
    """Clean up our environment.

    This cleans up remote private keys and configs and after that
    deletes the temporary directory.

    """
    # Every fetched key and config lives under work_dir, so removing the
    # tree disposes of all of it at once
    shutil.rmtree(self.work_dir)
782 |
|
783 |
|
784 |
def SetupLogging(options):
  """Setting up logging infrastructure.

  Attaches a stderr handler to the root logger; its threshold follows the
  command line flags: --debug shows everything, --verbose informational
  messages, otherwise only warnings and errors.

  @param options: Parsed command line options

  """
  handler = logging.StreamHandler()
  handler.setFormatter(
    logging.Formatter("%(asctime)s: %(levelname)s %(message)s"))

  if options.debug:
    level = logging.NOTSET
  elif options.verbose:
    level = logging.INFO
  else:
    level = logging.WARNING
  handler.setLevel(level)

  # The root logger passes everything through; filtering happens on the
  # handler only
  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(handler)
804 |
|
805 |
|
806 |
def main(): |
807 |
"""Main routine. |
808 |
|
809 |
""" |
810 |
program = os.path.basename(sys.argv[0]) |
811 |
|
812 |
parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>", |
813 |
prog=program) |
814 |
parser.add_option(cli.DEBUG_OPT) |
815 |
parser.add_option(cli.VERBOSE_OPT) |
816 |
parser.add_option(PAUSE_PERIOD_OPT) |
817 |
parser.add_option(GROUPS_OPT) |
818 |
parser.add_option(RESTART_OPT) |
819 |
parser.add_option(PARAMS_OPT) |
820 |
parser.add_option(SKIP_STOP_INSTANCES_OPT) |
821 |
|
822 |
(options, args) = parser.parse_args() |
823 |
|
824 |
SetupLogging(options) |
825 |
|
826 |
if not args: |
827 |
parser.error("No clusters specified") |
828 |
|
829 |
cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period, |
830 |
options.groups, options.restart, options.params, |
831 |
options.stop_instances) |
832 |
try: |
833 |
try: |
834 |
cluster_merger.Setup() |
835 |
cluster_merger.Merge() |
836 |
except errors.GenericError, e: |
837 |
logging.exception(e) |
838 |
return constants.EXIT_FAILURE |
839 |
finally: |
840 |
cluster_merger.Cleanup() |
841 |
|
842 |
return constants.EXIT_SUCCESS |
843 |
|
844 |
|
845 |
if __name__ == "__main__":
  # Script entry point: exit with the code returned by main()
  sys.exit(main())