Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ a24aed2a

History | View | Annotate | Download (51.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
import os.path
30
import time
31
import OpenSSL
32
import itertools
33

    
34
from ganeti.cli import *
35
from ganeti import opcodes
36
from ganeti import constants
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import bootstrap
40
from ganeti import ssh
41
from ganeti import objects
42
from ganeti import uidpool
43
from ganeti import compat
44
from ganeti import netutils
45

    
46

    
47
ON_OPT = cli_option("--on", default=False,
48
                    action="store_true", dest="on",
49
                    help="Recover from an EPO")
50

    
51
GROUPS_OPT = cli_option("--groups", default=False,
52
                    action="store_true", dest="groups",
53
                    help="Arguments are node groups instead of nodes")
54

    
55
SHOW_MACHINE_OPT = cli_option("-M", "--show-machine-names", default=False,
56
                              action="store_true",
57
                              help="Show machine name for every line in output")
58

    
59
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
60
_EPO_PING_TIMEOUT = 1 # 1 second
61
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
62

    
63

    
64
@UsesRPC
65
def InitCluster(opts, args):
66
  """Initialize the cluster.
67

68
  @param opts: the command line options selected by the user
69
  @type args: list
70
  @param args: should contain only one element, the desired
71
      cluster name
72
  @rtype: int
73
  @return: the desired exit code
74

75
  """
76
  if not opts.lvm_storage and opts.vg_name:
77
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
78
    return 1
79

    
80
  vg_name = opts.vg_name
81
  if opts.lvm_storage and not opts.vg_name:
82
    vg_name = constants.DEFAULT_VG
83

    
84
  if not opts.drbd_storage and opts.drbd_helper:
85
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
86
    return 1
87

    
88
  drbd_helper = opts.drbd_helper
89
  if opts.drbd_storage and not opts.drbd_helper:
90
    drbd_helper = constants.DEFAULT_DRBD_HELPER
91

    
92
  master_netdev = opts.master_netdev
93
  if master_netdev is None:
94
    master_netdev = constants.DEFAULT_BRIDGE
95

    
96
  hvlist = opts.enabled_hypervisors
97
  if hvlist is None:
98
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
99
  hvlist = hvlist.split(",")
100

    
101
  hvparams = dict(opts.hvparams)
102
  beparams = opts.beparams
103
  nicparams = opts.nicparams
104

    
105
  diskparams = dict(opts.diskparams)
106

    
107
  # check the disk template types here, as we cannot rely on the type check done
108
  # by the opcode parameter types
109
  diskparams_keys = set(diskparams.keys())
110
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
111
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
112
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
113
    return 1
114

    
115
  # prepare beparams dict
116
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
117
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
118

    
119
  # prepare nicparams dict
120
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
121
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
122

    
123
  # prepare ndparams dict
124
  if opts.ndparams is None:
125
    ndparams = dict(constants.NDC_DEFAULTS)
126
  else:
127
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
128
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
129

    
130
  # prepare hvparams dict
131
  for hv in constants.HYPER_TYPES:
132
    if hv not in hvparams:
133
      hvparams[hv] = {}
134
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
135
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
136

    
137
  # prepare diskparams dict
138
  for templ in constants.DISK_TEMPLATES:
139
    if templ not in diskparams:
140
      diskparams[templ] = {}
141
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
142
                                         diskparams[templ])
143
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
144

    
145
  # prepare ipolicy dict
146
  ispecs_dts = opts.ispecs_disk_templates # hate long var names
147
  ipolicy_raw = \
148
    objects.CreateIPolicyFromOpts(ispecs_mem_size=opts.ispecs_mem_size,
149
                                  ispecs_cpu_count=opts.ispecs_cpu_count,
150
                                  ispecs_disk_count=opts.ispecs_disk_count,
151
                                  ispecs_disk_size=opts.ispecs_disk_size,
152
                                  ispecs_nic_count=opts.ispecs_nic_count,
153
                                  ispecs_disk_templates=ispecs_dts,
154
                                  fill_all=True)
155
  ipolicy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy_raw)
156

    
157
  if opts.candidate_pool_size is None:
158
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
159

    
160
  if opts.mac_prefix is None:
161
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
162

    
163
  uid_pool = opts.uid_pool
164
  if uid_pool is not None:
165
    uid_pool = uidpool.ParseUidPool(uid_pool)
166

    
167
  if opts.prealloc_wipe_disks is None:
168
    opts.prealloc_wipe_disks = False
169

    
170
  external_ip_setup_script = opts.use_external_mip_script
171
  if external_ip_setup_script is None:
172
    external_ip_setup_script = False
173

    
174
  try:
175
    primary_ip_version = int(opts.primary_ip_version)
176
  except (ValueError, TypeError), err:
177
    ToStderr("Invalid primary ip version value: %s" % str(err))
178
    return 1
179

    
180
  master_netmask = opts.master_netmask
181
  try:
182
    if master_netmask is not None:
183
      master_netmask = int(master_netmask)
184
  except (ValueError, TypeError), err:
185
    ToStderr("Invalid master netmask value: %s" % str(err))
186
    return 1
187

    
188
  if opts.disk_state:
189
    disk_state = utils.FlatToDict(opts.disk_state)
190
  else:
191
    disk_state = {}
192

    
193
  hv_state = dict(opts.hv_state)
194

    
195
  bootstrap.InitCluster(cluster_name=args[0],
196
                        secondary_ip=opts.secondary_ip,
197
                        vg_name=vg_name,
198
                        mac_prefix=opts.mac_prefix,
199
                        master_netmask=master_netmask,
200
                        master_netdev=master_netdev,
201
                        file_storage_dir=opts.file_storage_dir,
202
                        shared_file_storage_dir=opts.shared_file_storage_dir,
203
                        enabled_hypervisors=hvlist,
204
                        hvparams=hvparams,
205
                        beparams=beparams,
206
                        nicparams=nicparams,
207
                        ndparams=ndparams,
208
                        diskparams=diskparams,
209
                        ipolicy=ipolicy,
210
                        candidate_pool_size=opts.candidate_pool_size,
211
                        modify_etc_hosts=opts.modify_etc_hosts,
212
                        modify_ssh_setup=opts.modify_ssh_setup,
213
                        maintain_node_health=opts.maintain_node_health,
214
                        drbd_helper=drbd_helper,
215
                        uid_pool=uid_pool,
216
                        default_iallocator=opts.default_iallocator,
217
                        primary_ip_version=primary_ip_version,
218
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
219
                        use_external_mip_script=external_ip_setup_script,
220
                        hv_state=hv_state,
221
                        disk_state=disk_state,
222
                        )
223
  op = opcodes.OpClusterPostInit()
224
  SubmitOpCode(op, opts=opts)
225
  return 0
226

    
227

    
228
@UsesRPC
229
def DestroyCluster(opts, args):
230
  """Destroy the cluster.
231

232
  @param opts: the command line options selected by the user
233
  @type args: list
234
  @param args: should be an empty list
235
  @rtype: int
236
  @return: the desired exit code
237

238
  """
239
  if not opts.yes_do_it:
240
    ToStderr("Destroying a cluster is irreversible. If you really want"
241
             " destroy this cluster, supply the --yes-do-it option.")
242
    return 1
243

    
244
  op = opcodes.OpClusterDestroy()
245
  master = SubmitOpCode(op, opts=opts)
246
  # if we reached this, the opcode didn't fail; we can proceed to
247
  # shutdown all the daemons
248
  bootstrap.FinalizeClusterDestroy(master)
249
  return 0
250

    
251

    
252
def RenameCluster(opts, args):
253
  """Rename the cluster.
254

255
  @param opts: the command line options selected by the user
256
  @type args: list
257
  @param args: should contain only one element, the new cluster name
258
  @rtype: int
259
  @return: the desired exit code
260

261
  """
262
  cl = GetClient()
263

    
264
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
265

    
266
  new_name = args[0]
267
  if not opts.force:
268
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
269
                " connected over the network to the cluster name, the"
270
                " operation is very dangerous as the IP address will be"
271
                " removed from the node and the change may not go through."
272
                " Continue?") % (cluster_name, new_name)
273
    if not AskUser(usertext):
274
      return 1
275

    
276
  op = opcodes.OpClusterRename(name=new_name)
277
  result = SubmitOpCode(op, opts=opts, cl=cl)
278

    
279
  if result:
280
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
281

    
282
  return 0
283

    
284

    
285
def ActivateMasterIp(opts, args):
286
  """Activates the master IP.
287

288
  """
289
  op = opcodes.OpClusterActivateMasterIp()
290
  SubmitOpCode(op)
291
  return 0
292

    
293

    
294
def DeactivateMasterIp(opts, args):
295
  """Deactivates the master IP.
296

297
  """
298
  if not opts.confirm:
299
    usertext = ("This will disable the master IP. All the open connections to"
300
                " the master IP will be closed. To reach the master you will"
301
                " need to use its node IP."
302
                " Continue?")
303
    if not AskUser(usertext):
304
      return 1
305

    
306
  op = opcodes.OpClusterDeactivateMasterIp()
307
  SubmitOpCode(op)
308
  return 0
309

    
310

    
311
def RedistributeConfig(opts, args):
312
  """Forces push of the cluster configuration.
313

314
  @param opts: the command line options selected by the user
315
  @type args: list
316
  @param args: empty list
317
  @rtype: int
318
  @return: the desired exit code
319

320
  """
321
  op = opcodes.OpClusterRedistConf()
322
  SubmitOrSend(op, opts)
323
  return 0
324

    
325

    
326
def ShowClusterVersion(opts, args):
327
  """Write version of ganeti software to the standard output.
328

329
  @param opts: the command line options selected by the user
330
  @type args: list
331
  @param args: should be an empty list
332
  @rtype: int
333
  @return: the desired exit code
334

335
  """
336
  cl = GetClient()
337
  result = cl.QueryClusterInfo()
338
  ToStdout("Software version: %s", result["software_version"])
339
  ToStdout("Internode protocol: %s", result["protocol_version"])
340
  ToStdout("Configuration format: %s", result["config_version"])
341
  ToStdout("OS api version: %s", result["os_api_version"])
342
  ToStdout("Export interface: %s", result["export_version"])
343
  return 0
344

    
345

    
346
def ShowClusterMaster(opts, args):
347
  """Write name of master node to the standard output.
348

349
  @param opts: the command line options selected by the user
350
  @type args: list
351
  @param args: should be an empty list
352
  @rtype: int
353
  @return: the desired exit code
354

355
  """
356
  master = bootstrap.GetMaster()
357
  ToStdout(master)
358
  return 0
359

    
360

    
361
def _PrintGroupedParams(paramsdict, level=1, roman=False):
362
  """Print Grouped parameters (be, nic, disk) by group.
363

364
  @type paramsdict: dict of dicts
365
  @param paramsdict: {group: {param: value, ...}, ...}
366
  @type level: int
367
  @param level: Level of indention
368

369
  """
370
  indent = "  " * level
371
  for item, val in sorted(paramsdict.items()):
372
    if isinstance(val, dict):
373
      ToStdout("%s- %s:", indent, item)
374
      _PrintGroupedParams(val, level=level + 1, roman=roman)
375
    elif roman and isinstance(val, int):
376
      ToStdout("%s  %s: %s", indent, item, compat.TryToRoman(val))
377
    else:
378
      ToStdout("%s  %s: %s", indent, item, val)
379

    
380

    
381
def ShowClusterConfig(opts, args):
382
  """Shows cluster information.
383

384
  @param opts: the command line options selected by the user
385
  @type args: list
386
  @param args: should be an empty list
387
  @rtype: int
388
  @return: the desired exit code
389

390
  """
391
  cl = GetClient()
392
  result = cl.QueryClusterInfo()
393

    
394
  ToStdout("Cluster name: %s", result["name"])
395
  ToStdout("Cluster UUID: %s", result["uuid"])
396

    
397
  ToStdout("Creation time: %s", utils.FormatTime(result["ctime"]))
398
  ToStdout("Modification time: %s", utils.FormatTime(result["mtime"]))
399

    
400
  ToStdout("Master node: %s", result["master"])
401

    
402
  ToStdout("Architecture (this node): %s (%s)",
403
           result["architecture"][0], result["architecture"][1])
404

    
405
  if result["tags"]:
406
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
407
  else:
408
    tags = "(none)"
409

    
410
  ToStdout("Tags: %s", tags)
411

    
412
  ToStdout("Default hypervisor: %s", result["default_hypervisor"])
413
  ToStdout("Enabled hypervisors: %s",
414
           utils.CommaJoin(result["enabled_hypervisors"]))
415

    
416
  ToStdout("Hypervisor parameters:")
417
  _PrintGroupedParams(result["hvparams"])
418

    
419
  ToStdout("OS-specific hypervisor parameters:")
420
  _PrintGroupedParams(result["os_hvp"])
421

    
422
  ToStdout("OS parameters:")
423
  _PrintGroupedParams(result["osparams"])
424

    
425
  ToStdout("Hidden OSes: %s", utils.CommaJoin(result["hidden_os"]))
426
  ToStdout("Blacklisted OSes: %s", utils.CommaJoin(result["blacklisted_os"]))
427

    
428
  ToStdout("Cluster parameters:")
429
  ToStdout("  - candidate pool size: %s",
430
            compat.TryToRoman(result["candidate_pool_size"],
431
                              convert=opts.roman_integers))
432
  ToStdout("  - master netdev: %s", result["master_netdev"])
433
  ToStdout("  - master netmask: %s", result["master_netmask"])
434
  ToStdout("  - use external master IP address setup script: %s",
435
           result["use_external_mip_script"])
436
  ToStdout("  - lvm volume group: %s", result["volume_group_name"])
437
  if result["reserved_lvs"]:
438
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
439
  else:
440
    reserved_lvs = "(none)"
441
  ToStdout("  - lvm reserved volumes: %s", reserved_lvs)
442
  ToStdout("  - drbd usermode helper: %s", result["drbd_usermode_helper"])
443
  ToStdout("  - file storage path: %s", result["file_storage_dir"])
444
  ToStdout("  - shared file storage path: %s",
445
           result["shared_file_storage_dir"])
446
  ToStdout("  - maintenance of node health: %s",
447
           result["maintain_node_health"])
448
  ToStdout("  - uid pool: %s",
449
            uidpool.FormatUidPool(result["uid_pool"],
450
                                  roman=opts.roman_integers))
451
  ToStdout("  - default instance allocator: %s", result["default_iallocator"])
452
  ToStdout("  - primary ip version: %d", result["primary_ip_version"])
453
  ToStdout("  - preallocation wipe disks: %s", result["prealloc_wipe_disks"])
454
  ToStdout("  - OS search path: %s", utils.CommaJoin(constants.OS_SEARCH_PATH))
455

    
456
  ToStdout("Default node parameters:")
457
  _PrintGroupedParams(result["ndparams"], roman=opts.roman_integers)
458

    
459
  ToStdout("Default instance parameters:")
460
  _PrintGroupedParams(result["beparams"], roman=opts.roman_integers)
461

    
462
  ToStdout("Default nic parameters:")
463
  _PrintGroupedParams(result["nicparams"], roman=opts.roman_integers)
464

    
465
  ToStdout("Instance policy - limits for instances:")
466
  for key in constants.IPOLICY_PARAMETERS:
467
    ToStdout("  - %s", key)
468
    _PrintGroupedParams(result["ipolicy"][key], roman=opts.roman_integers)
469
  ToStdout("  - enabled disk templates: %s",
470
           utils.CommaJoin(result["ipolicy"][constants.ISPECS_DTS]))
471

    
472
  return 0
473

    
474

    
475
def ClusterCopyFile(opts, args):
476
  """Copy a file from master to some nodes.
477

478
  @param opts: the command line options selected by the user
479
  @type args: list
480
  @param args: should contain only one element, the path of
481
      the file to be copied
482
  @rtype: int
483
  @return: the desired exit code
484

485
  """
486
  filename = args[0]
487
  if not os.path.exists(filename):
488
    raise errors.OpPrereqError("No such filename '%s'" % filename,
489
                               errors.ECODE_INVAL)
490

    
491
  cl = GetClient()
492

    
493
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
494

    
495
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
496
                           secondary_ips=opts.use_replication_network,
497
                           nodegroup=opts.nodegroup)
498

    
499
  srun = ssh.SshRunner(cluster_name=cluster_name)
500
  for node in results:
501
    if not srun.CopyFileToNode(node, filename):
502
      ToStderr("Copy of file %s to node %s failed", filename, node)
503

    
504
  return 0
505

    
506

    
507
def RunClusterCommand(opts, args):
508
  """Run a command on some nodes.
509

510
  @param opts: the command line options selected by the user
511
  @type args: list
512
  @param args: should contain the command to be run and its arguments
513
  @rtype: int
514
  @return: the desired exit code
515

516
  """
517
  cl = GetClient()
518

    
519
  command = " ".join(args)
520

    
521
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
522

    
523
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
524
                                                    "master_node"])
525

    
526
  srun = ssh.SshRunner(cluster_name=cluster_name)
527

    
528
  # Make sure master node is at list end
529
  if master_node in nodes:
530
    nodes.remove(master_node)
531
    nodes.append(master_node)
532

    
533
  for name in nodes:
534
    result = srun.Run(name, "root", command)
535
    ToStdout("------------------------------------------------")
536
    if opts.show_machine_names:
537
      for line in result.output.splitlines():
538
        ToStdout("%s: %s", name, line)
539
    else:
540
      ToStdout("node: %s", name)
541
      ToStdout("%s", result.output)
542
    ToStdout("return code = %s", result.exit_code)
543

    
544
  return 0
545

    
546

    
547
def VerifyCluster(opts, args):
548
  """Verify integrity of cluster, performing various test on nodes.
549

550
  @param opts: the command line options selected by the user
551
  @type args: list
552
  @param args: should be an empty list
553
  @rtype: int
554
  @return: the desired exit code
555

556
  """
557
  skip_checks = []
558

    
559
  if opts.skip_nplusone_mem:
560
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
561

    
562
  cl = GetClient()
563

    
564
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
565
                               error_codes=opts.error_codes,
566
                               debug_simulate_errors=opts.simulate_errors,
567
                               skip_checks=skip_checks,
568
                               ignore_errors=opts.ignore_errors,
569
                               group_name=opts.nodegroup)
570
  result = SubmitOpCode(op, cl=cl, opts=opts)
571

    
572
  # Keep track of submitted jobs
573
  jex = JobExecutor(cl=cl, opts=opts)
574

    
575
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
576
    jex.AddJobId(None, status, job_id)
577

    
578
  results = jex.GetResults()
579

    
580
  (bad_jobs, bad_results) = \
581
    map(len,
582
        # Convert iterators to lists
583
        map(list,
584
            # Count errors
585
            map(compat.partial(itertools.ifilterfalse, bool),
586
                # Convert result to booleans in a tuple
587
                zip(*((job_success, len(op_results) == 1 and op_results[0])
588
                      for (job_success, op_results) in results)))))
589

    
590
  if bad_jobs == 0 and bad_results == 0:
591
    rcode = constants.EXIT_SUCCESS
592
  else:
593
    rcode = constants.EXIT_FAILURE
594
    if bad_jobs > 0:
595
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
596

    
597
  return rcode
598

    
599

    
600
def VerifyDisks(opts, args):
601
  """Verify integrity of cluster disks.
602

603
  @param opts: the command line options selected by the user
604
  @type args: list
605
  @param args: should be an empty list
606
  @rtype: int
607
  @return: the desired exit code
608

609
  """
610
  cl = GetClient()
611

    
612
  op = opcodes.OpClusterVerifyDisks()
613

    
614
  result = SubmitOpCode(op, cl=cl, opts=opts)
615

    
616
  # Keep track of submitted jobs
617
  jex = JobExecutor(cl=cl, opts=opts)
618

    
619
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
620
    jex.AddJobId(None, status, job_id)
621

    
622
  retcode = constants.EXIT_SUCCESS
623

    
624
  for (status, result) in jex.GetResults():
625
    if not status:
626
      ToStdout("Job failed: %s", result)
627
      continue
628

    
629
    ((bad_nodes, instances, missing), ) = result
630

    
631
    for node, text in bad_nodes.items():
632
      ToStdout("Error gathering data on node %s: %s",
633
               node, utils.SafeEncode(text[-400:]))
634
      retcode = constants.EXIT_FAILURE
635
      ToStdout("You need to fix these nodes first before fixing instances")
636

    
637
    for iname in instances:
638
      if iname in missing:
639
        continue
640
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
641
      try:
642
        ToStdout("Activating disks for instance '%s'", iname)
643
        SubmitOpCode(op, opts=opts, cl=cl)
644
      except errors.GenericError, err:
645
        nret, msg = FormatError(err)
646
        retcode |= nret
647
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
648

    
649
    if missing:
650
      for iname, ival in missing.iteritems():
651
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
652
        if all_missing:
653
          ToStdout("Instance %s cannot be verified as it lives on"
654
                   " broken nodes", iname)
655
        else:
656
          ToStdout("Instance %s has missing logical volumes:", iname)
657
          ival.sort()
658
          for node, vol in ival:
659
            if node in bad_nodes:
660
              ToStdout("\tbroken node %s /dev/%s", node, vol)
661
            else:
662
              ToStdout("\t%s /dev/%s", node, vol)
663

    
664
      ToStdout("You need to replace or recreate disks for all the above"
665
               " instances if this message persists after fixing broken nodes.")
666
      retcode = constants.EXIT_FAILURE
667

    
668
  return retcode
669

    
670

    
671
def RepairDiskSizes(opts, args):
672
  """Verify sizes of cluster disks.
673

674
  @param opts: the command line options selected by the user
675
  @type args: list
676
  @param args: optional list of instances to restrict check to
677
  @rtype: int
678
  @return: the desired exit code
679

680
  """
681
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
682
  SubmitOpCode(op, opts=opts)
683

    
684

    
685
@UsesRPC
686
def MasterFailover(opts, args):
687
  """Failover the master node.
688

689
  This command, when run on a non-master node, will cause the current
690
  master to cease being master, and the non-master to become new
691
  master.
692

693
  @param opts: the command line options selected by the user
694
  @type args: list
695
  @param args: should be an empty list
696
  @rtype: int
697
  @return: the desired exit code
698

699
  """
700
  if opts.no_voting:
701
    usertext = ("This will perform the failover even if most other nodes"
702
                " are down, or if this node is outdated. This is dangerous"
703
                " as it can lead to a non-consistent cluster. Check the"
704
                " gnt-cluster(8) man page before proceeding. Continue?")
705
    if not AskUser(usertext):
706
      return 1
707

    
708
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
709

    
710

    
711
def MasterPing(opts, args):
712
  """Checks if the master is alive.
713

714
  @param opts: the command line options selected by the user
715
  @type args: list
716
  @param args: should be an empty list
717
  @rtype: int
718
  @return: the desired exit code
719

720
  """
721
  try:
722
    cl = GetClient()
723
    cl.QueryClusterInfo()
724
    return 0
725
  except Exception: # pylint: disable=W0703
726
    return 1
727

    
728

    
729
def SearchTags(opts, args):
730
  """Searches the tags on all the cluster.
731

732
  @param opts: the command line options selected by the user
733
  @type args: list
734
  @param args: should contain only one element, the tag pattern
735
  @rtype: int
736
  @return: the desired exit code
737

738
  """
739
  op = opcodes.OpTagsSearch(pattern=args[0])
740
  result = SubmitOpCode(op, opts=opts)
741
  if not result:
742
    return 1
743
  result = list(result)
744
  result.sort()
745
  for path, tag in result:
746
    ToStdout("%s %s", path, tag)
747

    
748

    
749
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
750
  """Reads and verifies an X509 certificate.
751

752
  @type cert_filename: string
753
  @param cert_filename: the path of the file containing the certificate to
754
                        verify encoded in PEM format
755
  @type verify_private_key: bool
756
  @param verify_private_key: whether to verify the private key in addition to
757
                             the public certificate
758
  @rtype: string
759
  @return: a string containing the PEM-encoded certificate.
760

761
  """
762
  try:
763
    pem = utils.ReadFile(cert_filename)
764
  except IOError, err:
765
    raise errors.X509CertError(cert_filename,
766
                               "Unable to read certificate: %s" % str(err))
767

    
768
  try:
769
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
770
  except Exception, err:
771
    raise errors.X509CertError(cert_filename,
772
                               "Unable to load certificate: %s" % str(err))
773

    
774
  if verify_private_key:
775
    try:
776
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
777
    except Exception, err:
778
      raise errors.X509CertError(cert_filename,
779
                                 "Unable to load private key: %s" % str(err))
780

    
781
  return pem
782

    
783

    
784
def _RenewCrypto(new_cluster_cert, new_rapi_cert, #pylint: disable=R0911
785
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
786
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
787
                 cds_filename, force):
788
  """Renews cluster certificates, keys and secrets.
789

790
  @type new_cluster_cert: bool
791
  @param new_cluster_cert: Whether to generate a new cluster certificate
792
  @type new_rapi_cert: bool
793
  @param new_rapi_cert: Whether to generate a new RAPI certificate
794
  @type rapi_cert_filename: string
795
  @param rapi_cert_filename: Path to file containing new RAPI certificate
796
  @type new_spice_cert: bool
797
  @param new_spice_cert: Whether to generate a new SPICE certificate
798
  @type spice_cert_filename: string
799
  @param spice_cert_filename: Path to file containing new SPICE certificate
800
  @type spice_cacert_filename: string
801
  @param spice_cacert_filename: Path to file containing the certificate of the
802
                                CA that signed the SPICE certificate
803
  @type new_confd_hmac_key: bool
804
  @param new_confd_hmac_key: Whether to generate a new HMAC key
805
  @type new_cds: bool
806
  @param new_cds: Whether to generate a new cluster domain secret
807
  @type cds_filename: string
808
  @param cds_filename: Path to file containing new cluster domain secret
809
  @type force: bool
810
  @param force: Whether to ask user for confirmation
811

812
  """
813
  if new_rapi_cert and rapi_cert_filename:
814
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
815
             " options can be specified at the same time.")
816
    return 1
817

    
818
  if new_cds and cds_filename:
819
    ToStderr("Only one of the --new-cluster-domain-secret and"
820
             " --cluster-domain-secret options can be specified at"
821
             " the same time.")
822
    return 1
823

    
824
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
825
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
826
             " and --spice-ca-certificate must not be used.")
827
    return 1
828

    
829
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
830
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
831
             " specified.")
832
    return 1
833

    
834
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
835
  try:
836
    if rapi_cert_filename:
837
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
838
    if spice_cert_filename:
839
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
840
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
841
  except errors.X509CertError, err:
842
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
843
    return 1
844

    
845
  if cds_filename:
846
    try:
847
      cds = utils.ReadFile(cds_filename)
848
    except Exception, err: # pylint: disable=W0703
849
      ToStderr("Can't load new cluster domain secret from %s: %s" %
850
               (cds_filename, str(err)))
851
      return 1
852
  else:
853
    cds = None
854

    
855
  if not force:
856
    usertext = ("This requires all daemons on all nodes to be restarted and"
857
                " may take some time. Continue?")
858
    if not AskUser(usertext):
859
      return 1
860

    
861
  def _RenewCryptoInner(ctx):
862
    ctx.feedback_fn("Updating certificates and keys")
863
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
864
                                    new_rapi_cert,
865
                                    new_spice_cert,
866
                                    new_confd_hmac_key,
867
                                    new_cds,
868
                                    rapi_cert_pem=rapi_cert_pem,
869
                                    spice_cert_pem=spice_cert_pem,
870
                                    spice_cacert_pem=spice_cacert_pem,
871
                                    cds=cds)
872

    
873
    files_to_copy = []
874

    
875
    if new_cluster_cert:
876
      files_to_copy.append(constants.NODED_CERT_FILE)
877

    
878
    if new_rapi_cert or rapi_cert_pem:
879
      files_to_copy.append(constants.RAPI_CERT_FILE)
880

    
881
    if new_spice_cert or spice_cert_pem:
882
      files_to_copy.append(constants.SPICE_CERT_FILE)
883
      files_to_copy.append(constants.SPICE_CACERT_FILE)
884

    
885
    if new_confd_hmac_key:
886
      files_to_copy.append(constants.CONFD_HMAC_KEY)
887

    
888
    if new_cds or cds:
889
      files_to_copy.append(constants.CLUSTER_DOMAIN_SECRET_FILE)
890

    
891
    if files_to_copy:
892
      for node_name in ctx.nonmaster_nodes:
893
        ctx.feedback_fn("Copying %s to %s" %
894
                        (", ".join(files_to_copy), node_name))
895
        for file_name in files_to_copy:
896
          ctx.ssh.CopyFileToNode(node_name, file_name)
897

    
898
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
899

    
900
  ToStdout("All requested certificates and keys have been replaced."
901
           " Running \"gnt-cluster verify\" now is recommended.")
902

    
903
  return 0
904

    
905

    
906
def RenewCrypto(opts, args):
907
  """Renews cluster certificates, keys and secrets.
908

909
  """
910
  return _RenewCrypto(opts.new_cluster_cert,
911
                      opts.new_rapi_cert,
912
                      opts.rapi_cert,
913
                      opts.new_spice_cert,
914
                      opts.spice_cert,
915
                      opts.spice_cacert,
916
                      opts.new_confd_hmac_key,
917
                      opts.new_cluster_domain_secret,
918
                      opts.cluster_domain_secret,
919
                      opts.force)
920

    
921

    
922
def SetClusterParams(opts, args):
923
  """Modify the cluster.
924

925
  @param opts: the command line options selected by the user
926
  @type args: list
927
  @param args: should be an empty list
928
  @rtype: int
929
  @return: the desired exit code
930

931
  """
932
  if not (not opts.lvm_storage or opts.vg_name or
933
          not opts.drbd_storage or opts.drbd_helper or
934
          opts.enabled_hypervisors or opts.hvparams or
935
          opts.beparams or opts.nicparams or
936
          opts.ndparams or opts.diskparams or
937
          opts.candidate_pool_size is not None or
938
          opts.uid_pool is not None or
939
          opts.maintain_node_health is not None or
940
          opts.add_uids is not None or
941
          opts.remove_uids is not None or
942
          opts.default_iallocator is not None or
943
          opts.reserved_lvs is not None or
944
          opts.master_netdev is not None or
945
          opts.master_netmask is not None or
946
          opts.use_external_mip_script is not None or
947
          opts.prealloc_wipe_disks is not None or
948
          opts.hv_state or
949
          opts.disk_state or
950
          opts.ispecs_mem_size is not None or
951
          opts.ispecs_cpu_count is not None or
952
          opts.ispecs_disk_count is not None or
953
          opts.ispecs_disk_size is not None or
954
          opts.ispecs_nic_count is not None):
955
    ToStderr("Please give at least one of the parameters.")
956
    return 1
957

    
958
  vg_name = opts.vg_name
959
  if not opts.lvm_storage and opts.vg_name:
960
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
961
    return 1
962

    
963
  if not opts.lvm_storage:
964
    vg_name = ""
965

    
966
  drbd_helper = opts.drbd_helper
967
  if not opts.drbd_storage and opts.drbd_helper:
968
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
969
    return 1
970

    
971
  if not opts.drbd_storage:
972
    drbd_helper = ""
973

    
974
  hvlist = opts.enabled_hypervisors
975
  if hvlist is not None:
976
    hvlist = hvlist.split(",")
977

    
978
  # a list of (name, dict) we can pass directly to dict() (or [])
979
  hvparams = dict(opts.hvparams)
980
  for hv_params in hvparams.values():
981
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
982

    
983
  diskparams = dict(opts.diskparams)
984

    
985
  for dt_params in diskparams.values():
986
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
987

    
988
  beparams = opts.beparams
989
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
990

    
991
  nicparams = opts.nicparams
992
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
993

    
994
  ndparams = opts.ndparams
995
  if ndparams is not None:
996
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
997

    
998
  ispecs_dts = opts.ispecs_disk_templates
999
  ipolicy = \
1000
    objects.CreateIPolicyFromOpts(ispecs_mem_size=opts.ispecs_mem_size,
1001
                                  ispecs_cpu_count=opts.ispecs_cpu_count,
1002
                                  ispecs_disk_count=opts.ispecs_disk_count,
1003
                                  ispecs_disk_size=opts.ispecs_disk_size,
1004
                                  ispecs_nic_count=opts.ispecs_nic_count,
1005
                                  ispecs_disk_templates=ispecs_dts)
1006

    
1007
  mnh = opts.maintain_node_health
1008

    
1009
  uid_pool = opts.uid_pool
1010
  if uid_pool is not None:
1011
    uid_pool = uidpool.ParseUidPool(uid_pool)
1012

    
1013
  add_uids = opts.add_uids
1014
  if add_uids is not None:
1015
    add_uids = uidpool.ParseUidPool(add_uids)
1016

    
1017
  remove_uids = opts.remove_uids
1018
  if remove_uids is not None:
1019
    remove_uids = uidpool.ParseUidPool(remove_uids)
1020

    
1021
  if opts.reserved_lvs is not None:
1022
    if opts.reserved_lvs == "":
1023
      opts.reserved_lvs = []
1024
    else:
1025
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1026

    
1027
  if opts.master_netmask is not None:
1028
    try:
1029
      opts.master_netmask = int(opts.master_netmask)
1030
    except ValueError:
1031
      ToStderr("The --master-netmask option expects an int parameter.")
1032
      return 1
1033

    
1034
  ext_ip_script = opts.use_external_mip_script
1035

    
1036
  if opts.disk_state:
1037
    disk_state = utils.FlatToDict(opts.disk_state)
1038
  else:
1039
    disk_state = {}
1040

    
1041
  hv_state = dict(opts.hv_state)
1042

    
1043
  op = opcodes.OpClusterSetParams(vg_name=vg_name,
1044
                                  drbd_helper=drbd_helper,
1045
                                  enabled_hypervisors=hvlist,
1046
                                  hvparams=hvparams,
1047
                                  os_hvp=None,
1048
                                  beparams=beparams,
1049
                                  nicparams=nicparams,
1050
                                  ndparams=ndparams,
1051
                                  diskparams=diskparams,
1052
                                  ipolicy=ipolicy,
1053
                                  candidate_pool_size=opts.candidate_pool_size,
1054
                                  maintain_node_health=mnh,
1055
                                  uid_pool=uid_pool,
1056
                                  add_uids=add_uids,
1057
                                  remove_uids=remove_uids,
1058
                                  default_iallocator=opts.default_iallocator,
1059
                                  prealloc_wipe_disks=opts.prealloc_wipe_disks,
1060
                                  master_netdev=opts.master_netdev,
1061
                                  master_netmask=opts.master_netmask,
1062
                                  reserved_lvs=opts.reserved_lvs,
1063
                                  use_external_mip_script=ext_ip_script,
1064
                                  hv_state=hv_state,
1065
                                  disk_state=disk_state,
1066
                                  )
1067
  SubmitOpCode(op, opts=opts)
1068
  return 0
1069

    
1070

    
1071
def QueueOps(opts, args):
1072
  """Queue operations.
1073

1074
  @param opts: the command line options selected by the user
1075
  @type args: list
1076
  @param args: should contain only one element, the subcommand
1077
  @rtype: int
1078
  @return: the desired exit code
1079

1080
  """
1081
  command = args[0]
1082
  client = GetClient()
1083
  if command in ("drain", "undrain"):
1084
    drain_flag = command == "drain"
1085
    client.SetQueueDrainFlag(drain_flag)
1086
  elif command == "info":
1087
    result = client.QueryConfigValues(["drain_flag"])
1088
    if result[0]:
1089
      val = "set"
1090
    else:
1091
      val = "unset"
1092
    ToStdout("The drain flag is %s" % val)
1093
  else:
1094
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1095
                               errors.ECODE_INVAL)
1096

    
1097
  return 0
1098

    
1099

    
1100
def _ShowWatcherPause(until):
1101
  if until is None or until < time.time():
1102
    ToStdout("The watcher is not paused.")
1103
  else:
1104
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1105

    
1106

    
1107
def WatcherOps(opts, args):
1108
  """Watcher operations.
1109

1110
  @param opts: the command line options selected by the user
1111
  @type args: list
1112
  @param args: should contain only one element, the subcommand
1113
  @rtype: int
1114
  @return: the desired exit code
1115

1116
  """
1117
  command = args[0]
1118
  client = GetClient()
1119

    
1120
  if command == "continue":
1121
    client.SetWatcherPause(None)
1122
    ToStdout("The watcher is no longer paused.")
1123

    
1124
  elif command == "pause":
1125
    if len(args) < 2:
1126
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1127

    
1128
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1129
    _ShowWatcherPause(result)
1130

    
1131
  elif command == "info":
1132
    result = client.QueryConfigValues(["watcher_pause"])
1133
    _ShowWatcherPause(result[0])
1134

    
1135
  else:
1136
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1137
                               errors.ECODE_INVAL)
1138

    
1139
  return 0
1140

    
1141

    
1142
def _OobPower(opts, node_list, power):
1143
  """Puts the node in the list to desired power state.
1144

1145
  @param opts: The command line options selected by the user
1146
  @param node_list: The list of nodes to operate on
1147
  @param power: True if they should be powered on, False otherwise
1148
  @return: The success of the operation (none failed)
1149

1150
  """
1151
  if power:
1152
    command = constants.OOB_POWER_ON
1153
  else:
1154
    command = constants.OOB_POWER_OFF
1155

    
1156
  op = opcodes.OpOobCommand(node_names=node_list,
1157
                            command=command,
1158
                            ignore_status=True,
1159
                            timeout=opts.oob_timeout,
1160
                            power_delay=opts.power_delay)
1161
  result = SubmitOpCode(op, opts=opts)
1162
  errs = 0
1163
  for node_result in result:
1164
    (node_tuple, data_tuple) = node_result
1165
    (_, node_name) = node_tuple
1166
    (data_status, _) = data_tuple
1167
    if data_status != constants.RS_NORMAL:
1168
      assert data_status != constants.RS_UNAVAIL
1169
      errs += 1
1170
      ToStderr("There was a problem changing power for %s, please investigate",
1171
               node_name)
1172

    
1173
  if errs > 0:
1174
    return False
1175

    
1176
  return True
1177

    
1178

    
1179
def _InstanceStart(opts, inst_list, start):
1180
  """Puts the instances in the list to desired state.
1181

1182
  @param opts: The command line options selected by the user
1183
  @param inst_list: The list of instances to operate on
1184
  @param start: True if they should be started, False for shutdown
1185
  @return: The success of the operation (none failed)
1186

1187
  """
1188
  if start:
1189
    opcls = opcodes.OpInstanceStartup
1190
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1191
  else:
1192
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1193
                           timeout=opts.shutdown_timeout)
1194
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1195

    
1196
  jex = JobExecutor(opts=opts)
1197

    
1198
  for inst in inst_list:
1199
    ToStdout("Submit %s of instance %s", text_submit, inst)
1200
    op = opcls(instance_name=inst)
1201
    jex.QueueJob(inst, op)
1202

    
1203
  results = jex.GetResults()
1204
  bad_cnt = len([1 for (success, _) in results if not success])
1205

    
1206
  if bad_cnt == 0:
1207
    ToStdout("All instances have been %s successfully", text_success)
1208
  else:
1209
    ToStderr("There were errors while %s instances:\n"
1210
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1211
             len(results))
1212
    return False
1213

    
1214
  return True
1215

    
1216

    
1217
class _RunWhenNodesReachableHelper:
1218
  """Helper class to make shared internal state sharing easier.
1219

1220
  @ivar success: Indicates if all action_cb calls were successful
1221

1222
  """
1223
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1224
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1225
    """Init the object.
1226

1227
    @param node_list: The list of nodes to be reachable
1228
    @param action_cb: Callback called when a new host is reachable
1229
    @type node2ip: dict
1230
    @param node2ip: Node to ip mapping
1231
    @param port: The port to use for the TCP ping
1232
    @param feedback_fn: The function used for feedback
1233
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1234
    @param _sleep_fn: Function to sleep (for unittest use only)
1235

1236
    """
1237
    self.down = set(node_list)
1238
    self.up = set()
1239
    self.node2ip = node2ip
1240
    self.success = True
1241
    self.action_cb = action_cb
1242
    self.port = port
1243
    self.feedback_fn = feedback_fn
1244
    self._ping_fn = _ping_fn
1245
    self._sleep_fn = _sleep_fn
1246

    
1247
  def __call__(self):
1248
    """When called we run action_cb.
1249

1250
    @raises utils.RetryAgain: When there are still down nodes
1251

1252
    """
1253
    if not self.action_cb(self.up):
1254
      self.success = False
1255

    
1256
    if self.down:
1257
      raise utils.RetryAgain()
1258
    else:
1259
      return self.success
1260

    
1261
  def Wait(self, secs):
1262
    """Checks if a host is up or waits remaining seconds.
1263

1264
    @param secs: The secs remaining
1265

1266
    """
1267
    start = time.time()
1268
    for node in self.down:
1269
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1270
                       live_port_needed=True):
1271
        self.feedback_fn("Node %s became available" % node)
1272
        self.up.add(node)
1273
        self.down -= self.up
1274
        # If we have a node available there is the possibility to run the
1275
        # action callback successfully, therefore we don't wait and return
1276
        return
1277

    
1278
    self._sleep_fn(max(0.0, start + secs - time.time()))
1279

    
1280

    
1281
def _RunWhenNodesReachable(node_list, action_cb, interval):
1282
  """Run action_cb when nodes become reachable.
1283

1284
  @param node_list: The list of nodes to be reachable
1285
  @param action_cb: Callback called when a new host is reachable
1286
  @param interval: The earliest time to retry
1287

1288
  """
1289
  client = GetClient()
1290
  cluster_info = client.QueryClusterInfo()
1291
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1292
    family = netutils.IPAddress.family
1293
  else:
1294
    family = netutils.IP6Address.family
1295

    
1296
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1297
                 for node in node_list)
1298

    
1299
  port = netutils.GetDaemonPort(constants.NODED)
1300
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1301
                                        ToStdout)
1302

    
1303
  try:
1304
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1305
                       wait_fn=helper.Wait)
1306
  except utils.RetryTimeout:
1307
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1308
             " again:\n  - %s", "  - ".join(helper.down))
1309
    return False
1310

    
1311

    
1312
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1313
                          _instance_start_fn=_InstanceStart):
1314
  """Start the instances conditional based on node_states.
1315

1316
  @param opts: The command line options selected by the user
1317
  @param inst_map: A dict of inst -> nodes mapping
1318
  @param nodes_online: A list of nodes online
1319
  @param _instance_start_fn: Callback to start instances (unittest use only)
1320
  @return: Success of the operation on all instances
1321

1322
  """
1323
  start_inst_list = []
1324
  for (inst, nodes) in inst_map.items():
1325
    if not (nodes - nodes_online):
1326
      # All nodes the instance lives on are back online
1327
      start_inst_list.append(inst)
1328

    
1329
  for inst in start_inst_list:
1330
    del inst_map[inst]
1331

    
1332
  if start_inst_list:
1333
    return _instance_start_fn(opts, start_inst_list, True)
1334

    
1335
  return True
1336

    
1337

    
1338
def _EpoOn(opts, full_node_list, node_list, inst_map):
1339
  """Does the actual power on.
1340

1341
  @param opts: The command line options selected by the user
1342
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1343
                         OOB)
1344
  @param node_list: The list of nodes to operate on (all need to support OOB)
1345
  @param inst_map: A dict of inst -> nodes mapping
1346
  @return: The desired exit status
1347

1348
  """
1349
  if node_list and not _OobPower(opts, node_list, False):
1350
    ToStderr("Not all nodes seem to get back up, investigate and start"
1351
             " manually if needed")
1352

    
1353
  # Wait for the nodes to be back up
1354
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1355

    
1356
  ToStdout("Waiting until all nodes are available again")
1357
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1358
    ToStderr("Please investigate and start stopped instances manually")
1359
    return constants.EXIT_FAILURE
1360

    
1361
  return constants.EXIT_SUCCESS
1362

    
1363

    
1364
def _EpoOff(opts, node_list, inst_map):
1365
  """Does the actual power off.
1366

1367
  @param opts: The command line options selected by the user
1368
  @param node_list: The list of nodes to operate on (all need to support OOB)
1369
  @param inst_map: A dict of inst -> nodes mapping
1370
  @return: The desired exit status
1371

1372
  """
1373
  if not _InstanceStart(opts, inst_map.keys(), False):
1374
    ToStderr("Please investigate and stop instances manually before continuing")
1375
    return constants.EXIT_FAILURE
1376

    
1377
  if not node_list:
1378
    return constants.EXIT_SUCCESS
1379

    
1380
  if _OobPower(opts, node_list, False):
1381
    return constants.EXIT_SUCCESS
1382
  else:
1383
    return constants.EXIT_FAILURE
1384

    
1385

    
1386
def Epo(opts, args):
1387
  """EPO operations.
1388

1389
  @param opts: the command line options selected by the user
1390
  @type args: list
1391
  @param args: should contain only one element, the subcommand
1392
  @rtype: int
1393
  @return: the desired exit code
1394

1395
  """
1396
  if opts.groups and opts.show_all:
1397
    ToStderr("Only one of --groups or --all are allowed")
1398
    return constants.EXIT_FAILURE
1399
  elif args and opts.show_all:
1400
    ToStderr("Arguments in combination with --all are not allowed")
1401
    return constants.EXIT_FAILURE
1402

    
1403
  client = GetClient()
1404

    
1405
  if opts.groups:
1406
    node_query_list = itertools.chain(*client.QueryGroups(names=args,
1407
                                                          fields=["node_list"],
1408
                                                          use_locking=False))
1409
  else:
1410
    node_query_list = args
1411

    
1412
  result = client.QueryNodes(names=node_query_list,
1413
                             fields=["name", "master", "pinst_list",
1414
                                     "sinst_list", "powered", "offline"],
1415
                             use_locking=False)
1416
  node_list = []
1417
  inst_map = {}
1418
  for (idx, (node, master, pinsts, sinsts, powered,
1419
             offline)) in enumerate(result):
1420
    # Normalize the node_query_list as well
1421
    if not opts.show_all:
1422
      node_query_list[idx] = node
1423
    if not offline:
1424
      for inst in (pinsts + sinsts):
1425
        if inst in inst_map:
1426
          if not master:
1427
            inst_map[inst].add(node)
1428
        elif master:
1429
          inst_map[inst] = set()
1430
        else:
1431
          inst_map[inst] = set([node])
1432

    
1433
    if master and opts.on:
1434
      # We ignore the master for turning on the machines, in fact we are
1435
      # already operating on the master at this point :)
1436
      continue
1437
    elif master and not opts.show_all:
1438
      ToStderr("%s is the master node, please do a master-failover to another"
1439
               " node not affected by the EPO or use --all if you intend to"
1440
               " shutdown the whole cluster", node)
1441
      return constants.EXIT_FAILURE
1442
    elif powered is None:
1443
      ToStdout("Node %s does not support out-of-band handling, it can not be"
1444
               " handled in a fully automated manner", node)
1445
    elif powered == opts.on:
1446
      ToStdout("Node %s is already in desired power state, skipping", node)
1447
    elif not offline or (offline and powered):
1448
      node_list.append(node)
1449

    
1450
  if not opts.force and not ConfirmOperation(node_query_list, "nodes", "epo"):
1451
    return constants.EXIT_FAILURE
1452

    
1453
  if opts.on:
1454
    return _EpoOn(opts, node_query_list, node_list, inst_map)
1455
  else:
1456
    return _EpoOff(opts, node_list, inst_map)
1457

    
1458
commands = {
1459
  "init": (
1460
    InitCluster, [ArgHost(min=1, max=1)],
1461
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
1462
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
1463
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
1464
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
1465
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
1466
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
1467
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
1468
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + INSTANCE_POLICY_OPTS,
1469
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
1470
  "destroy": (
1471
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
1472
    "", "Destroy cluster"),
1473
  "rename": (
1474
    RenameCluster, [ArgHost(min=1, max=1)],
1475
    [FORCE_OPT, DRY_RUN_OPT],
1476
    "<new_name>",
1477
    "Renames the cluster"),
1478
  "redist-conf": (
1479
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
1480
    "", "Forces a push of the configuration file and ssconf files"
1481
    " to the nodes in the cluster"),
1482
  "verify": (
1483
    VerifyCluster, ARGS_NONE,
1484
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
1485
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
1486
    "", "Does a check on the cluster configuration"),
1487
  "verify-disks": (
1488
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
1489
    "", "Does a check on the cluster disk status"),
1490
  "repair-disk-sizes": (
1491
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
1492
    "[instance...]", "Updates mismatches in recorded disk sizes"),
1493
  "master-failover": (
1494
    MasterFailover, ARGS_NONE, [NOVOTING_OPT],
1495
    "", "Makes the current node the master"),
1496
  "master-ping": (
1497
    MasterPing, ARGS_NONE, [],
1498
    "", "Checks if the master is alive"),
1499
  "version": (
1500
    ShowClusterVersion, ARGS_NONE, [],
1501
    "", "Shows the cluster version"),
1502
  "getmaster": (
1503
    ShowClusterMaster, ARGS_NONE, [],
1504
    "", "Shows the cluster master"),
1505
  "copyfile": (
1506
    ClusterCopyFile, [ArgFile(min=1, max=1)],
1507
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
1508
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
1509
  "command": (
1510
    RunClusterCommand, [ArgCommand(min=1)],
1511
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT],
1512
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
1513
  "info": (
1514
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
1515
    "[--roman]", "Show cluster configuration"),
1516
  "list-tags": (
1517
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
1518
  "add-tags": (
1519
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT],
1520
    "tag...", "Add tags to the cluster"),
1521
  "remove-tags": (
1522
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT],
1523
    "tag...", "Remove tags from the cluster"),
1524
  "search-tags": (
1525
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
1526
    "Searches the tags on all objects on"
1527
    " the cluster for a given pattern (regex)"),
1528
  "queue": (
1529
    QueueOps,
1530
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
1531
    [], "drain|undrain|info", "Change queue properties"),
1532
  "watcher": (
1533
    WatcherOps,
1534
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
1535
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
1536
    [],
1537
    "{pause <timespec>|continue|info}", "Change watcher properties"),
1538
  "modify": (
1539
    SetClusterParams, ARGS_NONE,
1540
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
1541
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
1542
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
1543
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
1544
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
1545
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
1546
     DISK_STATE_OPT] +
1547
    INSTANCE_POLICY_OPTS,
1548
    "[opts...]",
1549
    "Alters the parameters of the cluster"),
1550
  "renew-crypto": (
1551
    RenewCrypto, ARGS_NONE,
1552
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
1553
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
1554
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
1555
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
1556
    "[opts...]",
1557
    "Renews cluster certificates, keys and secrets"),
1558
  "epo": (
1559
    Epo, [ArgUnknown()],
1560
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
1561
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
1562
    "[opts...] [args]",
1563
    "Performs an emergency power-off on given args"),
1564
  "activate-master-ip": (
1565
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
1566
  "deactivate-master-ip": (
1567
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
1568
    "Deactivates the master IP"),
1569
  }
1570

    
1571

    
1572
#: dictionary with aliases for commands
1573
aliases = {
1574
  "masterfailover": "master-failover",
1575
}
1576

    
1577

    
1578
def Main():
1579
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
1580
                     aliases=aliases)