Add configure-time switch for split queries
[ganeti-local] / lib / client / gnt_cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Cluster related commands"""
22
23 # pylint: disable=W0401,W0613,W0614,C0103
24 # W0401: Wildcard import ganeti.cli
25 # W0613: Unused argument, since all functions follow the same API
26 # W0614: Unused import %s from wildcard import (since we need cli)
27 # C0103: Invalid name gnt-cluster
28
29 import os.path
30 import time
31 import OpenSSL
32 import itertools
33
34 from ganeti.cli import *
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import bootstrap
40 from ganeti import ssh
41 from ganeti import objects
42 from ganeti import uidpool
43 from ganeti import compat
44 from ganeti import netutils
45
46
47 ON_OPT = cli_option("--on", default=False,
48                     action="store_true", dest="on",
49                     help="Recover from an EPO")
50
51 GROUPS_OPT = cli_option("--groups", default=False,
52                         action="store_true", dest="groups",
53                         help="Arguments are node groups instead of nodes")
54
55 SHOW_MACHINE_OPT = cli_option("-M", "--show-machine-names", default=False,
56                               action="store_true",
57                               help="Show machine name for every line in output")
58
59 _EPO_PING_INTERVAL = 30 # 30 seconds between pings
60 _EPO_PING_TIMEOUT = 1 # 1 second
61 _EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
62
63
64 @UsesRPC
65 def InitCluster(opts, args):
66   """Initialize the cluster.
67
68   @param opts: the command line options selected by the user
69   @type args: list
70   @param args: should contain only one element, the desired
71       cluster name
72   @rtype: int
73   @return: the desired exit code
74
75   """
76   if not opts.lvm_storage and opts.vg_name:
77     ToStderr("Options --no-lvm-storage and --vg-name conflict.")
78     return 1
79
80   vg_name = opts.vg_name
81   if opts.lvm_storage and not opts.vg_name:
82     vg_name = constants.DEFAULT_VG
83
84   if not opts.drbd_storage and opts.drbd_helper:
85     ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
86     return 1
87
88   drbd_helper = opts.drbd_helper
89   if opts.drbd_storage and not opts.drbd_helper:
90     drbd_helper = constants.DEFAULT_DRBD_HELPER
91
92   master_netdev = opts.master_netdev
93   if master_netdev is None:
94     master_netdev = constants.DEFAULT_BRIDGE
95
96   hvlist = opts.enabled_hypervisors
97   if hvlist is None:
98     hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
99   hvlist = hvlist.split(",")
100
101   hvparams = dict(opts.hvparams)
102   beparams = opts.beparams
103   nicparams = opts.nicparams
104
105   diskparams = dict(opts.diskparams)
106
107   # check the disk template types here, as we cannot rely on the type check done
108   # by the opcode parameter types
109   diskparams_keys = set(diskparams.keys())
110   if not (diskparams_keys <= constants.DISK_TEMPLATES):
111     unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
112     ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
113     return 1
114
115   # prepare beparams dict
116   beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
117   utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
118
119   # prepare nicparams dict
120   nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
121   utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
122
123   # prepare ndparams dict
124   if opts.ndparams is None:
125     ndparams = dict(constants.NDC_DEFAULTS)
126   else:
127     ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
128     utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
129
130   # prepare hvparams dict
131   for hv in constants.HYPER_TYPES:
132     if hv not in hvparams:
133       hvparams[hv] = {}
134     hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
135     utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
136
137   # prepare diskparams dict
138   for templ in constants.DISK_TEMPLATES:
139     if templ not in diskparams:
140       diskparams[templ] = {}
141     diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
142                                          diskparams[templ])
143     utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
144
145   # prepare ipolicy dict
146   ipolicy_raw = CreateIPolicyFromOpts(
147     ispecs_mem_size=opts.ispecs_mem_size,
148     ispecs_cpu_count=opts.ispecs_cpu_count,
149     ispecs_disk_count=opts.ispecs_disk_count,
150     ispecs_disk_size=opts.ispecs_disk_size,
151     ispecs_nic_count=opts.ispecs_nic_count,
152     ipolicy_disk_templates=opts.ipolicy_disk_templates,
153     ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
154     ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
155     fill_all=True)
156   ipolicy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy_raw)
157
158   if opts.candidate_pool_size is None:
159     opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
160
161   if opts.mac_prefix is None:
162     opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
163
164   uid_pool = opts.uid_pool
165   if uid_pool is not None:
166     uid_pool = uidpool.ParseUidPool(uid_pool)
167
168   if opts.prealloc_wipe_disks is None:
169     opts.prealloc_wipe_disks = False
170
171   external_ip_setup_script = opts.use_external_mip_script
172   if external_ip_setup_script is None:
173     external_ip_setup_script = False
174
175   try:
176     primary_ip_version = int(opts.primary_ip_version)
177   except (ValueError, TypeError), err:
178     ToStderr("Invalid primary ip version value: %s" % str(err))
179     return 1
180
181   master_netmask = opts.master_netmask
182   try:
183     if master_netmask is not None:
184       master_netmask = int(master_netmask)
185   except (ValueError, TypeError), err:
186     ToStderr("Invalid master netmask value: %s" % str(err))
187     return 1
188
189   if opts.disk_state:
190     disk_state = utils.FlatToDict(opts.disk_state)
191   else:
192     disk_state = {}
193
194   hv_state = dict(opts.hv_state)
195
196   bootstrap.InitCluster(cluster_name=args[0],
197                         secondary_ip=opts.secondary_ip,
198                         vg_name=vg_name,
199                         mac_prefix=opts.mac_prefix,
200                         master_netmask=master_netmask,
201                         master_netdev=master_netdev,
202                         file_storage_dir=opts.file_storage_dir,
203                         shared_file_storage_dir=opts.shared_file_storage_dir,
204                         enabled_hypervisors=hvlist,
205                         hvparams=hvparams,
206                         beparams=beparams,
207                         nicparams=nicparams,
208                         ndparams=ndparams,
209                         diskparams=diskparams,
210                         ipolicy=ipolicy,
211                         candidate_pool_size=opts.candidate_pool_size,
212                         modify_etc_hosts=opts.modify_etc_hosts,
213                         modify_ssh_setup=opts.modify_ssh_setup,
214                         maintain_node_health=opts.maintain_node_health,
215                         drbd_helper=drbd_helper,
216                         uid_pool=uid_pool,
217                         default_iallocator=opts.default_iallocator,
218                         primary_ip_version=primary_ip_version,
219                         prealloc_wipe_disks=opts.prealloc_wipe_disks,
220                         use_external_mip_script=external_ip_setup_script,
221                         hv_state=hv_state,
222                         disk_state=disk_state,
223                         )
224   op = opcodes.OpClusterPostInit()
225   SubmitOpCode(op, opts=opts)
226   return 0
227
228
229 @UsesRPC
230 def DestroyCluster(opts, args):
231   """Destroy the cluster.
232
233   @param opts: the command line options selected by the user
234   @type args: list
235   @param args: should be an empty list
236   @rtype: int
237   @return: the desired exit code
238
239   """
240   if not opts.yes_do_it:
241     ToStderr("Destroying a cluster is irreversible. If you really want"
242              " destroy this cluster, supply the --yes-do-it option.")
243     return 1
244
245   op = opcodes.OpClusterDestroy()
246   master = SubmitOpCode(op, opts=opts)
247   # if we reached this, the opcode didn't fail; we can proceed to
248   # shutdown all the daemons
249   bootstrap.FinalizeClusterDestroy(master)
250   return 0
251
252
253 def RenameCluster(opts, args):
254   """Rename the cluster.
255
256   @param opts: the command line options selected by the user
257   @type args: list
258   @param args: should contain only one element, the new cluster name
259   @rtype: int
260   @return: the desired exit code
261
262   """
263   cl = GetClient()
264
265   (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
266
267   new_name = args[0]
268   if not opts.force:
269     usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
270                 " connected over the network to the cluster name, the"
271                 " operation is very dangerous as the IP address will be"
272                 " removed from the node and the change may not go through."
273                 " Continue?") % (cluster_name, new_name)
274     if not AskUser(usertext):
275       return 1
276
277   op = opcodes.OpClusterRename(name=new_name)
278   result = SubmitOpCode(op, opts=opts, cl=cl)
279
280   if result:
281     ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
282
283   return 0
284
285
286 def ActivateMasterIp(opts, args):
287   """Activates the master IP.
288
289   """
290   op = opcodes.OpClusterActivateMasterIp()
291   SubmitOpCode(op)
292   return 0
293
294
295 def DeactivateMasterIp(opts, args):
296   """Deactivates the master IP.
297
298   """
299   if not opts.confirm:
300     usertext = ("This will disable the master IP. All the open connections to"
301                 " the master IP will be closed. To reach the master you will"
302                 " need to use its node IP."
303                 " Continue?")
304     if not AskUser(usertext):
305       return 1
306
307   op = opcodes.OpClusterDeactivateMasterIp()
308   SubmitOpCode(op)
309   return 0
310
311
312 def RedistributeConfig(opts, args):
313   """Forces push of the cluster configuration.
314
315   @param opts: the command line options selected by the user
316   @type args: list
317   @param args: empty list
318   @rtype: int
319   @return: the desired exit code
320
321   """
322   op = opcodes.OpClusterRedistConf()
323   SubmitOrSend(op, opts)
324   return 0
325
326
327 def ShowClusterVersion(opts, args):
328   """Write version of ganeti software to the standard output.
329
330   @param opts: the command line options selected by the user
331   @type args: list
332   @param args: should be an empty list
333   @rtype: int
334   @return: the desired exit code
335
336   """
337   cl = GetClient(query=True)
338   result = cl.QueryClusterInfo()
339   ToStdout("Software version: %s", result["software_version"])
340   ToStdout("Internode protocol: %s", result["protocol_version"])
341   ToStdout("Configuration format: %s", result["config_version"])
342   ToStdout("OS api version: %s", result["os_api_version"])
343   ToStdout("Export interface: %s", result["export_version"])
344   return 0
345
346
347 def ShowClusterMaster(opts, args):
348   """Write name of master node to the standard output.
349
350   @param opts: the command line options selected by the user
351   @type args: list
352   @param args: should be an empty list
353   @rtype: int
354   @return: the desired exit code
355
356   """
357   master = bootstrap.GetMaster()
358   ToStdout(master)
359   return 0
360
361
362 def _PrintGroupedParams(paramsdict, level=1, roman=False):
363   """Print Grouped parameters (be, nic, disk) by group.
364
365   @type paramsdict: dict of dicts
366   @param paramsdict: {group: {param: value, ...}, ...}
367   @type level: int
368   @param level: Level of indention
369
370   """
371   indent = "  " * level
372   for item, val in sorted(paramsdict.items()):
373     if isinstance(val, dict):
374       ToStdout("%s- %s:", indent, item)
375       _PrintGroupedParams(val, level=level + 1, roman=roman)
376     elif roman and isinstance(val, int):
377       ToStdout("%s  %s: %s", indent, item, compat.TryToRoman(val))
378     else:
379       ToStdout("%s  %s: %s", indent, item, val)
380
381
382 def ShowClusterConfig(opts, args):
383   """Shows cluster information.
384
385   @param opts: the command line options selected by the user
386   @type args: list
387   @param args: should be an empty list
388   @rtype: int
389   @return: the desired exit code
390
391   """
392   cl = GetClient()
393   result = cl.QueryClusterInfo()
394
395   ToStdout("Cluster name: %s", result["name"])
396   ToStdout("Cluster UUID: %s", result["uuid"])
397
398   ToStdout("Creation time: %s", utils.FormatTime(result["ctime"]))
399   ToStdout("Modification time: %s", utils.FormatTime(result["mtime"]))
400
401   ToStdout("Master node: %s", result["master"])
402
403   ToStdout("Architecture (this node): %s (%s)",
404            result["architecture"][0], result["architecture"][1])
405
406   if result["tags"]:
407     tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
408   else:
409     tags = "(none)"
410
411   ToStdout("Tags: %s", tags)
412
413   ToStdout("Default hypervisor: %s", result["default_hypervisor"])
414   ToStdout("Enabled hypervisors: %s",
415            utils.CommaJoin(result["enabled_hypervisors"]))
416
417   ToStdout("Hypervisor parameters:")
418   _PrintGroupedParams(result["hvparams"])
419
420   ToStdout("OS-specific hypervisor parameters:")
421   _PrintGroupedParams(result["os_hvp"])
422
423   ToStdout("OS parameters:")
424   _PrintGroupedParams(result["osparams"])
425
426   ToStdout("Hidden OSes: %s", utils.CommaJoin(result["hidden_os"]))
427   ToStdout("Blacklisted OSes: %s", utils.CommaJoin(result["blacklisted_os"]))
428
429   ToStdout("Cluster parameters:")
430   ToStdout("  - candidate pool size: %s",
431             compat.TryToRoman(result["candidate_pool_size"],
432                               convert=opts.roman_integers))
433   ToStdout("  - master netdev: %s", result["master_netdev"])
434   ToStdout("  - master netmask: %s", result["master_netmask"])
435   ToStdout("  - use external master IP address setup script: %s",
436            result["use_external_mip_script"])
437   ToStdout("  - lvm volume group: %s", result["volume_group_name"])
438   if result["reserved_lvs"]:
439     reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
440   else:
441     reserved_lvs = "(none)"
442   ToStdout("  - lvm reserved volumes: %s", reserved_lvs)
443   ToStdout("  - drbd usermode helper: %s", result["drbd_usermode_helper"])
444   ToStdout("  - file storage path: %s", result["file_storage_dir"])
445   ToStdout("  - shared file storage path: %s",
446            result["shared_file_storage_dir"])
447   ToStdout("  - maintenance of node health: %s",
448            result["maintain_node_health"])
449   ToStdout("  - uid pool: %s",
450             uidpool.FormatUidPool(result["uid_pool"],
451                                   roman=opts.roman_integers))
452   ToStdout("  - default instance allocator: %s", result["default_iallocator"])
453   ToStdout("  - primary ip version: %d", result["primary_ip_version"])
454   ToStdout("  - preallocation wipe disks: %s", result["prealloc_wipe_disks"])
455   ToStdout("  - OS search path: %s", utils.CommaJoin(constants.OS_SEARCH_PATH))
456
457   ToStdout("Default node parameters:")
458   _PrintGroupedParams(result["ndparams"], roman=opts.roman_integers)
459
460   ToStdout("Default instance parameters:")
461   _PrintGroupedParams(result["beparams"], roman=opts.roman_integers)
462
463   ToStdout("Default nic parameters:")
464   _PrintGroupedParams(result["nicparams"], roman=opts.roman_integers)
465
466   ToStdout("Default disk parameters:")
467   _PrintGroupedParams(result["diskparams"], roman=opts.roman_integers)
468
469   ToStdout("Instance policy - limits for instances:")
470   for key in constants.IPOLICY_ISPECS:
471     ToStdout("  - %s", key)
472     _PrintGroupedParams(result["ipolicy"][key], roman=opts.roman_integers)
473   ToStdout("  - enabled disk templates: %s",
474            utils.CommaJoin(result["ipolicy"][constants.IPOLICY_DTS]))
475   for key in constants.IPOLICY_PARAMETERS:
476     ToStdout("  - %s: %s", key, result["ipolicy"][key])
477
478   return 0
479
480
481 def ClusterCopyFile(opts, args):
482   """Copy a file from master to some nodes.
483
484   @param opts: the command line options selected by the user
485   @type args: list
486   @param args: should contain only one element, the path of
487       the file to be copied
488   @rtype: int
489   @return: the desired exit code
490
491   """
492   filename = args[0]
493   if not os.path.exists(filename):
494     raise errors.OpPrereqError("No such filename '%s'" % filename,
495                                errors.ECODE_INVAL)
496
497   cl = GetClient()
498
499   cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
500
501   results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
502                            secondary_ips=opts.use_replication_network,
503                            nodegroup=opts.nodegroup)
504
505   srun = ssh.SshRunner(cluster_name=cluster_name)
506   for node in results:
507     if not srun.CopyFileToNode(node, filename):
508       ToStderr("Copy of file %s to node %s failed", filename, node)
509
510   return 0
511
512
513 def RunClusterCommand(opts, args):
514   """Run a command on some nodes.
515
516   @param opts: the command line options selected by the user
517   @type args: list
518   @param args: should contain the command to be run and its arguments
519   @rtype: int
520   @return: the desired exit code
521
522   """
523   cl = GetClient()
524
525   command = " ".join(args)
526
527   nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
528
529   cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
530                                                     "master_node"])
531
532   srun = ssh.SshRunner(cluster_name=cluster_name)
533
534   # Make sure master node is at list end
535   if master_node in nodes:
536     nodes.remove(master_node)
537     nodes.append(master_node)
538
539   for name in nodes:
540     result = srun.Run(name, "root", command)
541     ToStdout("------------------------------------------------")
542     if opts.show_machine_names:
543       for line in result.output.splitlines():
544         ToStdout("%s: %s", name, line)
545     else:
546       ToStdout("node: %s", name)
547       ToStdout("%s", result.output)
548     ToStdout("return code = %s", result.exit_code)
549
550   return 0
551
552
553 def VerifyCluster(opts, args):
554   """Verify integrity of cluster, performing various test on nodes.
555
556   @param opts: the command line options selected by the user
557   @type args: list
558   @param args: should be an empty list
559   @rtype: int
560   @return: the desired exit code
561
562   """
563   skip_checks = []
564
565   if opts.skip_nplusone_mem:
566     skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
567
568   cl = GetClient()
569
570   op = opcodes.OpClusterVerify(verbose=opts.verbose,
571                                error_codes=opts.error_codes,
572                                debug_simulate_errors=opts.simulate_errors,
573                                skip_checks=skip_checks,
574                                ignore_errors=opts.ignore_errors,
575                                group_name=opts.nodegroup)
576   result = SubmitOpCode(op, cl=cl, opts=opts)
577
578   # Keep track of submitted jobs
579   jex = JobExecutor(cl=cl, opts=opts)
580
581   for (status, job_id) in result[constants.JOB_IDS_KEY]:
582     jex.AddJobId(None, status, job_id)
583
584   results = jex.GetResults()
585
586   (bad_jobs, bad_results) = \
587     map(len,
588         # Convert iterators to lists
589         map(list,
590             # Count errors
591             map(compat.partial(itertools.ifilterfalse, bool),
592                 # Convert result to booleans in a tuple
593                 zip(*((job_success, len(op_results) == 1 and op_results[0])
594                       for (job_success, op_results) in results)))))
595
596   if bad_jobs == 0 and bad_results == 0:
597     rcode = constants.EXIT_SUCCESS
598   else:
599     rcode = constants.EXIT_FAILURE
600     if bad_jobs > 0:
601       ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
602
603   return rcode
604
605
606 def VerifyDisks(opts, args):
607   """Verify integrity of cluster disks.
608
609   @param opts: the command line options selected by the user
610   @type args: list
611   @param args: should be an empty list
612   @rtype: int
613   @return: the desired exit code
614
615   """
616   cl = GetClient()
617
618   op = opcodes.OpClusterVerifyDisks()
619
620   result = SubmitOpCode(op, cl=cl, opts=opts)
621
622   # Keep track of submitted jobs
623   jex = JobExecutor(cl=cl, opts=opts)
624
625   for (status, job_id) in result[constants.JOB_IDS_KEY]:
626     jex.AddJobId(None, status, job_id)
627
628   retcode = constants.EXIT_SUCCESS
629
630   for (status, result) in jex.GetResults():
631     if not status:
632       ToStdout("Job failed: %s", result)
633       continue
634
635     ((bad_nodes, instances, missing), ) = result
636
637     for node, text in bad_nodes.items():
638       ToStdout("Error gathering data on node %s: %s",
639                node, utils.SafeEncode(text[-400:]))
640       retcode = constants.EXIT_FAILURE
641       ToStdout("You need to fix these nodes first before fixing instances")
642
643     for iname in instances:
644       if iname in missing:
645         continue
646       op = opcodes.OpInstanceActivateDisks(instance_name=iname)
647       try:
648         ToStdout("Activating disks for instance '%s'", iname)
649         SubmitOpCode(op, opts=opts, cl=cl)
650       except errors.GenericError, err:
651         nret, msg = FormatError(err)
652         retcode |= nret
653         ToStderr("Error activating disks for instance %s: %s", iname, msg)
654
655     if missing:
656       for iname, ival in missing.iteritems():
657         all_missing = compat.all(x[0] in bad_nodes for x in ival)
658         if all_missing:
659           ToStdout("Instance %s cannot be verified as it lives on"
660                    " broken nodes", iname)
661         else:
662           ToStdout("Instance %s has missing logical volumes:", iname)
663           ival.sort()
664           for node, vol in ival:
665             if node in bad_nodes:
666               ToStdout("\tbroken node %s /dev/%s", node, vol)
667             else:
668               ToStdout("\t%s /dev/%s", node, vol)
669
670       ToStdout("You need to replace or recreate disks for all the above"
671                " instances if this message persists after fixing broken nodes.")
672       retcode = constants.EXIT_FAILURE
673
674   return retcode
675
676
677 def RepairDiskSizes(opts, args):
678   """Verify sizes of cluster disks.
679
680   @param opts: the command line options selected by the user
681   @type args: list
682   @param args: optional list of instances to restrict check to
683   @rtype: int
684   @return: the desired exit code
685
686   """
687   op = opcodes.OpClusterRepairDiskSizes(instances=args)
688   SubmitOpCode(op, opts=opts)
689
690
691 @UsesRPC
692 def MasterFailover(opts, args):
693   """Failover the master node.
694
695   This command, when run on a non-master node, will cause the current
696   master to cease being master, and the non-master to become new
697   master.
698
699   @param opts: the command line options selected by the user
700   @type args: list
701   @param args: should be an empty list
702   @rtype: int
703   @return: the desired exit code
704
705   """
706   if opts.no_voting:
707     usertext = ("This will perform the failover even if most other nodes"
708                 " are down, or if this node is outdated. This is dangerous"
709                 " as it can lead to a non-consistent cluster. Check the"
710                 " gnt-cluster(8) man page before proceeding. Continue?")
711     if not AskUser(usertext):
712       return 1
713
714   return bootstrap.MasterFailover(no_voting=opts.no_voting)
715
716
717 def MasterPing(opts, args):
718   """Checks if the master is alive.
719
720   @param opts: the command line options selected by the user
721   @type args: list
722   @param args: should be an empty list
723   @rtype: int
724   @return: the desired exit code
725
726   """
727   try:
728     cl = GetClient()
729     cl.QueryClusterInfo()
730     return 0
731   except Exception: # pylint: disable=W0703
732     return 1
733
734
735 def SearchTags(opts, args):
736   """Searches the tags on all the cluster.
737
738   @param opts: the command line options selected by the user
739   @type args: list
740   @param args: should contain only one element, the tag pattern
741   @rtype: int
742   @return: the desired exit code
743
744   """
745   op = opcodes.OpTagsSearch(pattern=args[0])
746   result = SubmitOpCode(op, opts=opts)
747   if not result:
748     return 1
749   result = list(result)
750   result.sort()
751   for path, tag in result:
752     ToStdout("%s %s", path, tag)
753
754
755 def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
756   """Reads and verifies an X509 certificate.
757
758   @type cert_filename: string
759   @param cert_filename: the path of the file containing the certificate to
760                         verify encoded in PEM format
761   @type verify_private_key: bool
762   @param verify_private_key: whether to verify the private key in addition to
763                              the public certificate
764   @rtype: string
765   @return: a string containing the PEM-encoded certificate.
766
767   """
768   try:
769     pem = utils.ReadFile(cert_filename)
770   except IOError, err:
771     raise errors.X509CertError(cert_filename,
772                                "Unable to read certificate: %s" % str(err))
773
774   try:
775     OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
776   except Exception, err:
777     raise errors.X509CertError(cert_filename,
778                                "Unable to load certificate: %s" % str(err))
779
780   if verify_private_key:
781     try:
782       OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
783     except Exception, err:
784       raise errors.X509CertError(cert_filename,
785                                  "Unable to load private key: %s" % str(err))
786
787   return pem
788
789
790 def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
791                  rapi_cert_filename, new_spice_cert, spice_cert_filename,
792                  spice_cacert_filename, new_confd_hmac_key, new_cds,
793                  cds_filename, force):
794   """Renews cluster certificates, keys and secrets.
795
796   @type new_cluster_cert: bool
797   @param new_cluster_cert: Whether to generate a new cluster certificate
798   @type new_rapi_cert: bool
799   @param new_rapi_cert: Whether to generate a new RAPI certificate
800   @type rapi_cert_filename: string
801   @param rapi_cert_filename: Path to file containing new RAPI certificate
802   @type new_spice_cert: bool
803   @param new_spice_cert: Whether to generate a new SPICE certificate
804   @type spice_cert_filename: string
805   @param spice_cert_filename: Path to file containing new SPICE certificate
806   @type spice_cacert_filename: string
807   @param spice_cacert_filename: Path to file containing the certificate of the
808                                 CA that signed the SPICE certificate
809   @type new_confd_hmac_key: bool
810   @param new_confd_hmac_key: Whether to generate a new HMAC key
811   @type new_cds: bool
812   @param new_cds: Whether to generate a new cluster domain secret
813   @type cds_filename: string
814   @param cds_filename: Path to file containing new cluster domain secret
815   @type force: bool
816   @param force: Whether to ask user for confirmation
817
818   """
819   if new_rapi_cert and rapi_cert_filename:
820     ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
821              " options can be specified at the same time.")
822     return 1
823
824   if new_cds and cds_filename:
825     ToStderr("Only one of the --new-cluster-domain-secret and"
826              " --cluster-domain-secret options can be specified at"
827              " the same time.")
828     return 1
829
830   if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
831     ToStderr("When using --new-spice-certificate, the --spice-certificate"
832              " and --spice-ca-certificate must not be used.")
833     return 1
834
835   if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
836     ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
837              " specified.")
838     return 1
839
840   rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
841   try:
842     if rapi_cert_filename:
843       rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
844     if spice_cert_filename:
845       spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
846       spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
847   except errors.X509CertError, err:
848     ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
849     return 1
850
851   if cds_filename:
852     try:
853       cds = utils.ReadFile(cds_filename)
854     except Exception, err: # pylint: disable=W0703
855       ToStderr("Can't load new cluster domain secret from %s: %s" %
856                (cds_filename, str(err)))
857       return 1
858   else:
859     cds = None
860
861   if not force:
862     usertext = ("This requires all daemons on all nodes to be restarted and"
863                 " may take some time. Continue?")
864     if not AskUser(usertext):
865       return 1
866
867   def _RenewCryptoInner(ctx):
868     ctx.feedback_fn("Updating certificates and keys")
869     bootstrap.GenerateClusterCrypto(new_cluster_cert,
870                                     new_rapi_cert,
871                                     new_spice_cert,
872                                     new_confd_hmac_key,
873                                     new_cds,
874                                     rapi_cert_pem=rapi_cert_pem,
875                                     spice_cert_pem=spice_cert_pem,
876                                     spice_cacert_pem=spice_cacert_pem,
877                                     cds=cds)
878
879     files_to_copy = []
880
881     if new_cluster_cert:
882       files_to_copy.append(constants.NODED_CERT_FILE)
883
884     if new_rapi_cert or rapi_cert_pem:
885       files_to_copy.append(constants.RAPI_CERT_FILE)
886
887     if new_spice_cert or spice_cert_pem:
888       files_to_copy.append(constants.SPICE_CERT_FILE)
889       files_to_copy.append(constants.SPICE_CACERT_FILE)
890
891     if new_confd_hmac_key:
892       files_to_copy.append(constants.CONFD_HMAC_KEY)
893
894     if new_cds or cds:
895       files_to_copy.append(constants.CLUSTER_DOMAIN_SECRET_FILE)
896
897     if files_to_copy:
898       for node_name in ctx.nonmaster_nodes:
899         ctx.feedback_fn("Copying %s to %s" %
900                         (", ".join(files_to_copy), node_name))
901         for file_name in files_to_copy:
902           ctx.ssh.CopyFileToNode(node_name, file_name)
903
904   RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
905
906   ToStdout("All requested certificates and keys have been replaced."
907            " Running \"gnt-cluster verify\" now is recommended.")
908
909   return 0
910
911
912 def RenewCrypto(opts, args):
913   """Renews cluster certificates, keys and secrets.
914
915   """
916   return _RenewCrypto(opts.new_cluster_cert,
917                       opts.new_rapi_cert,
918                       opts.rapi_cert,
919                       opts.new_spice_cert,
920                       opts.spice_cert,
921                       opts.spice_cacert,
922                       opts.new_confd_hmac_key,
923                       opts.new_cluster_domain_secret,
924                       opts.cluster_domain_secret,
925                       opts.force)
926
927
928 def SetClusterParams(opts, args):
929   """Modify the cluster.
930
931   @param opts: the command line options selected by the user
932   @type args: list
933   @param args: should be an empty list
934   @rtype: int
935   @return: the desired exit code
936
937   """
938   if not (not opts.lvm_storage or opts.vg_name or
939           not opts.drbd_storage or opts.drbd_helper or
940           opts.enabled_hypervisors or opts.hvparams or
941           opts.beparams or opts.nicparams or
942           opts.ndparams or opts.diskparams or
943           opts.candidate_pool_size is not None or
944           opts.uid_pool is not None or
945           opts.maintain_node_health is not None or
946           opts.add_uids is not None or
947           opts.remove_uids is not None or
948           opts.default_iallocator is not None or
949           opts.reserved_lvs is not None or
950           opts.master_netdev is not None or
951           opts.master_netmask is not None or
952           opts.use_external_mip_script is not None or
953           opts.prealloc_wipe_disks is not None or
954           opts.hv_state or
955           opts.disk_state or
956           opts.ispecs_mem_size or
957           opts.ispecs_cpu_count or
958           opts.ispecs_disk_count or
959           opts.ispecs_disk_size or
960           opts.ispecs_nic_count or
961           opts.ipolicy_disk_templates is not None or
962           opts.ipolicy_vcpu_ratio is not None or
963           opts.ipolicy_spindle_ratio is not None):
964     ToStderr("Please give at least one of the parameters.")
965     return 1
966
967   vg_name = opts.vg_name
968   if not opts.lvm_storage and opts.vg_name:
969     ToStderr("Options --no-lvm-storage and --vg-name conflict.")
970     return 1
971
972   if not opts.lvm_storage:
973     vg_name = ""
974
975   drbd_helper = opts.drbd_helper
976   if not opts.drbd_storage and opts.drbd_helper:
977     ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
978     return 1
979
980   if not opts.drbd_storage:
981     drbd_helper = ""
982
983   hvlist = opts.enabled_hypervisors
984   if hvlist is not None:
985     hvlist = hvlist.split(",")
986
987   # a list of (name, dict) we can pass directly to dict() (or [])
988   hvparams = dict(opts.hvparams)
989   for hv_params in hvparams.values():
990     utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
991
992   diskparams = dict(opts.diskparams)
993
994   for dt_params in diskparams.values():
995     utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
996
997   beparams = opts.beparams
998   utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
999
1000   nicparams = opts.nicparams
1001   utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1002
1003   ndparams = opts.ndparams
1004   if ndparams is not None:
1005     utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1006
1007   ipolicy = CreateIPolicyFromOpts(
1008     ispecs_mem_size=opts.ispecs_mem_size,
1009     ispecs_cpu_count=opts.ispecs_cpu_count,
1010     ispecs_disk_count=opts.ispecs_disk_count,
1011     ispecs_disk_size=opts.ispecs_disk_size,
1012     ispecs_nic_count=opts.ispecs_nic_count,
1013     ipolicy_disk_templates=opts.ipolicy_disk_templates,
1014     ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1015     ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1016     )
1017
1018   mnh = opts.maintain_node_health
1019
1020   uid_pool = opts.uid_pool
1021   if uid_pool is not None:
1022     uid_pool = uidpool.ParseUidPool(uid_pool)
1023
1024   add_uids = opts.add_uids
1025   if add_uids is not None:
1026     add_uids = uidpool.ParseUidPool(add_uids)
1027
1028   remove_uids = opts.remove_uids
1029   if remove_uids is not None:
1030     remove_uids = uidpool.ParseUidPool(remove_uids)
1031
1032   if opts.reserved_lvs is not None:
1033     if opts.reserved_lvs == "":
1034       opts.reserved_lvs = []
1035     else:
1036       opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1037
1038   if opts.master_netmask is not None:
1039     try:
1040       opts.master_netmask = int(opts.master_netmask)
1041     except ValueError:
1042       ToStderr("The --master-netmask option expects an int parameter.")
1043       return 1
1044
1045   ext_ip_script = opts.use_external_mip_script
1046
1047   if opts.disk_state:
1048     disk_state = utils.FlatToDict(opts.disk_state)
1049   else:
1050     disk_state = {}
1051
1052   hv_state = dict(opts.hv_state)
1053
1054   op = opcodes.OpClusterSetParams(vg_name=vg_name,
1055                                   drbd_helper=drbd_helper,
1056                                   enabled_hypervisors=hvlist,
1057                                   hvparams=hvparams,
1058                                   os_hvp=None,
1059                                   beparams=beparams,
1060                                   nicparams=nicparams,
1061                                   ndparams=ndparams,
1062                                   diskparams=diskparams,
1063                                   ipolicy=ipolicy,
1064                                   candidate_pool_size=opts.candidate_pool_size,
1065                                   maintain_node_health=mnh,
1066                                   uid_pool=uid_pool,
1067                                   add_uids=add_uids,
1068                                   remove_uids=remove_uids,
1069                                   default_iallocator=opts.default_iallocator,
1070                                   prealloc_wipe_disks=opts.prealloc_wipe_disks,
1071                                   master_netdev=opts.master_netdev,
1072                                   master_netmask=opts.master_netmask,
1073                                   reserved_lvs=opts.reserved_lvs,
1074                                   use_external_mip_script=ext_ip_script,
1075                                   hv_state=hv_state,
1076                                   disk_state=disk_state,
1077                                   )
1078   SubmitOrSend(op, opts)
1079   return 0
1080
1081
1082 def QueueOps(opts, args):
1083   """Queue operations.
1084
1085   @param opts: the command line options selected by the user
1086   @type args: list
1087   @param args: should contain only one element, the subcommand
1088   @rtype: int
1089   @return: the desired exit code
1090
1091   """
1092   command = args[0]
1093   client = GetClient()
1094   if command in ("drain", "undrain"):
1095     drain_flag = command == "drain"
1096     client.SetQueueDrainFlag(drain_flag)
1097   elif command == "info":
1098     result = client.QueryConfigValues(["drain_flag"])
1099     if result[0]:
1100       val = "set"
1101     else:
1102       val = "unset"
1103     ToStdout("The drain flag is %s" % val)
1104   else:
1105     raise errors.OpPrereqError("Command '%s' is not valid." % command,
1106                                errors.ECODE_INVAL)
1107
1108   return 0
1109
1110
1111 def _ShowWatcherPause(until):
1112   if until is None or until < time.time():
1113     ToStdout("The watcher is not paused.")
1114   else:
1115     ToStdout("The watcher is paused until %s.", time.ctime(until))
1116
1117
1118 def WatcherOps(opts, args):
1119   """Watcher operations.
1120
1121   @param opts: the command line options selected by the user
1122   @type args: list
1123   @param args: should contain only one element, the subcommand
1124   @rtype: int
1125   @return: the desired exit code
1126
1127   """
1128   command = args[0]
1129   client = GetClient()
1130
1131   if command == "continue":
1132     client.SetWatcherPause(None)
1133     ToStdout("The watcher is no longer paused.")
1134
1135   elif command == "pause":
1136     if len(args) < 2:
1137       raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1138
1139     result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1140     _ShowWatcherPause(result)
1141
1142   elif command == "info":
1143     result = client.QueryConfigValues(["watcher_pause"])
1144     _ShowWatcherPause(result[0])
1145
1146   else:
1147     raise errors.OpPrereqError("Command '%s' is not valid." % command,
1148                                errors.ECODE_INVAL)
1149
1150   return 0
1151
1152
1153 def _OobPower(opts, node_list, power):
1154   """Puts the node in the list to desired power state.
1155
1156   @param opts: The command line options selected by the user
1157   @param node_list: The list of nodes to operate on
1158   @param power: True if they should be powered on, False otherwise
1159   @return: The success of the operation (none failed)
1160
1161   """
1162   if power:
1163     command = constants.OOB_POWER_ON
1164   else:
1165     command = constants.OOB_POWER_OFF
1166
1167   op = opcodes.OpOobCommand(node_names=node_list,
1168                             command=command,
1169                             ignore_status=True,
1170                             timeout=opts.oob_timeout,
1171                             power_delay=opts.power_delay)
1172   result = SubmitOpCode(op, opts=opts)
1173   errs = 0
1174   for node_result in result:
1175     (node_tuple, data_tuple) = node_result
1176     (_, node_name) = node_tuple
1177     (data_status, _) = data_tuple
1178     if data_status != constants.RS_NORMAL:
1179       assert data_status != constants.RS_UNAVAIL
1180       errs += 1
1181       ToStderr("There was a problem changing power for %s, please investigate",
1182                node_name)
1183
1184   if errs > 0:
1185     return False
1186
1187   return True
1188
1189
1190 def _InstanceStart(opts, inst_list, start, no_remember=False):
1191   """Puts the instances in the list to desired state.
1192
1193   @param opts: The command line options selected by the user
1194   @param inst_list: The list of instances to operate on
1195   @param start: True if they should be started, False for shutdown
1196   @param no_remember: If the instance state should be remembered
1197   @return: The success of the operation (none failed)
1198
1199   """
1200   if start:
1201     opcls = opcodes.OpInstanceStartup
1202     text_submit, text_success, text_failed = ("startup", "started", "starting")
1203   else:
1204     opcls = compat.partial(opcodes.OpInstanceShutdown,
1205                            timeout=opts.shutdown_timeout,
1206                            no_remember=no_remember)
1207     text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1208
1209   jex = JobExecutor(opts=opts)
1210
1211   for inst in inst_list:
1212     ToStdout("Submit %s of instance %s", text_submit, inst)
1213     op = opcls(instance_name=inst)
1214     jex.QueueJob(inst, op)
1215
1216   results = jex.GetResults()
1217   bad_cnt = len([1 for (success, _) in results if not success])
1218
1219   if bad_cnt == 0:
1220     ToStdout("All instances have been %s successfully", text_success)
1221   else:
1222     ToStderr("There were errors while %s instances:\n"
1223              "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1224              len(results))
1225     return False
1226
1227   return True
1228
1229
1230 class _RunWhenNodesReachableHelper:
1231   """Helper class to make shared internal state sharing easier.
1232
1233   @ivar success: Indicates if all action_cb calls were successful
1234
1235   """
1236   def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1237                _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1238     """Init the object.
1239
1240     @param node_list: The list of nodes to be reachable
1241     @param action_cb: Callback called when a new host is reachable
1242     @type node2ip: dict
1243     @param node2ip: Node to ip mapping
1244     @param port: The port to use for the TCP ping
1245     @param feedback_fn: The function used for feedback
1246     @param _ping_fn: Function to check reachabilty (for unittest use only)
1247     @param _sleep_fn: Function to sleep (for unittest use only)
1248
1249     """
1250     self.down = set(node_list)
1251     self.up = set()
1252     self.node2ip = node2ip
1253     self.success = True
1254     self.action_cb = action_cb
1255     self.port = port
1256     self.feedback_fn = feedback_fn
1257     self._ping_fn = _ping_fn
1258     self._sleep_fn = _sleep_fn
1259
1260   def __call__(self):
1261     """When called we run action_cb.
1262
1263     @raises utils.RetryAgain: When there are still down nodes
1264
1265     """
1266     if not self.action_cb(self.up):
1267       self.success = False
1268
1269     if self.down:
1270       raise utils.RetryAgain()
1271     else:
1272       return self.success
1273
1274   def Wait(self, secs):
1275     """Checks if a host is up or waits remaining seconds.
1276
1277     @param secs: The secs remaining
1278
1279     """
1280     start = time.time()
1281     for node in self.down:
1282       if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1283                        live_port_needed=True):
1284         self.feedback_fn("Node %s became available" % node)
1285         self.up.add(node)
1286         self.down -= self.up
1287         # If we have a node available there is the possibility to run the
1288         # action callback successfully, therefore we don't wait and return
1289         return
1290
1291     self._sleep_fn(max(0.0, start + secs - time.time()))
1292
1293
1294 def _RunWhenNodesReachable(node_list, action_cb, interval):
1295   """Run action_cb when nodes become reachable.
1296
1297   @param node_list: The list of nodes to be reachable
1298   @param action_cb: Callback called when a new host is reachable
1299   @param interval: The earliest time to retry
1300
1301   """
1302   client = GetClient()
1303   cluster_info = client.QueryClusterInfo()
1304   if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1305     family = netutils.IPAddress.family
1306   else:
1307     family = netutils.IP6Address.family
1308
1309   node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1310                  for node in node_list)
1311
1312   port = netutils.GetDaemonPort(constants.NODED)
1313   helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1314                                         ToStdout)
1315
1316   try:
1317     return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1318                        wait_fn=helper.Wait)
1319   except utils.RetryTimeout:
1320     ToStderr("Time exceeded while waiting for nodes to become reachable"
1321              " again:\n  - %s", "  - ".join(helper.down))
1322     return False
1323
1324
1325 def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1326                           _instance_start_fn=_InstanceStart):
1327   """Start the instances conditional based on node_states.
1328
1329   @param opts: The command line options selected by the user
1330   @param inst_map: A dict of inst -> nodes mapping
1331   @param nodes_online: A list of nodes online
1332   @param _instance_start_fn: Callback to start instances (unittest use only)
1333   @return: Success of the operation on all instances
1334
1335   """
1336   start_inst_list = []
1337   for (inst, nodes) in inst_map.items():
1338     if not (nodes - nodes_online):
1339       # All nodes the instance lives on are back online
1340       start_inst_list.append(inst)
1341
1342   for inst in start_inst_list:
1343     del inst_map[inst]
1344
1345   if start_inst_list:
1346     return _instance_start_fn(opts, start_inst_list, True)
1347
1348   return True
1349
1350
1351 def _EpoOn(opts, full_node_list, node_list, inst_map):
1352   """Does the actual power on.
1353
1354   @param opts: The command line options selected by the user
1355   @param full_node_list: All nodes to operate on (includes nodes not supporting
1356                          OOB)
1357   @param node_list: The list of nodes to operate on (all need to support OOB)
1358   @param inst_map: A dict of inst -> nodes mapping
1359   @return: The desired exit status
1360
1361   """
1362   if node_list and not _OobPower(opts, node_list, False):
1363     ToStderr("Not all nodes seem to get back up, investigate and start"
1364              " manually if needed")
1365
1366   # Wait for the nodes to be back up
1367   action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1368
1369   ToStdout("Waiting until all nodes are available again")
1370   if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1371     ToStderr("Please investigate and start stopped instances manually")
1372     return constants.EXIT_FAILURE
1373
1374   return constants.EXIT_SUCCESS
1375
1376
1377 def _EpoOff(opts, node_list, inst_map):
1378   """Does the actual power off.
1379
1380   @param opts: The command line options selected by the user
1381   @param node_list: The list of nodes to operate on (all need to support OOB)
1382   @param inst_map: A dict of inst -> nodes mapping
1383   @return: The desired exit status
1384
1385   """
1386   if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1387     ToStderr("Please investigate and stop instances manually before continuing")
1388     return constants.EXIT_FAILURE
1389
1390   if not node_list:
1391     return constants.EXIT_SUCCESS
1392
1393   if _OobPower(opts, node_list, False):
1394     return constants.EXIT_SUCCESS
1395   else:
1396     return constants.EXIT_FAILURE
1397
1398
1399 def Epo(opts, args):
1400   """EPO operations.
1401
1402   @param opts: the command line options selected by the user
1403   @type args: list
1404   @param args: should contain only one element, the subcommand
1405   @rtype: int
1406   @return: the desired exit code
1407
1408   """
1409   if opts.groups and opts.show_all:
1410     ToStderr("Only one of --groups or --all are allowed")
1411     return constants.EXIT_FAILURE
1412   elif args and opts.show_all:
1413     ToStderr("Arguments in combination with --all are not allowed")
1414     return constants.EXIT_FAILURE
1415
1416   client = GetClient()
1417
1418   if opts.groups:
1419     node_query_list = itertools.chain(*client.QueryGroups(names=args,
1420                                                           fields=["node_list"],
1421                                                           use_locking=False))
1422   else:
1423     node_query_list = args
1424
1425   result = client.QueryNodes(names=node_query_list,
1426                              fields=["name", "master", "pinst_list",
1427                                      "sinst_list", "powered", "offline"],
1428                              use_locking=False)
1429   node_list = []
1430   inst_map = {}
1431   for (idx, (node, master, pinsts, sinsts, powered,
1432              offline)) in enumerate(result):
1433     # Normalize the node_query_list as well
1434     if not opts.show_all:
1435       node_query_list[idx] = node
1436     if not offline:
1437       for inst in (pinsts + sinsts):
1438         if inst in inst_map:
1439           if not master:
1440             inst_map[inst].add(node)
1441         elif master:
1442           inst_map[inst] = set()
1443         else:
1444           inst_map[inst] = set([node])
1445
1446     if master and opts.on:
1447       # We ignore the master for turning on the machines, in fact we are
1448       # already operating on the master at this point :)
1449       continue
1450     elif master and not opts.show_all:
1451       ToStderr("%s is the master node, please do a master-failover to another"
1452                " node not affected by the EPO or use --all if you intend to"
1453                " shutdown the whole cluster", node)
1454       return constants.EXIT_FAILURE
1455     elif powered is None:
1456       ToStdout("Node %s does not support out-of-band handling, it can not be"
1457                " handled in a fully automated manner", node)
1458     elif powered == opts.on:
1459       ToStdout("Node %s is already in desired power state, skipping", node)
1460     elif not offline or (offline and powered):
1461       node_list.append(node)
1462
1463   if not opts.force and not ConfirmOperation(node_query_list, "nodes", "epo"):
1464     return constants.EXIT_FAILURE
1465
1466   if opts.on:
1467     return _EpoOn(opts, node_query_list, node_list, inst_map)
1468   else:
1469     return _EpoOff(opts, node_list, inst_map)
1470
1471 commands = {
1472   "init": (
1473     InitCluster, [ArgHost(min=1, max=1)],
1474     [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
1475      HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
1476      NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
1477      NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
1478      MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
1479      DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
1480      NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
1481      DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + INSTANCE_POLICY_OPTS,
1482     "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
1483   "destroy": (
1484     DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
1485     "", "Destroy cluster"),
1486   "rename": (
1487     RenameCluster, [ArgHost(min=1, max=1)],
1488     [FORCE_OPT, DRY_RUN_OPT],
1489     "<new_name>",
1490     "Renames the cluster"),
1491   "redist-conf": (
1492     RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
1493     "", "Forces a push of the configuration file and ssconf files"
1494     " to the nodes in the cluster"),
1495   "verify": (
1496     VerifyCluster, ARGS_NONE,
1497     [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
1498      DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
1499     "", "Does a check on the cluster configuration"),
1500   "verify-disks": (
1501     VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
1502     "", "Does a check on the cluster disk status"),
1503   "repair-disk-sizes": (
1504     RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
1505     "[instance...]", "Updates mismatches in recorded disk sizes"),
1506   "master-failover": (
1507     MasterFailover, ARGS_NONE, [NOVOTING_OPT],
1508     "", "Makes the current node the master"),
1509   "master-ping": (
1510     MasterPing, ARGS_NONE, [],
1511     "", "Checks if the master is alive"),
1512   "version": (
1513     ShowClusterVersion, ARGS_NONE, [],
1514     "", "Shows the cluster version"),
1515   "getmaster": (
1516     ShowClusterMaster, ARGS_NONE, [],
1517     "", "Shows the cluster master"),
1518   "copyfile": (
1519     ClusterCopyFile, [ArgFile(min=1, max=1)],
1520     [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
1521     "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
1522   "command": (
1523     RunClusterCommand, [ArgCommand(min=1)],
1524     [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT],
1525     "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
1526   "info": (
1527     ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
1528     "[--roman]", "Show cluster configuration"),
1529   "list-tags": (
1530     ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
1531   "add-tags": (
1532     AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1533     "tag...", "Add tags to the cluster"),
1534   "remove-tags": (
1535     RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1536     "tag...", "Remove tags from the cluster"),
1537   "search-tags": (
1538     SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
1539     "Searches the tags on all objects on"
1540     " the cluster for a given pattern (regex)"),
1541   "queue": (
1542     QueueOps,
1543     [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
1544     [], "drain|undrain|info", "Change queue properties"),
1545   "watcher": (
1546     WatcherOps,
1547     [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
1548      ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
1549     [],
1550     "{pause <timespec>|continue|info}", "Change watcher properties"),
1551   "modify": (
1552     SetClusterParams, ARGS_NONE,
1553     [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
1554      MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
1555      MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
1556      DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
1557      RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
1558      NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
1559      DISK_STATE_OPT, SUBMIT_OPT] +
1560     INSTANCE_POLICY_OPTS,
1561     "[opts...]",
1562     "Alters the parameters of the cluster"),
1563   "renew-crypto": (
1564     RenewCrypto, ARGS_NONE,
1565     [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
1566      NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
1567      NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
1568      NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
1569     "[opts...]",
1570     "Renews cluster certificates, keys and secrets"),
1571   "epo": (
1572     Epo, [ArgUnknown()],
1573     [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
1574      SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
1575     "[opts...] [args]",
1576     "Performs an emergency power-off on given args"),
1577   "activate-master-ip": (
1578     ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
1579   "deactivate-master-ip": (
1580     DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
1581     "Deactivates the master IP"),
1582   }
1583
1584
1585 #: dictionary with aliases for commands
1586 aliases = {
1587   "masterfailover": "master-failover",
1588   "show": "info",
1589 }
1590
1591
1592 def Main():
1593   return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
1594                      aliases=aliases)