Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ 2cc673a3

History | View | Annotate | Download (51.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
import os.path
30
import time
31
import OpenSSL
32
import itertools
33

    
34
from ganeti.cli import *
35
from ganeti import opcodes
36
from ganeti import constants
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import bootstrap
40
from ganeti import ssh
41
from ganeti import objects
42
from ganeti import uidpool
43
from ganeti import compat
44
from ganeti import netutils
45

    
46

    
47
ON_OPT = cli_option("--on", default=False,
48
                    action="store_true", dest="on",
49
                    help="Recover from an EPO")
50

    
51
GROUPS_OPT = cli_option("--groups", default=False,
52
                    action="store_true", dest="groups",
53
                    help="Arguments are node groups instead of nodes")
54

    
55
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
56
_EPO_PING_TIMEOUT = 1 # 1 second
57
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
58

    
59

    
60
@UsesRPC
61
def InitCluster(opts, args):
62
  """Initialize the cluster.
63

64
  @param opts: the command line options selected by the user
65
  @type args: list
66
  @param args: should contain only one element, the desired
67
      cluster name
68
  @rtype: int
69
  @return: the desired exit code
70

71
  """
72
  if not opts.lvm_storage and opts.vg_name:
73
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
74
    return 1
75

    
76
  vg_name = opts.vg_name
77
  if opts.lvm_storage and not opts.vg_name:
78
    vg_name = constants.DEFAULT_VG
79

    
80
  if not opts.drbd_storage and opts.drbd_helper:
81
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
82
    return 1
83

    
84
  drbd_helper = opts.drbd_helper
85
  if opts.drbd_storage and not opts.drbd_helper:
86
    drbd_helper = constants.DEFAULT_DRBD_HELPER
87

    
88
  master_netdev = opts.master_netdev
89
  if master_netdev is None:
90
    master_netdev = constants.DEFAULT_BRIDGE
91

    
92
  hvlist = opts.enabled_hypervisors
93
  if hvlist is None:
94
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
95
  hvlist = hvlist.split(",")
96

    
97
  hvparams = dict(opts.hvparams)
98
  beparams = opts.beparams
99
  nicparams = opts.nicparams
100

    
101
  diskparams = dict(opts.diskparams)
102

    
103
  # check the disk template types here, as we cannot rely on the type check done
104
  # by the opcode parameter types
105
  diskparams_keys = set(diskparams.keys())
106
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
107
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
108
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
109
    return 1
110

    
111
  # prepare beparams dict
112
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
113
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
114

    
115
  # prepare nicparams dict
116
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
117
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
118

    
119
  # prepare ndparams dict
120
  if opts.ndparams is None:
121
    ndparams = dict(constants.NDC_DEFAULTS)
122
  else:
123
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
124
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
125

    
126
  # prepare hvparams dict
127
  for hv in constants.HYPER_TYPES:
128
    if hv not in hvparams:
129
      hvparams[hv] = {}
130
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
131
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
132

    
133
  # prepare diskparams dict
134
  for templ in constants.DISK_TEMPLATES:
135
    if templ not in diskparams:
136
      diskparams[templ] = {}
137
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
138
                                         diskparams[templ])
139
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
140

    
141
  # prepare ipolicy dict
142
  ispecs_dts = opts.ispecs_disk_templates # hate long var names
143
  ipolicy_raw = \
144
    objects.CreateIPolicyFromOpts(ispecs_mem_size=opts.ispecs_mem_size,
145
                                  ispecs_cpu_count=opts.ispecs_cpu_count,
146
                                  ispecs_disk_count=opts.ispecs_disk_count,
147
                                  ispecs_disk_size=opts.ispecs_disk_size,
148
                                  ispecs_nic_count=opts.ispecs_nic_count,
149
                                  ispecs_disk_templates=ispecs_dts,
150
                                  fill_all=True)
151
  ipolicy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy_raw)
152
  for value in ipolicy.values():
153
    utils.ForceDictType(value, constants.ISPECS_PARAMETER_TYPES)
154

    
155
  if opts.candidate_pool_size is None:
156
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
157

    
158
  if opts.mac_prefix is None:
159
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
160

    
161
  uid_pool = opts.uid_pool
162
  if uid_pool is not None:
163
    uid_pool = uidpool.ParseUidPool(uid_pool)
164

    
165
  if opts.prealloc_wipe_disks is None:
166
    opts.prealloc_wipe_disks = False
167

    
168
  external_ip_setup_script = opts.use_external_mip_script
169
  if external_ip_setup_script is None:
170
    external_ip_setup_script = False
171

    
172
  try:
173
    primary_ip_version = int(opts.primary_ip_version)
174
  except (ValueError, TypeError), err:
175
    ToStderr("Invalid primary ip version value: %s" % str(err))
176
    return 1
177

    
178
  master_netmask = opts.master_netmask
179
  try:
180
    if master_netmask is not None:
181
      master_netmask = int(master_netmask)
182
  except (ValueError, TypeError), err:
183
    ToStderr("Invalid master netmask value: %s" % str(err))
184
    return 1
185

    
186
  if opts.disk_state:
187
    disk_state = utils.FlatToDict(opts.disk_state)
188
  else:
189
    disk_state = {}
190

    
191
  hv_state = dict(opts.hv_state)
192

    
193
  bootstrap.InitCluster(cluster_name=args[0],
194
                        secondary_ip=opts.secondary_ip,
195
                        vg_name=vg_name,
196
                        mac_prefix=opts.mac_prefix,
197
                        master_netmask=master_netmask,
198
                        master_netdev=master_netdev,
199
                        file_storage_dir=opts.file_storage_dir,
200
                        shared_file_storage_dir=opts.shared_file_storage_dir,
201
                        enabled_hypervisors=hvlist,
202
                        hvparams=hvparams,
203
                        beparams=beparams,
204
                        nicparams=nicparams,
205
                        ndparams=ndparams,
206
                        diskparams=diskparams,
207
                        ipolicy=ipolicy,
208
                        candidate_pool_size=opts.candidate_pool_size,
209
                        modify_etc_hosts=opts.modify_etc_hosts,
210
                        modify_ssh_setup=opts.modify_ssh_setup,
211
                        maintain_node_health=opts.maintain_node_health,
212
                        drbd_helper=drbd_helper,
213
                        uid_pool=uid_pool,
214
                        default_iallocator=opts.default_iallocator,
215
                        primary_ip_version=primary_ip_version,
216
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
217
                        use_external_mip_script=external_ip_setup_script,
218
                        hv_state=hv_state,
219
                        disk_state=disk_state,
220
                        )
221
  op = opcodes.OpClusterPostInit()
222
  SubmitOpCode(op, opts=opts)
223
  return 0
224

    
225

    
226
@UsesRPC
227
def DestroyCluster(opts, args):
228
  """Destroy the cluster.
229

230
  @param opts: the command line options selected by the user
231
  @type args: list
232
  @param args: should be an empty list
233
  @rtype: int
234
  @return: the desired exit code
235

236
  """
237
  if not opts.yes_do_it:
238
    ToStderr("Destroying a cluster is irreversible. If you really want"
239
             " destroy this cluster, supply the --yes-do-it option.")
240
    return 1
241

    
242
  op = opcodes.OpClusterDestroy()
243
  master = SubmitOpCode(op, opts=opts)
244
  # if we reached this, the opcode didn't fail; we can proceed to
245
  # shutdown all the daemons
246
  bootstrap.FinalizeClusterDestroy(master)
247
  return 0
248

    
249

    
250
def RenameCluster(opts, args):
251
  """Rename the cluster.
252

253
  @param opts: the command line options selected by the user
254
  @type args: list
255
  @param args: should contain only one element, the new cluster name
256
  @rtype: int
257
  @return: the desired exit code
258

259
  """
260
  cl = GetClient()
261

    
262
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
263

    
264
  new_name = args[0]
265
  if not opts.force:
266
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
267
                " connected over the network to the cluster name, the"
268
                " operation is very dangerous as the IP address will be"
269
                " removed from the node and the change may not go through."
270
                " Continue?") % (cluster_name, new_name)
271
    if not AskUser(usertext):
272
      return 1
273

    
274
  op = opcodes.OpClusterRename(name=new_name)
275
  result = SubmitOpCode(op, opts=opts, cl=cl)
276

    
277
  if result:
278
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
279

    
280
  return 0
281

    
282

    
283
def ActivateMasterIp(opts, args):
284
  """Activates the master IP.
285

286
  """
287
  op = opcodes.OpClusterActivateMasterIp()
288
  SubmitOpCode(op)
289
  return 0
290

    
291

    
292
def DeactivateMasterIp(opts, args):
293
  """Deactivates the master IP.
294

295
  """
296
  if not opts.confirm:
297
    usertext = ("This will disable the master IP. All the open connections to"
298
                " the master IP will be closed. To reach the master you will"
299
                " need to use its node IP."
300
                " Continue?")
301
    if not AskUser(usertext):
302
      return 1
303

    
304
  op = opcodes.OpClusterDeactivateMasterIp()
305
  SubmitOpCode(op)
306
  return 0
307

    
308

    
309
def RedistributeConfig(opts, args):
310
  """Forces push of the cluster configuration.
311

312
  @param opts: the command line options selected by the user
313
  @type args: list
314
  @param args: empty list
315
  @rtype: int
316
  @return: the desired exit code
317

318
  """
319
  op = opcodes.OpClusterRedistConf()
320
  SubmitOrSend(op, opts)
321
  return 0
322

    
323

    
324
def ShowClusterVersion(opts, args):
325
  """Write version of ganeti software to the standard output.
326

327
  @param opts: the command line options selected by the user
328
  @type args: list
329
  @param args: should be an empty list
330
  @rtype: int
331
  @return: the desired exit code
332

333
  """
334
  cl = GetClient()
335
  result = cl.QueryClusterInfo()
336
  ToStdout("Software version: %s", result["software_version"])
337
  ToStdout("Internode protocol: %s", result["protocol_version"])
338
  ToStdout("Configuration format: %s", result["config_version"])
339
  ToStdout("OS api version: %s", result["os_api_version"])
340
  ToStdout("Export interface: %s", result["export_version"])
341
  return 0
342

    
343

    
344
def ShowClusterMaster(opts, args):
345
  """Write name of master node to the standard output.
346

347
  @param opts: the command line options selected by the user
348
  @type args: list
349
  @param args: should be an empty list
350
  @rtype: int
351
  @return: the desired exit code
352

353
  """
354
  master = bootstrap.GetMaster()
355
  ToStdout(master)
356
  return 0
357

    
358

    
359
def _PrintGroupedParams(paramsdict, level=1, roman=False):
360
  """Print Grouped parameters (be, nic, disk) by group.
361

362
  @type paramsdict: dict of dicts
363
  @param paramsdict: {group: {param: value, ...}, ...}
364
  @type level: int
365
  @param level: Level of indention
366

367
  """
368
  indent = "  " * level
369
  for item, val in sorted(paramsdict.items()):
370
    if isinstance(val, dict):
371
      ToStdout("%s- %s:", indent, item)
372
      _PrintGroupedParams(val, level=level + 1, roman=roman)
373
    elif roman and isinstance(val, int):
374
      ToStdout("%s  %s: %s", indent, item, compat.TryToRoman(val))
375
    else:
376
      ToStdout("%s  %s: %s", indent, item, val)
377

    
378

    
379
def ShowClusterConfig(opts, args):
380
  """Shows cluster information.
381

382
  @param opts: the command line options selected by the user
383
  @type args: list
384
  @param args: should be an empty list
385
  @rtype: int
386
  @return: the desired exit code
387

388
  """
389
  cl = GetClient()
390
  result = cl.QueryClusterInfo()
391

    
392
  ToStdout("Cluster name: %s", result["name"])
393
  ToStdout("Cluster UUID: %s", result["uuid"])
394

    
395
  ToStdout("Creation time: %s", utils.FormatTime(result["ctime"]))
396
  ToStdout("Modification time: %s", utils.FormatTime(result["mtime"]))
397

    
398
  ToStdout("Master node: %s", result["master"])
399

    
400
  ToStdout("Architecture (this node): %s (%s)",
401
           result["architecture"][0], result["architecture"][1])
402

    
403
  if result["tags"]:
404
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
405
  else:
406
    tags = "(none)"
407

    
408
  ToStdout("Tags: %s", tags)
409

    
410
  ToStdout("Default hypervisor: %s", result["default_hypervisor"])
411
  ToStdout("Enabled hypervisors: %s",
412
           utils.CommaJoin(result["enabled_hypervisors"]))
413

    
414
  ToStdout("Hypervisor parameters:")
415
  _PrintGroupedParams(result["hvparams"])
416

    
417
  ToStdout("OS-specific hypervisor parameters:")
418
  _PrintGroupedParams(result["os_hvp"])
419

    
420
  ToStdout("OS parameters:")
421
  _PrintGroupedParams(result["osparams"])
422

    
423
  ToStdout("Hidden OSes: %s", utils.CommaJoin(result["hidden_os"]))
424
  ToStdout("Blacklisted OSes: %s", utils.CommaJoin(result["blacklisted_os"]))
425

    
426
  ToStdout("Cluster parameters:")
427
  ToStdout("  - candidate pool size: %s",
428
            compat.TryToRoman(result["candidate_pool_size"],
429
                              convert=opts.roman_integers))
430
  ToStdout("  - master netdev: %s", result["master_netdev"])
431
  ToStdout("  - master netmask: %s", result["master_netmask"])
432
  ToStdout("  - use external master IP address setup script: %s",
433
           result["use_external_mip_script"])
434
  ToStdout("  - lvm volume group: %s", result["volume_group_name"])
435
  if result["reserved_lvs"]:
436
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
437
  else:
438
    reserved_lvs = "(none)"
439
  ToStdout("  - lvm reserved volumes: %s", reserved_lvs)
440
  ToStdout("  - drbd usermode helper: %s", result["drbd_usermode_helper"])
441
  ToStdout("  - file storage path: %s", result["file_storage_dir"])
442
  ToStdout("  - shared file storage path: %s",
443
           result["shared_file_storage_dir"])
444
  ToStdout("  - maintenance of node health: %s",
445
           result["maintain_node_health"])
446
  ToStdout("  - uid pool: %s",
447
            uidpool.FormatUidPool(result["uid_pool"],
448
                                  roman=opts.roman_integers))
449
  ToStdout("  - default instance allocator: %s", result["default_iallocator"])
450
  ToStdout("  - primary ip version: %d", result["primary_ip_version"])
451
  ToStdout("  - preallocation wipe disks: %s", result["prealloc_wipe_disks"])
452
  ToStdout("  - OS search path: %s", utils.CommaJoin(constants.OS_SEARCH_PATH))
453

    
454
  ToStdout("Default node parameters:")
455
  _PrintGroupedParams(result["ndparams"], roman=opts.roman_integers)
456

    
457
  ToStdout("Default instance parameters:")
458
  _PrintGroupedParams(result["beparams"], roman=opts.roman_integers)
459

    
460
  ToStdout("Default nic parameters:")
461
  _PrintGroupedParams(result["nicparams"], roman=opts.roman_integers)
462

    
463
  ToStdout("Instance policy - limits for instances:")
464
  for key in constants.IPOLICY_PARAMETERS:
465
    ToStdout("  - %s", key)
466
    _PrintGroupedParams(result["ipolicy"][key], roman=opts.roman_integers)
467
  ToStdout("  - enabled disk templates: %s",
468
           utils.CommaJoin(result["ipolicy"][constants.ISPECS_DTS]))
469

    
470
  return 0
471

    
472

    
473
def ClusterCopyFile(opts, args):
474
  """Copy a file from master to some nodes.
475

476
  @param opts: the command line options selected by the user
477
  @type args: list
478
  @param args: should contain only one element, the path of
479
      the file to be copied
480
  @rtype: int
481
  @return: the desired exit code
482

483
  """
484
  filename = args[0]
485
  if not os.path.exists(filename):
486
    raise errors.OpPrereqError("No such filename '%s'" % filename,
487
                               errors.ECODE_INVAL)
488

    
489
  cl = GetClient()
490

    
491
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
492

    
493
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
494
                           secondary_ips=opts.use_replication_network,
495
                           nodegroup=opts.nodegroup)
496

    
497
  srun = ssh.SshRunner(cluster_name=cluster_name)
498
  for node in results:
499
    if not srun.CopyFileToNode(node, filename):
500
      ToStderr("Copy of file %s to node %s failed", filename, node)
501

    
502
  return 0
503

    
504

    
505
def RunClusterCommand(opts, args):
506
  """Run a command on some nodes.
507

508
  @param opts: the command line options selected by the user
509
  @type args: list
510
  @param args: should contain the command to be run and its arguments
511
  @rtype: int
512
  @return: the desired exit code
513

514
  """
515
  cl = GetClient()
516

    
517
  command = " ".join(args)
518

    
519
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
520

    
521
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
522
                                                    "master_node"])
523

    
524
  srun = ssh.SshRunner(cluster_name=cluster_name)
525

    
526
  # Make sure master node is at list end
527
  if master_node in nodes:
528
    nodes.remove(master_node)
529
    nodes.append(master_node)
530

    
531
  for name in nodes:
532
    result = srun.Run(name, "root", command)
533
    ToStdout("------------------------------------------------")
534
    ToStdout("node: %s", name)
535
    ToStdout("%s", result.output)
536
    ToStdout("return code = %s", result.exit_code)
537

    
538
  return 0
539

    
540

    
541
def VerifyCluster(opts, args):
542
  """Verify integrity of cluster, performing various test on nodes.
543

544
  @param opts: the command line options selected by the user
545
  @type args: list
546
  @param args: should be an empty list
547
  @rtype: int
548
  @return: the desired exit code
549

550
  """
551
  skip_checks = []
552

    
553
  if opts.skip_nplusone_mem:
554
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
555

    
556
  cl = GetClient()
557

    
558
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
559
                               error_codes=opts.error_codes,
560
                               debug_simulate_errors=opts.simulate_errors,
561
                               skip_checks=skip_checks,
562
                               ignore_errors=opts.ignore_errors,
563
                               group_name=opts.nodegroup)
564
  result = SubmitOpCode(op, cl=cl, opts=opts)
565

    
566
  # Keep track of submitted jobs
567
  jex = JobExecutor(cl=cl, opts=opts)
568

    
569
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
570
    jex.AddJobId(None, status, job_id)
571

    
572
  results = jex.GetResults()
573

    
574
  (bad_jobs, bad_results) = \
575
    map(len,
576
        # Convert iterators to lists
577
        map(list,
578
            # Count errors
579
            map(compat.partial(itertools.ifilterfalse, bool),
580
                # Convert result to booleans in a tuple
581
                zip(*((job_success, len(op_results) == 1 and op_results[0])
582
                      for (job_success, op_results) in results)))))
583

    
584
  if bad_jobs == 0 and bad_results == 0:
585
    rcode = constants.EXIT_SUCCESS
586
  else:
587
    rcode = constants.EXIT_FAILURE
588
    if bad_jobs > 0:
589
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
590

    
591
  return rcode
592

    
593

    
594
def VerifyDisks(opts, args):
595
  """Verify integrity of cluster disks.
596

597
  @param opts: the command line options selected by the user
598
  @type args: list
599
  @param args: should be an empty list
600
  @rtype: int
601
  @return: the desired exit code
602

603
  """
604
  cl = GetClient()
605

    
606
  op = opcodes.OpClusterVerifyDisks()
607

    
608
  result = SubmitOpCode(op, cl=cl, opts=opts)
609

    
610
  # Keep track of submitted jobs
611
  jex = JobExecutor(cl=cl, opts=opts)
612

    
613
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
614
    jex.AddJobId(None, status, job_id)
615

    
616
  retcode = constants.EXIT_SUCCESS
617

    
618
  for (status, result) in jex.GetResults():
619
    if not status:
620
      ToStdout("Job failed: %s", result)
621
      continue
622

    
623
    ((bad_nodes, instances, missing), ) = result
624

    
625
    for node, text in bad_nodes.items():
626
      ToStdout("Error gathering data on node %s: %s",
627
               node, utils.SafeEncode(text[-400:]))
628
      retcode = constants.EXIT_FAILURE
629
      ToStdout("You need to fix these nodes first before fixing instances")
630

    
631
    for iname in instances:
632
      if iname in missing:
633
        continue
634
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
635
      try:
636
        ToStdout("Activating disks for instance '%s'", iname)
637
        SubmitOpCode(op, opts=opts, cl=cl)
638
      except errors.GenericError, err:
639
        nret, msg = FormatError(err)
640
        retcode |= nret
641
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
642

    
643
    if missing:
644
      for iname, ival in missing.iteritems():
645
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
646
        if all_missing:
647
          ToStdout("Instance %s cannot be verified as it lives on"
648
                   " broken nodes", iname)
649
        else:
650
          ToStdout("Instance %s has missing logical volumes:", iname)
651
          ival.sort()
652
          for node, vol in ival:
653
            if node in bad_nodes:
654
              ToStdout("\tbroken node %s /dev/%s", node, vol)
655
            else:
656
              ToStdout("\t%s /dev/%s", node, vol)
657

    
658
      ToStdout("You need to replace or recreate disks for all the above"
659
               " instances if this message persists after fixing broken nodes.")
660
      retcode = constants.EXIT_FAILURE
661

    
662
  return retcode
663

    
664

    
665
def RepairDiskSizes(opts, args):
666
  """Verify sizes of cluster disks.
667

668
  @param opts: the command line options selected by the user
669
  @type args: list
670
  @param args: optional list of instances to restrict check to
671
  @rtype: int
672
  @return: the desired exit code
673

674
  """
675
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
676
  SubmitOpCode(op, opts=opts)
677

    
678

    
679
@UsesRPC
680
def MasterFailover(opts, args):
681
  """Failover the master node.
682

683
  This command, when run on a non-master node, will cause the current
684
  master to cease being master, and the non-master to become new
685
  master.
686

687
  @param opts: the command line options selected by the user
688
  @type args: list
689
  @param args: should be an empty list
690
  @rtype: int
691
  @return: the desired exit code
692

693
  """
694
  if opts.no_voting:
695
    usertext = ("This will perform the failover even if most other nodes"
696
                " are down, or if this node is outdated. This is dangerous"
697
                " as it can lead to a non-consistent cluster. Check the"
698
                " gnt-cluster(8) man page before proceeding. Continue?")
699
    if not AskUser(usertext):
700
      return 1
701

    
702
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
703

    
704

    
705
def MasterPing(opts, args):
706
  """Checks if the master is alive.
707

708
  @param opts: the command line options selected by the user
709
  @type args: list
710
  @param args: should be an empty list
711
  @rtype: int
712
  @return: the desired exit code
713

714
  """
715
  try:
716
    cl = GetClient()
717
    cl.QueryClusterInfo()
718
    return 0
719
  except Exception: # pylint: disable=W0703
720
    return 1
721

    
722

    
723
def SearchTags(opts, args):
724
  """Searches the tags on all the cluster.
725

726
  @param opts: the command line options selected by the user
727
  @type args: list
728
  @param args: should contain only one element, the tag pattern
729
  @rtype: int
730
  @return: the desired exit code
731

732
  """
733
  op = opcodes.OpTagsSearch(pattern=args[0])
734
  result = SubmitOpCode(op, opts=opts)
735
  if not result:
736
    return 1
737
  result = list(result)
738
  result.sort()
739
  for path, tag in result:
740
    ToStdout("%s %s", path, tag)
741

    
742

    
743
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
744
  """Reads and verifies an X509 certificate.
745

746
  @type cert_filename: string
747
  @param cert_filename: the path of the file containing the certificate to
748
                        verify encoded in PEM format
749
  @type verify_private_key: bool
750
  @param verify_private_key: whether to verify the private key in addition to
751
                             the public certificate
752
  @rtype: string
753
  @return: a string containing the PEM-encoded certificate.
754

755
  """
756
  try:
757
    pem = utils.ReadFile(cert_filename)
758
  except IOError, err:
759
    raise errors.X509CertError(cert_filename,
760
                               "Unable to read certificate: %s" % str(err))
761

    
762
  try:
763
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
764
  except Exception, err:
765
    raise errors.X509CertError(cert_filename,
766
                               "Unable to load certificate: %s" % str(err))
767

    
768
  if verify_private_key:
769
    try:
770
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
771
    except Exception, err:
772
      raise errors.X509CertError(cert_filename,
773
                                 "Unable to load private key: %s" % str(err))
774

    
775
  return pem
776

    
777

    
778
def _RenewCrypto(new_cluster_cert, new_rapi_cert, #pylint: disable=R0911
779
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
780
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
781
                 cds_filename, force):
782
  """Renews cluster certificates, keys and secrets.
783

784
  @type new_cluster_cert: bool
785
  @param new_cluster_cert: Whether to generate a new cluster certificate
786
  @type new_rapi_cert: bool
787
  @param new_rapi_cert: Whether to generate a new RAPI certificate
788
  @type rapi_cert_filename: string
789
  @param rapi_cert_filename: Path to file containing new RAPI certificate
790
  @type new_spice_cert: bool
791
  @param new_spice_cert: Whether to generate a new SPICE certificate
792
  @type spice_cert_filename: string
793
  @param spice_cert_filename: Path to file containing new SPICE certificate
794
  @type spice_cacert_filename: string
795
  @param spice_cacert_filename: Path to file containing the certificate of the
796
                                CA that signed the SPICE certificate
797
  @type new_confd_hmac_key: bool
798
  @param new_confd_hmac_key: Whether to generate a new HMAC key
799
  @type new_cds: bool
800
  @param new_cds: Whether to generate a new cluster domain secret
801
  @type cds_filename: string
802
  @param cds_filename: Path to file containing new cluster domain secret
803
  @type force: bool
804
  @param force: Whether to ask user for confirmation
805

806
  """
807
  if new_rapi_cert and rapi_cert_filename:
808
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
809
             " options can be specified at the same time.")
810
    return 1
811

    
812
  if new_cds and cds_filename:
813
    ToStderr("Only one of the --new-cluster-domain-secret and"
814
             " --cluster-domain-secret options can be specified at"
815
             " the same time.")
816
    return 1
817

    
818
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
819
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
820
             " and --spice-ca-certificate must not be used.")
821
    return 1
822

    
823
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
824
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
825
             " specified.")
826
    return 1
827

    
828
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
829
  try:
830
    if rapi_cert_filename:
831
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
832
    if spice_cert_filename:
833
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
834
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
835
  except errors.X509CertError, err:
836
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
837
    return 1
838

    
839
  if cds_filename:
840
    try:
841
      cds = utils.ReadFile(cds_filename)
842
    except Exception, err: # pylint: disable=W0703
843
      ToStderr("Can't load new cluster domain secret from %s: %s" %
844
               (cds_filename, str(err)))
845
      return 1
846
  else:
847
    cds = None
848

    
849
  if not force:
850
    usertext = ("This requires all daemons on all nodes to be restarted and"
851
                " may take some time. Continue?")
852
    if not AskUser(usertext):
853
      return 1
854

    
855
  def _RenewCryptoInner(ctx):
856
    ctx.feedback_fn("Updating certificates and keys")
857
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
858
                                    new_rapi_cert,
859
                                    new_spice_cert,
860
                                    new_confd_hmac_key,
861
                                    new_cds,
862
                                    rapi_cert_pem=rapi_cert_pem,
863
                                    spice_cert_pem=spice_cert_pem,
864
                                    spice_cacert_pem=spice_cacert_pem,
865
                                    cds=cds)
866

    
867
    files_to_copy = []
868

    
869
    if new_cluster_cert:
870
      files_to_copy.append(constants.NODED_CERT_FILE)
871

    
872
    if new_rapi_cert or rapi_cert_pem:
873
      files_to_copy.append(constants.RAPI_CERT_FILE)
874

    
875
    if new_spice_cert or spice_cert_pem:
876
      files_to_copy.append(constants.SPICE_CERT_FILE)
877
      files_to_copy.append(constants.SPICE_CACERT_FILE)
878

    
879
    if new_confd_hmac_key:
880
      files_to_copy.append(constants.CONFD_HMAC_KEY)
881

    
882
    if new_cds or cds:
883
      files_to_copy.append(constants.CLUSTER_DOMAIN_SECRET_FILE)
884

    
885
    if files_to_copy:
886
      for node_name in ctx.nonmaster_nodes:
887
        ctx.feedback_fn("Copying %s to %s" %
888
                        (", ".join(files_to_copy), node_name))
889
        for file_name in files_to_copy:
890
          ctx.ssh.CopyFileToNode(node_name, file_name)
891

    
892
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
893

    
894
  ToStdout("All requested certificates and keys have been replaced."
895
           " Running \"gnt-cluster verify\" now is recommended.")
896

    
897
  return 0
898

    
899

    
900
def RenewCrypto(opts, args):
901
  """Renews cluster certificates, keys and secrets.
902

903
  """
904
  return _RenewCrypto(opts.new_cluster_cert,
905
                      opts.new_rapi_cert,
906
                      opts.rapi_cert,
907
                      opts.new_spice_cert,
908
                      opts.spice_cert,
909
                      opts.spice_cacert,
910
                      opts.new_confd_hmac_key,
911
                      opts.new_cluster_domain_secret,
912
                      opts.cluster_domain_secret,
913
                      opts.force)
914

    
915

    
916
def SetClusterParams(opts, args):
917
  """Modify the cluster.
918

919
  @param opts: the command line options selected by the user
920
  @type args: list
921
  @param args: should be an empty list
922
  @rtype: int
923
  @return: the desired exit code
924

925
  """
926
  if not (not opts.lvm_storage or opts.vg_name or
927
          not opts.drbd_storage or opts.drbd_helper or
928
          opts.enabled_hypervisors or opts.hvparams or
929
          opts.beparams or opts.nicparams or
930
          opts.ndparams or opts.diskparams or
931
          opts.candidate_pool_size is not None or
932
          opts.uid_pool is not None or
933
          opts.maintain_node_health is not None or
934
          opts.add_uids is not None or
935
          opts.remove_uids is not None or
936
          opts.default_iallocator is not None or
937
          opts.reserved_lvs is not None or
938
          opts.master_netdev is not None or
939
          opts.master_netmask is not None or
940
          opts.use_external_mip_script is not None or
941
          opts.prealloc_wipe_disks is not None or
942
          opts.hv_state or
943
          opts.disk_state or
944
          opts.ispecs_mem_size is not None or
945
          opts.ispecs_cpu_count is not None or
946
          opts.ispecs_disk_count is not None or
947
          opts.ispecs_disk_size is not None or
948
          opts.ispecs_nic_count is not None):
949
    ToStderr("Please give at least one of the parameters.")
950
    return 1
951

    
952
  vg_name = opts.vg_name
953
  if not opts.lvm_storage and opts.vg_name:
954
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
955
    return 1
956

    
957
  if not opts.lvm_storage:
958
    vg_name = ""
959

    
960
  drbd_helper = opts.drbd_helper
961
  if not opts.drbd_storage and opts.drbd_helper:
962
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
963
    return 1
964

    
965
  if not opts.drbd_storage:
966
    drbd_helper = ""
967

    
968
  hvlist = opts.enabled_hypervisors
969
  if hvlist is not None:
970
    hvlist = hvlist.split(",")
971

    
972
  # a list of (name, dict) we can pass directly to dict() (or [])
973
  hvparams = dict(opts.hvparams)
974
  for hv_params in hvparams.values():
975
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
976

    
977
  diskparams = dict(opts.diskparams)
978

    
979
  for dt_params in diskparams.values():
980
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
981

    
982
  beparams = opts.beparams
983
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
984

    
985
  nicparams = opts.nicparams
986
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
987

    
988
  ndparams = opts.ndparams
989
  if ndparams is not None:
990
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
991

    
992
  ispecs_dts = opts.ispecs_disk_templates
993
  ipolicy = \
994
    objects.CreateIPolicyFromOpts(ispecs_mem_size=opts.ispecs_mem_size,
995
                                  ispecs_cpu_count=opts.ispecs_cpu_count,
996
                                  ispecs_disk_count=opts.ispecs_disk_count,
997
                                  ispecs_disk_size=opts.ispecs_disk_size,
998
                                  ispecs_nic_count=opts.ispecs_nic_count,
999
                                  ispecs_disk_templates=ispecs_dts)
1000

    
1001
  mnh = opts.maintain_node_health
1002

    
1003
  uid_pool = opts.uid_pool
1004
  if uid_pool is not None:
1005
    uid_pool = uidpool.ParseUidPool(uid_pool)
1006

    
1007
  add_uids = opts.add_uids
1008
  if add_uids is not None:
1009
    add_uids = uidpool.ParseUidPool(add_uids)
1010

    
1011
  remove_uids = opts.remove_uids
1012
  if remove_uids is not None:
1013
    remove_uids = uidpool.ParseUidPool(remove_uids)
1014

    
1015
  if opts.reserved_lvs is not None:
1016
    if opts.reserved_lvs == "":
1017
      opts.reserved_lvs = []
1018
    else:
1019
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1020

    
1021
  if opts.master_netmask is not None:
1022
    try:
1023
      opts.master_netmask = int(opts.master_netmask)
1024
    except ValueError:
1025
      ToStderr("The --master-netmask option expects an int parameter.")
1026
      return 1
1027

    
1028
  ext_ip_script = opts.use_external_mip_script
1029

    
1030
  if opts.disk_state:
1031
    disk_state = utils.FlatToDict(opts.disk_state)
1032
  else:
1033
    disk_state = {}
1034

    
1035
  hv_state = dict(opts.hv_state)
1036

    
1037
  op = opcodes.OpClusterSetParams(vg_name=vg_name,
1038
                                  drbd_helper=drbd_helper,
1039
                                  enabled_hypervisors=hvlist,
1040
                                  hvparams=hvparams,
1041
                                  os_hvp=None,
1042
                                  beparams=beparams,
1043
                                  nicparams=nicparams,
1044
                                  ndparams=ndparams,
1045
                                  diskparams=diskparams,
1046
                                  ipolicy=ipolicy,
1047
                                  candidate_pool_size=opts.candidate_pool_size,
1048
                                  maintain_node_health=mnh,
1049
                                  uid_pool=uid_pool,
1050
                                  add_uids=add_uids,
1051
                                  remove_uids=remove_uids,
1052
                                  default_iallocator=opts.default_iallocator,
1053
                                  prealloc_wipe_disks=opts.prealloc_wipe_disks,
1054
                                  master_netdev=opts.master_netdev,
1055
                                  master_netmask=opts.master_netmask,
1056
                                  reserved_lvs=opts.reserved_lvs,
1057
                                  use_external_mip_script=ext_ip_script,
1058
                                  hv_state=hv_state,
1059
                                  disk_state=disk_state,
1060
                                  )
1061
  SubmitOpCode(op, opts=opts)
1062
  return 0
1063

    
1064

    
1065
def QueueOps(opts, args):
1066
  """Queue operations.
1067

1068
  @param opts: the command line options selected by the user
1069
  @type args: list
1070
  @param args: should contain only one element, the subcommand
1071
  @rtype: int
1072
  @return: the desired exit code
1073

1074
  """
1075
  command = args[0]
1076
  client = GetClient()
1077
  if command in ("drain", "undrain"):
1078
    drain_flag = command == "drain"
1079
    client.SetQueueDrainFlag(drain_flag)
1080
  elif command == "info":
1081
    result = client.QueryConfigValues(["drain_flag"])
1082
    if result[0]:
1083
      val = "set"
1084
    else:
1085
      val = "unset"
1086
    ToStdout("The drain flag is %s" % val)
1087
  else:
1088
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1089
                               errors.ECODE_INVAL)
1090

    
1091
  return 0
1092

    
1093

    
1094
def _ShowWatcherPause(until):
1095
  if until is None or until < time.time():
1096
    ToStdout("The watcher is not paused.")
1097
  else:
1098
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1099

    
1100

    
1101
def WatcherOps(opts, args):
1102
  """Watcher operations.
1103

1104
  @param opts: the command line options selected by the user
1105
  @type args: list
1106
  @param args: should contain only one element, the subcommand
1107
  @rtype: int
1108
  @return: the desired exit code
1109

1110
  """
1111
  command = args[0]
1112
  client = GetClient()
1113

    
1114
  if command == "continue":
1115
    client.SetWatcherPause(None)
1116
    ToStdout("The watcher is no longer paused.")
1117

    
1118
  elif command == "pause":
1119
    if len(args) < 2:
1120
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1121

    
1122
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1123
    _ShowWatcherPause(result)
1124

    
1125
  elif command == "info":
1126
    result = client.QueryConfigValues(["watcher_pause"])
1127
    _ShowWatcherPause(result[0])
1128

    
1129
  else:
1130
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1131
                               errors.ECODE_INVAL)
1132

    
1133
  return 0
1134

    
1135

    
1136
def _OobPower(opts, node_list, power):
1137
  """Puts the node in the list to desired power state.
1138

1139
  @param opts: The command line options selected by the user
1140
  @param node_list: The list of nodes to operate on
1141
  @param power: True if they should be powered on, False otherwise
1142
  @return: The success of the operation (none failed)
1143

1144
  """
1145
  if power:
1146
    command = constants.OOB_POWER_ON
1147
  else:
1148
    command = constants.OOB_POWER_OFF
1149

    
1150
  op = opcodes.OpOobCommand(node_names=node_list,
1151
                            command=command,
1152
                            ignore_status=True,
1153
                            timeout=opts.oob_timeout,
1154
                            power_delay=opts.power_delay)
1155
  result = SubmitOpCode(op, opts=opts)
1156
  errs = 0
1157
  for node_result in result:
1158
    (node_tuple, data_tuple) = node_result
1159
    (_, node_name) = node_tuple
1160
    (data_status, _) = data_tuple
1161
    if data_status != constants.RS_NORMAL:
1162
      assert data_status != constants.RS_UNAVAIL
1163
      errs += 1
1164
      ToStderr("There was a problem changing power for %s, please investigate",
1165
               node_name)
1166

    
1167
  if errs > 0:
1168
    return False
1169

    
1170
  return True
1171

    
1172

    
1173
def _InstanceStart(opts, inst_list, start):
1174
  """Puts the instances in the list to desired state.
1175

1176
  @param opts: The command line options selected by the user
1177
  @param inst_list: The list of instances to operate on
1178
  @param start: True if they should be started, False for shutdown
1179
  @return: The success of the operation (none failed)
1180

1181
  """
1182
  if start:
1183
    opcls = opcodes.OpInstanceStartup
1184
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1185
  else:
1186
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1187
                           timeout=opts.shutdown_timeout)
1188
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1189

    
1190
  jex = JobExecutor(opts=opts)
1191

    
1192
  for inst in inst_list:
1193
    ToStdout("Submit %s of instance %s", text_submit, inst)
1194
    op = opcls(instance_name=inst)
1195
    jex.QueueJob(inst, op)
1196

    
1197
  results = jex.GetResults()
1198
  bad_cnt = len([1 for (success, _) in results if not success])
1199

    
1200
  if bad_cnt == 0:
1201
    ToStdout("All instances have been %s successfully", text_success)
1202
  else:
1203
    ToStderr("There were errors while %s instances:\n"
1204
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1205
             len(results))
1206
    return False
1207

    
1208
  return True
1209

    
1210

    
1211
class _RunWhenNodesReachableHelper:
1212
  """Helper class to make shared internal state sharing easier.
1213

1214
  @ivar success: Indicates if all action_cb calls were successful
1215

1216
  """
1217
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1218
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1219
    """Init the object.
1220

1221
    @param node_list: The list of nodes to be reachable
1222
    @param action_cb: Callback called when a new host is reachable
1223
    @type node2ip: dict
1224
    @param node2ip: Node to ip mapping
1225
    @param port: The port to use for the TCP ping
1226
    @param feedback_fn: The function used for feedback
1227
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1228
    @param _sleep_fn: Function to sleep (for unittest use only)
1229

1230
    """
1231
    self.down = set(node_list)
1232
    self.up = set()
1233
    self.node2ip = node2ip
1234
    self.success = True
1235
    self.action_cb = action_cb
1236
    self.port = port
1237
    self.feedback_fn = feedback_fn
1238
    self._ping_fn = _ping_fn
1239
    self._sleep_fn = _sleep_fn
1240

    
1241
  def __call__(self):
1242
    """When called we run action_cb.
1243

1244
    @raises utils.RetryAgain: When there are still down nodes
1245

1246
    """
1247
    if not self.action_cb(self.up):
1248
      self.success = False
1249

    
1250
    if self.down:
1251
      raise utils.RetryAgain()
1252
    else:
1253
      return self.success
1254

    
1255
  def Wait(self, secs):
1256
    """Checks if a host is up or waits remaining seconds.
1257

1258
    @param secs: The secs remaining
1259

1260
    """
1261
    start = time.time()
1262
    for node in self.down:
1263
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1264
                       live_port_needed=True):
1265
        self.feedback_fn("Node %s became available" % node)
1266
        self.up.add(node)
1267
        self.down -= self.up
1268
        # If we have a node available there is the possibility to run the
1269
        # action callback successfully, therefore we don't wait and return
1270
        return
1271

    
1272
    self._sleep_fn(max(0.0, start + secs - time.time()))
1273

    
1274

    
1275
def _RunWhenNodesReachable(node_list, action_cb, interval):
1276
  """Run action_cb when nodes become reachable.
1277

1278
  @param node_list: The list of nodes to be reachable
1279
  @param action_cb: Callback called when a new host is reachable
1280
  @param interval: The earliest time to retry
1281

1282
  """
1283
  client = GetClient()
1284
  cluster_info = client.QueryClusterInfo()
1285
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1286
    family = netutils.IPAddress.family
1287
  else:
1288
    family = netutils.IP6Address.family
1289

    
1290
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1291
                 for node in node_list)
1292

    
1293
  port = netutils.GetDaemonPort(constants.NODED)
1294
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1295
                                        ToStdout)
1296

    
1297
  try:
1298
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1299
                       wait_fn=helper.Wait)
1300
  except utils.RetryTimeout:
1301
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1302
             " again:\n  - %s", "  - ".join(helper.down))
1303
    return False
1304

    
1305

    
1306
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1307
                          _instance_start_fn=_InstanceStart):
1308
  """Start the instances conditional based on node_states.
1309

1310
  @param opts: The command line options selected by the user
1311
  @param inst_map: A dict of inst -> nodes mapping
1312
  @param nodes_online: A list of nodes online
1313
  @param _instance_start_fn: Callback to start instances (unittest use only)
1314
  @return: Success of the operation on all instances
1315

1316
  """
1317
  start_inst_list = []
1318
  for (inst, nodes) in inst_map.items():
1319
    if not (nodes - nodes_online):
1320
      # All nodes the instance lives on are back online
1321
      start_inst_list.append(inst)
1322

    
1323
  for inst in start_inst_list:
1324
    del inst_map[inst]
1325

    
1326
  if start_inst_list:
1327
    return _instance_start_fn(opts, start_inst_list, True)
1328

    
1329
  return True
1330

    
1331

    
1332
def _EpoOn(opts, full_node_list, node_list, inst_map):
1333
  """Does the actual power on.
1334

1335
  @param opts: The command line options selected by the user
1336
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1337
                         OOB)
1338
  @param node_list: The list of nodes to operate on (all need to support OOB)
1339
  @param inst_map: A dict of inst -> nodes mapping
1340
  @return: The desired exit status
1341

1342
  """
1343
  if node_list and not _OobPower(opts, node_list, False):
1344
    ToStderr("Not all nodes seem to get back up, investigate and start"
1345
             " manually if needed")
1346

    
1347
  # Wait for the nodes to be back up
1348
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1349

    
1350
  ToStdout("Waiting until all nodes are available again")
1351
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1352
    ToStderr("Please investigate and start stopped instances manually")
1353
    return constants.EXIT_FAILURE
1354

    
1355
  return constants.EXIT_SUCCESS
1356

    
1357

    
1358
def _EpoOff(opts, node_list, inst_map):
1359
  """Does the actual power off.
1360

1361
  @param opts: The command line options selected by the user
1362
  @param node_list: The list of nodes to operate on (all need to support OOB)
1363
  @param inst_map: A dict of inst -> nodes mapping
1364
  @return: The desired exit status
1365

1366
  """
1367
  if not _InstanceStart(opts, inst_map.keys(), False):
1368
    ToStderr("Please investigate and stop instances manually before continuing")
1369
    return constants.EXIT_FAILURE
1370

    
1371
  if not node_list:
1372
    return constants.EXIT_SUCCESS
1373

    
1374
  if _OobPower(opts, node_list, False):
1375
    return constants.EXIT_SUCCESS
1376
  else:
1377
    return constants.EXIT_FAILURE
1378

    
1379

    
1380
def Epo(opts, args):
1381
  """EPO operations.
1382

1383
  @param opts: the command line options selected by the user
1384
  @type args: list
1385
  @param args: should contain only one element, the subcommand
1386
  @rtype: int
1387
  @return: the desired exit code
1388

1389
  """
1390
  if opts.groups and opts.show_all:
1391
    ToStderr("Only one of --groups or --all are allowed")
1392
    return constants.EXIT_FAILURE
1393
  elif args and opts.show_all:
1394
    ToStderr("Arguments in combination with --all are not allowed")
1395
    return constants.EXIT_FAILURE
1396

    
1397
  client = GetClient()
1398

    
1399
  if opts.groups:
1400
    node_query_list = itertools.chain(*client.QueryGroups(names=args,
1401
                                                          fields=["node_list"],
1402
                                                          use_locking=False))
1403
  else:
1404
    node_query_list = args
1405

    
1406
  result = client.QueryNodes(names=node_query_list,
1407
                             fields=["name", "master", "pinst_list",
1408
                                     "sinst_list", "powered", "offline"],
1409
                             use_locking=False)
1410
  node_list = []
1411
  inst_map = {}
1412
  for (idx, (node, master, pinsts, sinsts, powered,
1413
             offline)) in enumerate(result):
1414
    # Normalize the node_query_list as well
1415
    if not opts.show_all:
1416
      node_query_list[idx] = node
1417
    if not offline:
1418
      for inst in (pinsts + sinsts):
1419
        if inst in inst_map:
1420
          if not master:
1421
            inst_map[inst].add(node)
1422
        elif master:
1423
          inst_map[inst] = set()
1424
        else:
1425
          inst_map[inst] = set([node])
1426

    
1427
    if master and opts.on:
1428
      # We ignore the master for turning on the machines, in fact we are
1429
      # already operating on the master at this point :)
1430
      continue
1431
    elif master and not opts.show_all:
1432
      ToStderr("%s is the master node, please do a master-failover to another"
1433
               " node not affected by the EPO or use --all if you intend to"
1434
               " shutdown the whole cluster", node)
1435
      return constants.EXIT_FAILURE
1436
    elif powered is None:
1437
      ToStdout("Node %s does not support out-of-band handling, it can not be"
1438
               " handled in a fully automated manner", node)
1439
    elif powered == opts.on:
1440
      ToStdout("Node %s is already in desired power state, skipping", node)
1441
    elif not offline or (offline and powered):
1442
      node_list.append(node)
1443

    
1444
  if not opts.force and not ConfirmOperation(node_query_list, "nodes", "epo"):
1445
    return constants.EXIT_FAILURE
1446

    
1447
  if opts.on:
1448
    return _EpoOn(opts, node_query_list, node_list, inst_map)
1449
  else:
1450
    return _EpoOff(opts, node_list, inst_map)
1451

    
1452
commands = {
1453
  "init": (
1454
    InitCluster, [ArgHost(min=1, max=1)],
1455
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
1456
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
1457
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
1458
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
1459
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
1460
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
1461
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
1462
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + INSTANCE_POLICY_OPTS,
1463
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
1464
  "destroy": (
1465
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
1466
    "", "Destroy cluster"),
1467
  "rename": (
1468
    RenameCluster, [ArgHost(min=1, max=1)],
1469
    [FORCE_OPT, DRY_RUN_OPT],
1470
    "<new_name>",
1471
    "Renames the cluster"),
1472
  "redist-conf": (
1473
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
1474
    "", "Forces a push of the configuration file and ssconf files"
1475
    " to the nodes in the cluster"),
1476
  "verify": (
1477
    VerifyCluster, ARGS_NONE,
1478
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
1479
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
1480
    "", "Does a check on the cluster configuration"),
1481
  "verify-disks": (
1482
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
1483
    "", "Does a check on the cluster disk status"),
1484
  "repair-disk-sizes": (
1485
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
1486
    "[instance...]", "Updates mismatches in recorded disk sizes"),
1487
  "master-failover": (
1488
    MasterFailover, ARGS_NONE, [NOVOTING_OPT],
1489
    "", "Makes the current node the master"),
1490
  "master-ping": (
1491
    MasterPing, ARGS_NONE, [],
1492
    "", "Checks if the master is alive"),
1493
  "version": (
1494
    ShowClusterVersion, ARGS_NONE, [],
1495
    "", "Shows the cluster version"),
1496
  "getmaster": (
1497
    ShowClusterMaster, ARGS_NONE, [],
1498
    "", "Shows the cluster master"),
1499
  "copyfile": (
1500
    ClusterCopyFile, [ArgFile(min=1, max=1)],
1501
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
1502
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
1503
  "command": (
1504
    RunClusterCommand, [ArgCommand(min=1)],
1505
    [NODE_LIST_OPT, NODEGROUP_OPT],
1506
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
1507
  "info": (
1508
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
1509
    "[--roman]", "Show cluster configuration"),
1510
  "list-tags": (
1511
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
1512
  "add-tags": (
1513
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT],
1514
    "tag...", "Add tags to the cluster"),
1515
  "remove-tags": (
1516
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT],
1517
    "tag...", "Remove tags from the cluster"),
1518
  "search-tags": (
1519
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
1520
    "Searches the tags on all objects on"
1521
    " the cluster for a given pattern (regex)"),
1522
  "queue": (
1523
    QueueOps,
1524
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
1525
    [], "drain|undrain|info", "Change queue properties"),
1526
  "watcher": (
1527
    WatcherOps,
1528
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
1529
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
1530
    [],
1531
    "{pause <timespec>|continue|info}", "Change watcher properties"),
1532
  "modify": (
1533
    SetClusterParams, ARGS_NONE,
1534
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
1535
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
1536
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
1537
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
1538
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
1539
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
1540
     DISK_STATE_OPT] +
1541
    INSTANCE_POLICY_OPTS,
1542
    "[opts...]",
1543
    "Alters the parameters of the cluster"),
1544
  "renew-crypto": (
1545
    RenewCrypto, ARGS_NONE,
1546
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
1547
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
1548
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
1549
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
1550
    "[opts...]",
1551
    "Renews cluster certificates, keys and secrets"),
1552
  "epo": (
1553
    Epo, [ArgUnknown()],
1554
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
1555
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
1556
    "[opts...] [args]",
1557
    "Performs an emergency power-off on given args"),
1558
  "activate-master-ip": (
1559
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
1560
  "deactivate-master-ip": (
1561
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
1562
    "Deactivates the master IP"),
1563
  }
1564

    
1565

    
1566
#: dictionary with aliases for commands
1567
aliases = {
1568
  "masterfailover": "master-failover",
1569
}
1570

    
1571

    
1572
def Main():
1573
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
1574
                     aliases=aliases)