Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ 0ce212e5

History | View | Annotate | Download (51 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
import os.path
30
import time
31
import OpenSSL
32
import itertools
33

    
34
from ganeti.cli import *
35
from ganeti import opcodes
36
from ganeti import constants
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import bootstrap
40
from ganeti import ssh
41
from ganeti import objects
42
from ganeti import uidpool
43
from ganeti import compat
44
from ganeti import netutils
45

    
46

    
47
ON_OPT = cli_option("--on", default=False,
48
                    action="store_true", dest="on",
49
                    help="Recover from an EPO")
50

    
51
GROUPS_OPT = cli_option("--groups", default=False,
52
                    action="store_true", dest="groups",
53
                    help="Arguments are node groups instead of nodes")
54

    
55
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
56
_EPO_PING_TIMEOUT = 1 # 1 second
57
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
58

    
59

    
60
@UsesRPC
61
def InitCluster(opts, args):
62
  """Initialize the cluster.
63

64
  @param opts: the command line options selected by the user
65
  @type args: list
66
  @param args: should contain only one element, the desired
67
      cluster name
68
  @rtype: int
69
  @return: the desired exit code
70

71
  """
72
  if not opts.lvm_storage and opts.vg_name:
73
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
74
    return 1
75

    
76
  vg_name = opts.vg_name
77
  if opts.lvm_storage and not opts.vg_name:
78
    vg_name = constants.DEFAULT_VG
79

    
80
  if not opts.drbd_storage and opts.drbd_helper:
81
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
82
    return 1
83

    
84
  drbd_helper = opts.drbd_helper
85
  if opts.drbd_storage and not opts.drbd_helper:
86
    drbd_helper = constants.DEFAULT_DRBD_HELPER
87

    
88
  master_netdev = opts.master_netdev
89
  if master_netdev is None:
90
    master_netdev = constants.DEFAULT_BRIDGE
91

    
92
  hvlist = opts.enabled_hypervisors
93
  if hvlist is None:
94
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
95
  hvlist = hvlist.split(",")
96

    
97
  hvparams = dict(opts.hvparams)
98
  beparams = opts.beparams
99
  nicparams = opts.nicparams
100

    
101
  diskparams = dict(opts.diskparams)
102

    
103
  # check the disk template types here, as we cannot rely on the type check done
104
  # by the opcode parameter types
105
  diskparams_keys = set(diskparams.keys())
106
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
107
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
108
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
109
    return 1
110

    
111
  # prepare beparams dict
112
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
113
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
114

    
115
  # prepare nicparams dict
116
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
117
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
118

    
119
  # prepare ndparams dict
120
  if opts.ndparams is None:
121
    ndparams = dict(constants.NDC_DEFAULTS)
122
  else:
123
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
124
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
125

    
126
  # prepare hvparams dict
127
  for hv in constants.HYPER_TYPES:
128
    if hv not in hvparams:
129
      hvparams[hv] = {}
130
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
131
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
132

    
133
  # prepare diskparams dict
134
  for templ in constants.DISK_TEMPLATES:
135
    if templ not in diskparams:
136
      diskparams[templ] = {}
137
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
138
                                         diskparams[templ])
139
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
140

    
141
  # prepare ipolicy dict
142
  ipolicy_raw = \
143
    objects.CreateIPolicyFromOpts(ispecs_mem_size=opts.ispecs_mem_size,
144
                                  ispecs_cpu_count=opts.ispecs_cpu_count,
145
                                  ispecs_disk_count=opts.ispecs_disk_count,
146
                                  ispecs_disk_size=opts.ispecs_disk_size,
147
                                  ispecs_nic_count=opts.ispecs_nic_count)
148
  ipolicy = objects.FillDictOfDicts(constants.IPOLICY_DEFAULTS, ipolicy_raw)
149
  for value in ipolicy.values():
150
    utils.ForceDictType(value, constants.ISPECS_PARAMETER_TYPES)
151

    
152
  if opts.candidate_pool_size is None:
153
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
154

    
155
  if opts.mac_prefix is None:
156
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
157

    
158
  uid_pool = opts.uid_pool
159
  if uid_pool is not None:
160
    uid_pool = uidpool.ParseUidPool(uid_pool)
161

    
162
  if opts.prealloc_wipe_disks is None:
163
    opts.prealloc_wipe_disks = False
164

    
165
  external_ip_setup_script = opts.use_external_mip_script
166
  if external_ip_setup_script is None:
167
    external_ip_setup_script = False
168

    
169
  try:
170
    primary_ip_version = int(opts.primary_ip_version)
171
  except (ValueError, TypeError), err:
172
    ToStderr("Invalid primary ip version value: %s" % str(err))
173
    return 1
174

    
175
  master_netmask = opts.master_netmask
176
  try:
177
    if master_netmask is not None:
178
      master_netmask = int(master_netmask)
179
  except (ValueError, TypeError), err:
180
    ToStderr("Invalid master netmask value: %s" % str(err))
181
    return 1
182

    
183
  if opts.disk_state:
184
    disk_state = utils.FlatToDict(opts.disk_state)
185
  else:
186
    disk_state = {}
187

    
188
  hv_state = dict(opts.hv_state)
189

    
190
  bootstrap.InitCluster(cluster_name=args[0],
191
                        secondary_ip=opts.secondary_ip,
192
                        vg_name=vg_name,
193
                        mac_prefix=opts.mac_prefix,
194
                        master_netmask=master_netmask,
195
                        master_netdev=master_netdev,
196
                        file_storage_dir=opts.file_storage_dir,
197
                        shared_file_storage_dir=opts.shared_file_storage_dir,
198
                        enabled_hypervisors=hvlist,
199
                        hvparams=hvparams,
200
                        beparams=beparams,
201
                        nicparams=nicparams,
202
                        ndparams=ndparams,
203
                        diskparams=diskparams,
204
                        ipolicy=ipolicy,
205
                        candidate_pool_size=opts.candidate_pool_size,
206
                        modify_etc_hosts=opts.modify_etc_hosts,
207
                        modify_ssh_setup=opts.modify_ssh_setup,
208
                        maintain_node_health=opts.maintain_node_health,
209
                        drbd_helper=drbd_helper,
210
                        uid_pool=uid_pool,
211
                        default_iallocator=opts.default_iallocator,
212
                        primary_ip_version=primary_ip_version,
213
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
214
                        use_external_mip_script=external_ip_setup_script,
215
                        hv_state=hv_state,
216
                        disk_state=disk_state,
217
                        )
218
  op = opcodes.OpClusterPostInit()
219
  SubmitOpCode(op, opts=opts)
220
  return 0
221

    
222

    
223
@UsesRPC
224
def DestroyCluster(opts, args):
225
  """Destroy the cluster.
226

227
  @param opts: the command line options selected by the user
228
  @type args: list
229
  @param args: should be an empty list
230
  @rtype: int
231
  @return: the desired exit code
232

233
  """
234
  if not opts.yes_do_it:
235
    ToStderr("Destroying a cluster is irreversible. If you really want"
236
             " destroy this cluster, supply the --yes-do-it option.")
237
    return 1
238

    
239
  op = opcodes.OpClusterDestroy()
240
  master = SubmitOpCode(op, opts=opts)
241
  # if we reached this, the opcode didn't fail; we can proceed to
242
  # shutdown all the daemons
243
  bootstrap.FinalizeClusterDestroy(master)
244
  return 0
245

    
246

    
247
def RenameCluster(opts, args):
248
  """Rename the cluster.
249

250
  @param opts: the command line options selected by the user
251
  @type args: list
252
  @param args: should contain only one element, the new cluster name
253
  @rtype: int
254
  @return: the desired exit code
255

256
  """
257
  cl = GetClient()
258

    
259
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
260

    
261
  new_name = args[0]
262
  if not opts.force:
263
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
264
                " connected over the network to the cluster name, the"
265
                " operation is very dangerous as the IP address will be"
266
                " removed from the node and the change may not go through."
267
                " Continue?") % (cluster_name, new_name)
268
    if not AskUser(usertext):
269
      return 1
270

    
271
  op = opcodes.OpClusterRename(name=new_name)
272
  result = SubmitOpCode(op, opts=opts, cl=cl)
273

    
274
  if result:
275
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
276

    
277
  return 0
278

    
279

    
280
def ActivateMasterIp(opts, args):
281
  """Activates the master IP.
282

283
  """
284
  op = opcodes.OpClusterActivateMasterIp()
285
  SubmitOpCode(op)
286
  return 0
287

    
288

    
289
def DeactivateMasterIp(opts, args):
290
  """Deactivates the master IP.
291

292
  """
293
  if not opts.confirm:
294
    usertext = ("This will disable the master IP. All the open connections to"
295
                " the master IP will be closed. To reach the master you will"
296
                " need to use its node IP."
297
                " Continue?")
298
    if not AskUser(usertext):
299
      return 1
300

    
301
  op = opcodes.OpClusterDeactivateMasterIp()
302
  SubmitOpCode(op)
303
  return 0
304

    
305

    
306
def RedistributeConfig(opts, args):
307
  """Forces push of the cluster configuration.
308

309
  @param opts: the command line options selected by the user
310
  @type args: list
311
  @param args: empty list
312
  @rtype: int
313
  @return: the desired exit code
314

315
  """
316
  op = opcodes.OpClusterRedistConf()
317
  SubmitOrSend(op, opts)
318
  return 0
319

    
320

    
321
def ShowClusterVersion(opts, args):
322
  """Write version of ganeti software to the standard output.
323

324
  @param opts: the command line options selected by the user
325
  @type args: list
326
  @param args: should be an empty list
327
  @rtype: int
328
  @return: the desired exit code
329

330
  """
331
  cl = GetClient()
332
  result = cl.QueryClusterInfo()
333
  ToStdout("Software version: %s", result["software_version"])
334
  ToStdout("Internode protocol: %s", result["protocol_version"])
335
  ToStdout("Configuration format: %s", result["config_version"])
336
  ToStdout("OS api version: %s", result["os_api_version"])
337
  ToStdout("Export interface: %s", result["export_version"])
338
  return 0
339

    
340

    
341
def ShowClusterMaster(opts, args):
342
  """Write name of master node to the standard output.
343

344
  @param opts: the command line options selected by the user
345
  @type args: list
346
  @param args: should be an empty list
347
  @rtype: int
348
  @return: the desired exit code
349

350
  """
351
  master = bootstrap.GetMaster()
352
  ToStdout(master)
353
  return 0
354

    
355

    
356
def _PrintGroupedParams(paramsdict, level=1, roman=False):
357
  """Print Grouped parameters (be, nic, disk) by group.
358

359
  @type paramsdict: dict of dicts
360
  @param paramsdict: {group: {param: value, ...}, ...}
361
  @type level: int
362
  @param level: Level of indention
363

364
  """
365
  indent = "  " * level
366
  for item, val in sorted(paramsdict.items()):
367
    if isinstance(val, dict):
368
      ToStdout("%s- %s:", indent, item)
369
      _PrintGroupedParams(val, level=level + 1, roman=roman)
370
    elif roman and isinstance(val, int):
371
      ToStdout("%s  %s: %s", indent, item, compat.TryToRoman(val))
372
    else:
373
      ToStdout("%s  %s: %s", indent, item, val)
374

    
375

    
376
def ShowClusterConfig(opts, args):
377
  """Shows cluster information.
378

379
  @param opts: the command line options selected by the user
380
  @type args: list
381
  @param args: should be an empty list
382
  @rtype: int
383
  @return: the desired exit code
384

385
  """
386
  cl = GetClient()
387
  result = cl.QueryClusterInfo()
388

    
389
  ToStdout("Cluster name: %s", result["name"])
390
  ToStdout("Cluster UUID: %s", result["uuid"])
391

    
392
  ToStdout("Creation time: %s", utils.FormatTime(result["ctime"]))
393
  ToStdout("Modification time: %s", utils.FormatTime(result["mtime"]))
394

    
395
  ToStdout("Master node: %s", result["master"])
396

    
397
  ToStdout("Architecture (this node): %s (%s)",
398
           result["architecture"][0], result["architecture"][1])
399

    
400
  if result["tags"]:
401
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
402
  else:
403
    tags = "(none)"
404

    
405
  ToStdout("Tags: %s", tags)
406

    
407
  ToStdout("Default hypervisor: %s", result["default_hypervisor"])
408
  ToStdout("Enabled hypervisors: %s",
409
           utils.CommaJoin(result["enabled_hypervisors"]))
410

    
411
  ToStdout("Hypervisor parameters:")
412
  _PrintGroupedParams(result["hvparams"])
413

    
414
  ToStdout("OS-specific hypervisor parameters:")
415
  _PrintGroupedParams(result["os_hvp"])
416

    
417
  ToStdout("OS parameters:")
418
  _PrintGroupedParams(result["osparams"])
419

    
420
  ToStdout("Hidden OSes: %s", utils.CommaJoin(result["hidden_os"]))
421
  ToStdout("Blacklisted OSes: %s", utils.CommaJoin(result["blacklisted_os"]))
422

    
423
  ToStdout("Cluster parameters:")
424
  ToStdout("  - candidate pool size: %s",
425
            compat.TryToRoman(result["candidate_pool_size"],
426
                              convert=opts.roman_integers))
427
  ToStdout("  - master netdev: %s", result["master_netdev"])
428
  ToStdout("  - master netmask: %s", result["master_netmask"])
429
  ToStdout("  - use external master IP address setup script: %s",
430
           result["use_external_mip_script"])
431
  ToStdout("  - lvm volume group: %s", result["volume_group_name"])
432
  if result["reserved_lvs"]:
433
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
434
  else:
435
    reserved_lvs = "(none)"
436
  ToStdout("  - lvm reserved volumes: %s", reserved_lvs)
437
  ToStdout("  - drbd usermode helper: %s", result["drbd_usermode_helper"])
438
  ToStdout("  - file storage path: %s", result["file_storage_dir"])
439
  ToStdout("  - shared file storage path: %s",
440
           result["shared_file_storage_dir"])
441
  ToStdout("  - maintenance of node health: %s",
442
           result["maintain_node_health"])
443
  ToStdout("  - uid pool: %s",
444
            uidpool.FormatUidPool(result["uid_pool"],
445
                                  roman=opts.roman_integers))
446
  ToStdout("  - default instance allocator: %s", result["default_iallocator"])
447
  ToStdout("  - primary ip version: %d", result["primary_ip_version"])
448
  ToStdout("  - preallocation wipe disks: %s", result["prealloc_wipe_disks"])
449
  ToStdout("  - OS search path: %s", utils.CommaJoin(constants.OS_SEARCH_PATH))
450

    
451
  ToStdout("Default node parameters:")
452
  _PrintGroupedParams(result["ndparams"], roman=opts.roman_integers)
453

    
454
  ToStdout("Default instance parameters:")
455
  _PrintGroupedParams(result["beparams"], roman=opts.roman_integers)
456

    
457
  ToStdout("Default nic parameters:")
458
  _PrintGroupedParams(result["nicparams"], roman=opts.roman_integers)
459

    
460
  ToStdout("Instance policy - limits for instances:")
461
  for key in constants.IPOLICY_PARAMETERS:
462
    ToStdout("  - %s", key)
463
    _PrintGroupedParams(result["ipolicy"][key], roman=opts.roman_integers)
464

    
465
  return 0
466

    
467

    
468
def ClusterCopyFile(opts, args):
469
  """Copy a file from master to some nodes.
470

471
  @param opts: the command line options selected by the user
472
  @type args: list
473
  @param args: should contain only one element, the path of
474
      the file to be copied
475
  @rtype: int
476
  @return: the desired exit code
477

478
  """
479
  filename = args[0]
480
  if not os.path.exists(filename):
481
    raise errors.OpPrereqError("No such filename '%s'" % filename,
482
                               errors.ECODE_INVAL)
483

    
484
  cl = GetClient()
485

    
486
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
487

    
488
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
489
                           secondary_ips=opts.use_replication_network,
490
                           nodegroup=opts.nodegroup)
491

    
492
  srun = ssh.SshRunner(cluster_name=cluster_name)
493
  for node in results:
494
    if not srun.CopyFileToNode(node, filename):
495
      ToStderr("Copy of file %s to node %s failed", filename, node)
496

    
497
  return 0
498

    
499

    
500
def RunClusterCommand(opts, args):
501
  """Run a command on some nodes.
502

503
  @param opts: the command line options selected by the user
504
  @type args: list
505
  @param args: should contain the command to be run and its arguments
506
  @rtype: int
507
  @return: the desired exit code
508

509
  """
510
  cl = GetClient()
511

    
512
  command = " ".join(args)
513

    
514
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
515

    
516
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
517
                                                    "master_node"])
518

    
519
  srun = ssh.SshRunner(cluster_name=cluster_name)
520

    
521
  # Make sure master node is at list end
522
  if master_node in nodes:
523
    nodes.remove(master_node)
524
    nodes.append(master_node)
525

    
526
  for name in nodes:
527
    result = srun.Run(name, "root", command)
528
    ToStdout("------------------------------------------------")
529
    ToStdout("node: %s", name)
530
    ToStdout("%s", result.output)
531
    ToStdout("return code = %s", result.exit_code)
532

    
533
  return 0
534

    
535

    
536
def VerifyCluster(opts, args):
537
  """Verify integrity of cluster, performing various test on nodes.
538

539
  @param opts: the command line options selected by the user
540
  @type args: list
541
  @param args: should be an empty list
542
  @rtype: int
543
  @return: the desired exit code
544

545
  """
546
  skip_checks = []
547

    
548
  if opts.skip_nplusone_mem:
549
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
550

    
551
  cl = GetClient()
552

    
553
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
554
                               error_codes=opts.error_codes,
555
                               debug_simulate_errors=opts.simulate_errors,
556
                               skip_checks=skip_checks,
557
                               ignore_errors=opts.ignore_errors,
558
                               group_name=opts.nodegroup)
559
  result = SubmitOpCode(op, cl=cl, opts=opts)
560

    
561
  # Keep track of submitted jobs
562
  jex = JobExecutor(cl=cl, opts=opts)
563

    
564
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
565
    jex.AddJobId(None, status, job_id)
566

    
567
  results = jex.GetResults()
568

    
569
  (bad_jobs, bad_results) = \
570
    map(len,
571
        # Convert iterators to lists
572
        map(list,
573
            # Count errors
574
            map(compat.partial(itertools.ifilterfalse, bool),
575
                # Convert result to booleans in a tuple
576
                zip(*((job_success, len(op_results) == 1 and op_results[0])
577
                      for (job_success, op_results) in results)))))
578

    
579
  if bad_jobs == 0 and bad_results == 0:
580
    rcode = constants.EXIT_SUCCESS
581
  else:
582
    rcode = constants.EXIT_FAILURE
583
    if bad_jobs > 0:
584
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
585

    
586
  return rcode
587

    
588

    
589
def VerifyDisks(opts, args):
590
  """Verify integrity of cluster disks.
591

592
  @param opts: the command line options selected by the user
593
  @type args: list
594
  @param args: should be an empty list
595
  @rtype: int
596
  @return: the desired exit code
597

598
  """
599
  cl = GetClient()
600

    
601
  op = opcodes.OpClusterVerifyDisks()
602

    
603
  result = SubmitOpCode(op, cl=cl, opts=opts)
604

    
605
  # Keep track of submitted jobs
606
  jex = JobExecutor(cl=cl, opts=opts)
607

    
608
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
609
    jex.AddJobId(None, status, job_id)
610

    
611
  retcode = constants.EXIT_SUCCESS
612

    
613
  for (status, result) in jex.GetResults():
614
    if not status:
615
      ToStdout("Job failed: %s", result)
616
      continue
617

    
618
    ((bad_nodes, instances, missing), ) = result
619

    
620
    for node, text in bad_nodes.items():
621
      ToStdout("Error gathering data on node %s: %s",
622
               node, utils.SafeEncode(text[-400:]))
623
      retcode = constants.EXIT_FAILURE
624
      ToStdout("You need to fix these nodes first before fixing instances")
625

    
626
    for iname in instances:
627
      if iname in missing:
628
        continue
629
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
630
      try:
631
        ToStdout("Activating disks for instance '%s'", iname)
632
        SubmitOpCode(op, opts=opts, cl=cl)
633
      except errors.GenericError, err:
634
        nret, msg = FormatError(err)
635
        retcode |= nret
636
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
637

    
638
    if missing:
639
      for iname, ival in missing.iteritems():
640
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
641
        if all_missing:
642
          ToStdout("Instance %s cannot be verified as it lives on"
643
                   " broken nodes", iname)
644
        else:
645
          ToStdout("Instance %s has missing logical volumes:", iname)
646
          ival.sort()
647
          for node, vol in ival:
648
            if node in bad_nodes:
649
              ToStdout("\tbroken node %s /dev/%s", node, vol)
650
            else:
651
              ToStdout("\t%s /dev/%s", node, vol)
652

    
653
      ToStdout("You need to replace or recreate disks for all the above"
654
               " instances if this message persists after fixing broken nodes.")
655
      retcode = constants.EXIT_FAILURE
656

    
657
  return retcode
658

    
659

    
660
def RepairDiskSizes(opts, args):
661
  """Verify sizes of cluster disks.
662

663
  @param opts: the command line options selected by the user
664
  @type args: list
665
  @param args: optional list of instances to restrict check to
666
  @rtype: int
667
  @return: the desired exit code
668

669
  """
670
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
671
  SubmitOpCode(op, opts=opts)
672

    
673

    
674
@UsesRPC
675
def MasterFailover(opts, args):
676
  """Failover the master node.
677

678
  This command, when run on a non-master node, will cause the current
679
  master to cease being master, and the non-master to become new
680
  master.
681

682
  @param opts: the command line options selected by the user
683
  @type args: list
684
  @param args: should be an empty list
685
  @rtype: int
686
  @return: the desired exit code
687

688
  """
689
  if opts.no_voting:
690
    usertext = ("This will perform the failover even if most other nodes"
691
                " are down, or if this node is outdated. This is dangerous"
692
                " as it can lead to a non-consistent cluster. Check the"
693
                " gnt-cluster(8) man page before proceeding. Continue?")
694
    if not AskUser(usertext):
695
      return 1
696

    
697
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
698

    
699

    
700
def MasterPing(opts, args):
701
  """Checks if the master is alive.
702

703
  @param opts: the command line options selected by the user
704
  @type args: list
705
  @param args: should be an empty list
706
  @rtype: int
707
  @return: the desired exit code
708

709
  """
710
  try:
711
    cl = GetClient()
712
    cl.QueryClusterInfo()
713
    return 0
714
  except Exception: # pylint: disable=W0703
715
    return 1
716

    
717

    
718
def SearchTags(opts, args):
719
  """Searches the tags on all the cluster.
720

721
  @param opts: the command line options selected by the user
722
  @type args: list
723
  @param args: should contain only one element, the tag pattern
724
  @rtype: int
725
  @return: the desired exit code
726

727
  """
728
  op = opcodes.OpTagsSearch(pattern=args[0])
729
  result = SubmitOpCode(op, opts=opts)
730
  if not result:
731
    return 1
732
  result = list(result)
733
  result.sort()
734
  for path, tag in result:
735
    ToStdout("%s %s", path, tag)
736

    
737

    
738
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
739
  """Reads and verifies an X509 certificate.
740

741
  @type cert_filename: string
742
  @param cert_filename: the path of the file containing the certificate to
743
                        verify encoded in PEM format
744
  @type verify_private_key: bool
745
  @param verify_private_key: whether to verify the private key in addition to
746
                             the public certificate
747
  @rtype: string
748
  @return: a string containing the PEM-encoded certificate.
749

750
  """
751
  try:
752
    pem = utils.ReadFile(cert_filename)
753
  except IOError, err:
754
    raise errors.X509CertError(cert_filename,
755
                               "Unable to read certificate: %s" % str(err))
756

    
757
  try:
758
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
759
  except Exception, err:
760
    raise errors.X509CertError(cert_filename,
761
                               "Unable to load certificate: %s" % str(err))
762

    
763
  if verify_private_key:
764
    try:
765
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
766
    except Exception, err:
767
      raise errors.X509CertError(cert_filename,
768
                                 "Unable to load private key: %s" % str(err))
769

    
770
  return pem
771

    
772

    
773
def _RenewCrypto(new_cluster_cert, new_rapi_cert, #pylint: disable=R0911
774
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
775
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
776
                 cds_filename, force):
777
  """Renews cluster certificates, keys and secrets.
778

779
  @type new_cluster_cert: bool
780
  @param new_cluster_cert: Whether to generate a new cluster certificate
781
  @type new_rapi_cert: bool
782
  @param new_rapi_cert: Whether to generate a new RAPI certificate
783
  @type rapi_cert_filename: string
784
  @param rapi_cert_filename: Path to file containing new RAPI certificate
785
  @type new_spice_cert: bool
786
  @param new_spice_cert: Whether to generate a new SPICE certificate
787
  @type spice_cert_filename: string
788
  @param spice_cert_filename: Path to file containing new SPICE certificate
789
  @type spice_cacert_filename: string
790
  @param spice_cacert_filename: Path to file containing the certificate of the
791
                                CA that signed the SPICE certificate
792
  @type new_confd_hmac_key: bool
793
  @param new_confd_hmac_key: Whether to generate a new HMAC key
794
  @type new_cds: bool
795
  @param new_cds: Whether to generate a new cluster domain secret
796
  @type cds_filename: string
797
  @param cds_filename: Path to file containing new cluster domain secret
798
  @type force: bool
799
  @param force: Whether to ask user for confirmation
800

801
  """
802
  if new_rapi_cert and rapi_cert_filename:
803
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
804
             " options can be specified at the same time.")
805
    return 1
806

    
807
  if new_cds and cds_filename:
808
    ToStderr("Only one of the --new-cluster-domain-secret and"
809
             " --cluster-domain-secret options can be specified at"
810
             " the same time.")
811
    return 1
812

    
813
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
814
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
815
             " and --spice-ca-certificate must not be used.")
816
    return 1
817

    
818
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
819
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
820
             " specified.")
821
    return 1
822

    
823
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
824
  try:
825
    if rapi_cert_filename:
826
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
827
    if spice_cert_filename:
828
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
829
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
830
  except errors.X509CertError, err:
831
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
832
    return 1
833

    
834
  if cds_filename:
835
    try:
836
      cds = utils.ReadFile(cds_filename)
837
    except Exception, err: # pylint: disable=W0703
838
      ToStderr("Can't load new cluster domain secret from %s: %s" %
839
               (cds_filename, str(err)))
840
      return 1
841
  else:
842
    cds = None
843

    
844
  if not force:
845
    usertext = ("This requires all daemons on all nodes to be restarted and"
846
                " may take some time. Continue?")
847
    if not AskUser(usertext):
848
      return 1
849

    
850
  def _RenewCryptoInner(ctx):
851
    ctx.feedback_fn("Updating certificates and keys")
852
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
853
                                    new_rapi_cert,
854
                                    new_spice_cert,
855
                                    new_confd_hmac_key,
856
                                    new_cds,
857
                                    rapi_cert_pem=rapi_cert_pem,
858
                                    spice_cert_pem=spice_cert_pem,
859
                                    spice_cacert_pem=spice_cacert_pem,
860
                                    cds=cds)
861

    
862
    files_to_copy = []
863

    
864
    if new_cluster_cert:
865
      files_to_copy.append(constants.NODED_CERT_FILE)
866

    
867
    if new_rapi_cert or rapi_cert_pem:
868
      files_to_copy.append(constants.RAPI_CERT_FILE)
869

    
870
    if new_spice_cert or spice_cert_pem:
871
      files_to_copy.append(constants.SPICE_CERT_FILE)
872
      files_to_copy.append(constants.SPICE_CACERT_FILE)
873

    
874
    if new_confd_hmac_key:
875
      files_to_copy.append(constants.CONFD_HMAC_KEY)
876

    
877
    if new_cds or cds:
878
      files_to_copy.append(constants.CLUSTER_DOMAIN_SECRET_FILE)
879

    
880
    if files_to_copy:
881
      for node_name in ctx.nonmaster_nodes:
882
        ctx.feedback_fn("Copying %s to %s" %
883
                        (", ".join(files_to_copy), node_name))
884
        for file_name in files_to_copy:
885
          ctx.ssh.CopyFileToNode(node_name, file_name)
886

    
887
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
888

    
889
  ToStdout("All requested certificates and keys have been replaced."
890
           " Running \"gnt-cluster verify\" now is recommended.")
891

    
892
  return 0
893

    
894

    
895
def RenewCrypto(opts, args):
896
  """Renews cluster certificates, keys and secrets.
897

898
  """
899
  return _RenewCrypto(opts.new_cluster_cert,
900
                      opts.new_rapi_cert,
901
                      opts.rapi_cert,
902
                      opts.new_spice_cert,
903
                      opts.spice_cert,
904
                      opts.spice_cacert,
905
                      opts.new_confd_hmac_key,
906
                      opts.new_cluster_domain_secret,
907
                      opts.cluster_domain_secret,
908
                      opts.force)
909

    
910

    
911
def SetClusterParams(opts, args):
912
  """Modify the cluster.
913

914
  @param opts: the command line options selected by the user
915
  @type args: list
916
  @param args: should be an empty list
917
  @rtype: int
918
  @return: the desired exit code
919

920
  """
921
  if not (not opts.lvm_storage or opts.vg_name or
922
          not opts.drbd_storage or opts.drbd_helper or
923
          opts.enabled_hypervisors or opts.hvparams or
924
          opts.beparams or opts.nicparams or
925
          opts.ndparams or opts.diskparams or
926
          opts.candidate_pool_size is not None or
927
          opts.uid_pool is not None or
928
          opts.maintain_node_health is not None or
929
          opts.add_uids is not None or
930
          opts.remove_uids is not None or
931
          opts.default_iallocator is not None or
932
          opts.reserved_lvs is not None or
933
          opts.master_netdev is not None or
934
          opts.master_netmask is not None or
935
          opts.use_external_mip_script is not None or
936
          opts.prealloc_wipe_disks is not None or
937
          opts.hv_state or
938
          opts.disk_state or
939
          opts.ispecs_mem_size is not None or
940
          opts.ispecs_cpu_count is not None or
941
          opts.ispecs_disk_count is not None or
942
          opts.ispecs_disk_size is not None or
943
          opts.ispecs_nic_count is not None):
944
    ToStderr("Please give at least one of the parameters.")
945
    return 1
946

    
947
  vg_name = opts.vg_name
948
  if not opts.lvm_storage and opts.vg_name:
949
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
950
    return 1
951

    
952
  if not opts.lvm_storage:
953
    vg_name = ""
954

    
955
  drbd_helper = opts.drbd_helper
956
  if not opts.drbd_storage and opts.drbd_helper:
957
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
958
    return 1
959

    
960
  if not opts.drbd_storage:
961
    drbd_helper = ""
962

    
963
  hvlist = opts.enabled_hypervisors
964
  if hvlist is not None:
965
    hvlist = hvlist.split(",")
966

    
967
  # a list of (name, dict) we can pass directly to dict() (or [])
968
  hvparams = dict(opts.hvparams)
969
  for hv_params in hvparams.values():
970
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
971

    
972
  diskparams = dict(opts.diskparams)
973

    
974
  for dt_params in diskparams.values():
975
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
976

    
977
  beparams = opts.beparams
978
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
979

    
980
  nicparams = opts.nicparams
981
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
982

    
983
  ndparams = opts.ndparams
984
  if ndparams is not None:
985
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
986

    
987
  ipolicy = \
988
    objects.CreateIPolicyFromOpts(ispecs_mem_size=opts.ispecs_mem_size,
989
                                  ispecs_cpu_count=opts.ispecs_cpu_count,
990
                                  ispecs_disk_count=opts.ispecs_disk_count,
991
                                  ispecs_disk_size=opts.ispecs_disk_size,
992
                                  ispecs_nic_count=opts.ispecs_nic_count)
993
  for value in ipolicy.values():
994
    utils.ForceDictType(value, constants.ISPECS_PARAMETER_TYPES)
995

    
996
  mnh = opts.maintain_node_health
997

    
998
  uid_pool = opts.uid_pool
999
  if uid_pool is not None:
1000
    uid_pool = uidpool.ParseUidPool(uid_pool)
1001

    
1002
  add_uids = opts.add_uids
1003
  if add_uids is not None:
1004
    add_uids = uidpool.ParseUidPool(add_uids)
1005

    
1006
  remove_uids = opts.remove_uids
1007
  if remove_uids is not None:
1008
    remove_uids = uidpool.ParseUidPool(remove_uids)
1009

    
1010
  if opts.reserved_lvs is not None:
1011
    if opts.reserved_lvs == "":
1012
      opts.reserved_lvs = []
1013
    else:
1014
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1015

    
1016
  if opts.master_netmask is not None:
1017
    try:
1018
      opts.master_netmask = int(opts.master_netmask)
1019
    except ValueError:
1020
      ToStderr("The --master-netmask option expects an int parameter.")
1021
      return 1
1022

    
1023
  ext_ip_script = opts.use_external_mip_script
1024

    
1025
  if opts.disk_state:
1026
    disk_state = utils.FlatToDict(opts.disk_state)
1027
  else:
1028
    disk_state = {}
1029

    
1030
  hv_state = dict(opts.hv_state)
1031

    
1032
  op = opcodes.OpClusterSetParams(vg_name=vg_name,
1033
                                  drbd_helper=drbd_helper,
1034
                                  enabled_hypervisors=hvlist,
1035
                                  hvparams=hvparams,
1036
                                  os_hvp=None,
1037
                                  beparams=beparams,
1038
                                  nicparams=nicparams,
1039
                                  ndparams=ndparams,
1040
                                  diskparams=diskparams,
1041
                                  ipolicy=ipolicy,
1042
                                  candidate_pool_size=opts.candidate_pool_size,
1043
                                  maintain_node_health=mnh,
1044
                                  uid_pool=uid_pool,
1045
                                  add_uids=add_uids,
1046
                                  remove_uids=remove_uids,
1047
                                  default_iallocator=opts.default_iallocator,
1048
                                  prealloc_wipe_disks=opts.prealloc_wipe_disks,
1049
                                  master_netdev=opts.master_netdev,
1050
                                  master_netmask=opts.master_netmask,
1051
                                  reserved_lvs=opts.reserved_lvs,
1052
                                  use_external_mip_script=ext_ip_script,
1053
                                  hv_state=hv_state,
1054
                                  disk_state=disk_state,
1055
                                  )
1056
  SubmitOpCode(op, opts=opts)
1057
  return 0
1058

    
1059

    
1060
def QueueOps(opts, args):
1061
  """Queue operations.
1062

1063
  @param opts: the command line options selected by the user
1064
  @type args: list
1065
  @param args: should contain only one element, the subcommand
1066
  @rtype: int
1067
  @return: the desired exit code
1068

1069
  """
1070
  command = args[0]
1071
  client = GetClient()
1072
  if command in ("drain", "undrain"):
1073
    drain_flag = command == "drain"
1074
    client.SetQueueDrainFlag(drain_flag)
1075
  elif command == "info":
1076
    result = client.QueryConfigValues(["drain_flag"])
1077
    if result[0]:
1078
      val = "set"
1079
    else:
1080
      val = "unset"
1081
    ToStdout("The drain flag is %s" % val)
1082
  else:
1083
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1084
                               errors.ECODE_INVAL)
1085

    
1086
  return 0
1087

    
1088

    
1089
def _ShowWatcherPause(until):
1090
  if until is None or until < time.time():
1091
    ToStdout("The watcher is not paused.")
1092
  else:
1093
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1094

    
1095

    
1096
def WatcherOps(opts, args):
1097
  """Watcher operations.
1098

1099
  @param opts: the command line options selected by the user
1100
  @type args: list
1101
  @param args: should contain only one element, the subcommand
1102
  @rtype: int
1103
  @return: the desired exit code
1104

1105
  """
1106
  command = args[0]
1107
  client = GetClient()
1108

    
1109
  if command == "continue":
1110
    client.SetWatcherPause(None)
1111
    ToStdout("The watcher is no longer paused.")
1112

    
1113
  elif command == "pause":
1114
    if len(args) < 2:
1115
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1116

    
1117
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1118
    _ShowWatcherPause(result)
1119

    
1120
  elif command == "info":
1121
    result = client.QueryConfigValues(["watcher_pause"])
1122
    _ShowWatcherPause(result[0])
1123

    
1124
  else:
1125
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1126
                               errors.ECODE_INVAL)
1127

    
1128
  return 0
1129

    
1130

    
1131
def _OobPower(opts, node_list, power):
1132
  """Puts the node in the list to desired power state.
1133

1134
  @param opts: The command line options selected by the user
1135
  @param node_list: The list of nodes to operate on
1136
  @param power: True if they should be powered on, False otherwise
1137
  @return: The success of the operation (none failed)
1138

1139
  """
1140
  if power:
1141
    command = constants.OOB_POWER_ON
1142
  else:
1143
    command = constants.OOB_POWER_OFF
1144

    
1145
  op = opcodes.OpOobCommand(node_names=node_list,
1146
                            command=command,
1147
                            ignore_status=True,
1148
                            timeout=opts.oob_timeout,
1149
                            power_delay=opts.power_delay)
1150
  result = SubmitOpCode(op, opts=opts)
1151
  errs = 0
1152
  for node_result in result:
1153
    (node_tuple, data_tuple) = node_result
1154
    (_, node_name) = node_tuple
1155
    (data_status, _) = data_tuple
1156
    if data_status != constants.RS_NORMAL:
1157
      assert data_status != constants.RS_UNAVAIL
1158
      errs += 1
1159
      ToStderr("There was a problem changing power for %s, please investigate",
1160
               node_name)
1161

    
1162
  if errs > 0:
1163
    return False
1164

    
1165
  return True
1166

    
1167

    
1168
def _InstanceStart(opts, inst_list, start):
1169
  """Puts the instances in the list to desired state.
1170

1171
  @param opts: The command line options selected by the user
1172
  @param inst_list: The list of instances to operate on
1173
  @param start: True if they should be started, False for shutdown
1174
  @return: The success of the operation (none failed)
1175

1176
  """
1177
  if start:
1178
    opcls = opcodes.OpInstanceStartup
1179
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1180
  else:
1181
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1182
                           timeout=opts.shutdown_timeout)
1183
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1184

    
1185
  jex = JobExecutor(opts=opts)
1186

    
1187
  for inst in inst_list:
1188
    ToStdout("Submit %s of instance %s", text_submit, inst)
1189
    op = opcls(instance_name=inst)
1190
    jex.QueueJob(inst, op)
1191

    
1192
  results = jex.GetResults()
1193
  bad_cnt = len([1 for (success, _) in results if not success])
1194

    
1195
  if bad_cnt == 0:
1196
    ToStdout("All instances have been %s successfully", text_success)
1197
  else:
1198
    ToStderr("There were errors while %s instances:\n"
1199
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1200
             len(results))
1201
    return False
1202

    
1203
  return True
1204

    
1205

    
1206
class _RunWhenNodesReachableHelper:
1207
  """Helper class to make shared internal state sharing easier.
1208

1209
  @ivar success: Indicates if all action_cb calls were successful
1210

1211
  """
1212
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1213
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1214
    """Init the object.
1215

1216
    @param node_list: The list of nodes to be reachable
1217
    @param action_cb: Callback called when a new host is reachable
1218
    @type node2ip: dict
1219
    @param node2ip: Node to ip mapping
1220
    @param port: The port to use for the TCP ping
1221
    @param feedback_fn: The function used for feedback
1222
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1223
    @param _sleep_fn: Function to sleep (for unittest use only)
1224

1225
    """
1226
    self.down = set(node_list)
1227
    self.up = set()
1228
    self.node2ip = node2ip
1229
    self.success = True
1230
    self.action_cb = action_cb
1231
    self.port = port
1232
    self.feedback_fn = feedback_fn
1233
    self._ping_fn = _ping_fn
1234
    self._sleep_fn = _sleep_fn
1235

    
1236
  def __call__(self):
1237
    """When called we run action_cb.
1238

1239
    @raises utils.RetryAgain: When there are still down nodes
1240

1241
    """
1242
    if not self.action_cb(self.up):
1243
      self.success = False
1244

    
1245
    if self.down:
1246
      raise utils.RetryAgain()
1247
    else:
1248
      return self.success
1249

    
1250
  def Wait(self, secs):
1251
    """Checks if a host is up or waits remaining seconds.
1252

1253
    @param secs: The secs remaining
1254

1255
    """
1256
    start = time.time()
1257
    for node in self.down:
1258
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1259
                       live_port_needed=True):
1260
        self.feedback_fn("Node %s became available" % node)
1261
        self.up.add(node)
1262
        self.down -= self.up
1263
        # If we have a node available there is the possibility to run the
1264
        # action callback successfully, therefore we don't wait and return
1265
        return
1266

    
1267
    self._sleep_fn(max(0.0, start + secs - time.time()))
1268

    
1269

    
1270
def _RunWhenNodesReachable(node_list, action_cb, interval):
1271
  """Run action_cb when nodes become reachable.
1272

1273
  @param node_list: The list of nodes to be reachable
1274
  @param action_cb: Callback called when a new host is reachable
1275
  @param interval: The earliest time to retry
1276

1277
  """
1278
  client = GetClient()
1279
  cluster_info = client.QueryClusterInfo()
1280
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1281
    family = netutils.IPAddress.family
1282
  else:
1283
    family = netutils.IP6Address.family
1284

    
1285
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1286
                 for node in node_list)
1287

    
1288
  port = netutils.GetDaemonPort(constants.NODED)
1289
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1290
                                        ToStdout)
1291

    
1292
  try:
1293
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1294
                       wait_fn=helper.Wait)
1295
  except utils.RetryTimeout:
1296
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1297
             " again:\n  - %s", "  - ".join(helper.down))
1298
    return False
1299

    
1300

    
1301
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1302
                          _instance_start_fn=_InstanceStart):
1303
  """Start the instances conditional based on node_states.
1304

1305
  @param opts: The command line options selected by the user
1306
  @param inst_map: A dict of inst -> nodes mapping
1307
  @param nodes_online: A list of nodes online
1308
  @param _instance_start_fn: Callback to start instances (unittest use only)
1309
  @return: Success of the operation on all instances
1310

1311
  """
1312
  start_inst_list = []
1313
  for (inst, nodes) in inst_map.items():
1314
    if not (nodes - nodes_online):
1315
      # All nodes the instance lives on are back online
1316
      start_inst_list.append(inst)
1317

    
1318
  for inst in start_inst_list:
1319
    del inst_map[inst]
1320

    
1321
  if start_inst_list:
1322
    return _instance_start_fn(opts, start_inst_list, True)
1323

    
1324
  return True
1325

    
1326

    
1327
def _EpoOn(opts, full_node_list, node_list, inst_map):
1328
  """Does the actual power on.
1329

1330
  @param opts: The command line options selected by the user
1331
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1332
                         OOB)
1333
  @param node_list: The list of nodes to operate on (all need to support OOB)
1334
  @param inst_map: A dict of inst -> nodes mapping
1335
  @return: The desired exit status
1336

1337
  """
1338
  if node_list and not _OobPower(opts, node_list, False):
1339
    ToStderr("Not all nodes seem to get back up, investigate and start"
1340
             " manually if needed")
1341

    
1342
  # Wait for the nodes to be back up
1343
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1344

    
1345
  ToStdout("Waiting until all nodes are available again")
1346
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1347
    ToStderr("Please investigate and start stopped instances manually")
1348
    return constants.EXIT_FAILURE
1349

    
1350
  return constants.EXIT_SUCCESS
1351

    
1352

    
1353
def _EpoOff(opts, node_list, inst_map):
1354
  """Does the actual power off.
1355

1356
  @param opts: The command line options selected by the user
1357
  @param node_list: The list of nodes to operate on (all need to support OOB)
1358
  @param inst_map: A dict of inst -> nodes mapping
1359
  @return: The desired exit status
1360

1361
  """
1362
  if not _InstanceStart(opts, inst_map.keys(), False):
1363
    ToStderr("Please investigate and stop instances manually before continuing")
1364
    return constants.EXIT_FAILURE
1365

    
1366
  if not node_list:
1367
    return constants.EXIT_SUCCESS
1368

    
1369
  if _OobPower(opts, node_list, False):
1370
    return constants.EXIT_SUCCESS
1371
  else:
1372
    return constants.EXIT_FAILURE
1373

    
1374

    
1375
def Epo(opts, args):
1376
  """EPO operations.
1377

1378
  @param opts: the command line options selected by the user
1379
  @type args: list
1380
  @param args: should contain only one element, the subcommand
1381
  @rtype: int
1382
  @return: the desired exit code
1383

1384
  """
1385
  if opts.groups and opts.show_all:
1386
    ToStderr("Only one of --groups or --all are allowed")
1387
    return constants.EXIT_FAILURE
1388
  elif args and opts.show_all:
1389
    ToStderr("Arguments in combination with --all are not allowed")
1390
    return constants.EXIT_FAILURE
1391

    
1392
  client = GetClient()
1393

    
1394
  if opts.groups:
1395
    node_query_list = itertools.chain(*client.QueryGroups(names=args,
1396
                                                          fields=["node_list"],
1397
                                                          use_locking=False))
1398
  else:
1399
    node_query_list = args
1400

    
1401
  result = client.QueryNodes(names=node_query_list,
1402
                             fields=["name", "master", "pinst_list",
1403
                                     "sinst_list", "powered", "offline"],
1404
                             use_locking=False)
1405
  node_list = []
1406
  inst_map = {}
1407
  for (idx, (node, master, pinsts, sinsts, powered,
1408
             offline)) in enumerate(result):
1409
    # Normalize the node_query_list as well
1410
    if not opts.show_all:
1411
      node_query_list[idx] = node
1412
    if not offline:
1413
      for inst in (pinsts + sinsts):
1414
        if inst in inst_map:
1415
          if not master:
1416
            inst_map[inst].add(node)
1417
        elif master:
1418
          inst_map[inst] = set()
1419
        else:
1420
          inst_map[inst] = set([node])
1421

    
1422
    if master and opts.on:
1423
      # We ignore the master for turning on the machines, in fact we are
1424
      # already operating on the master at this point :)
1425
      continue
1426
    elif master and not opts.show_all:
1427
      ToStderr("%s is the master node, please do a master-failover to another"
1428
               " node not affected by the EPO or use --all if you intend to"
1429
               " shutdown the whole cluster", node)
1430
      return constants.EXIT_FAILURE
1431
    elif powered is None:
1432
      ToStdout("Node %s does not support out-of-band handling, it can not be"
1433
               " handled in a fully automated manner", node)
1434
    elif powered == opts.on:
1435
      ToStdout("Node %s is already in desired power state, skipping", node)
1436
    elif not offline or (offline and powered):
1437
      node_list.append(node)
1438

    
1439
  if not opts.force and not ConfirmOperation(node_query_list, "nodes", "epo"):
1440
    return constants.EXIT_FAILURE
1441

    
1442
  if opts.on:
1443
    return _EpoOn(opts, node_query_list, node_list, inst_map)
1444
  else:
1445
    return _EpoOff(opts, node_list, inst_map)
1446

    
1447
commands = {
1448
  "init": (
1449
    InitCluster, [ArgHost(min=1, max=1)],
1450
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
1451
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
1452
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
1453
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
1454
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
1455
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
1456
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
1457
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + INSTANCE_POLICY_OPTS,
1458
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
1459
  "destroy": (
1460
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
1461
    "", "Destroy cluster"),
1462
  "rename": (
1463
    RenameCluster, [ArgHost(min=1, max=1)],
1464
    [FORCE_OPT, DRY_RUN_OPT],
1465
    "<new_name>",
1466
    "Renames the cluster"),
1467
  "redist-conf": (
1468
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
1469
    "", "Forces a push of the configuration file and ssconf files"
1470
    " to the nodes in the cluster"),
1471
  "verify": (
1472
    VerifyCluster, ARGS_NONE,
1473
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
1474
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
1475
    "", "Does a check on the cluster configuration"),
1476
  "verify-disks": (
1477
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
1478
    "", "Does a check on the cluster disk status"),
1479
  "repair-disk-sizes": (
1480
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
1481
    "[instance...]", "Updates mismatches in recorded disk sizes"),
1482
  "master-failover": (
1483
    MasterFailover, ARGS_NONE, [NOVOTING_OPT],
1484
    "", "Makes the current node the master"),
1485
  "master-ping": (
1486
    MasterPing, ARGS_NONE, [],
1487
    "", "Checks if the master is alive"),
1488
  "version": (
1489
    ShowClusterVersion, ARGS_NONE, [],
1490
    "", "Shows the cluster version"),
1491
  "getmaster": (
1492
    ShowClusterMaster, ARGS_NONE, [],
1493
    "", "Shows the cluster master"),
1494
  "copyfile": (
1495
    ClusterCopyFile, [ArgFile(min=1, max=1)],
1496
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
1497
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
1498
  "command": (
1499
    RunClusterCommand, [ArgCommand(min=1)],
1500
    [NODE_LIST_OPT, NODEGROUP_OPT],
1501
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
1502
  "info": (
1503
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
1504
    "[--roman]", "Show cluster configuration"),
1505
  "list-tags": (
1506
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
1507
  "add-tags": (
1508
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT],
1509
    "tag...", "Add tags to the cluster"),
1510
  "remove-tags": (
1511
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT],
1512
    "tag...", "Remove tags from the cluster"),
1513
  "search-tags": (
1514
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
1515
    "Searches the tags on all objects on"
1516
    " the cluster for a given pattern (regex)"),
1517
  "queue": (
1518
    QueueOps,
1519
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
1520
    [], "drain|undrain|info", "Change queue properties"),
1521
  "watcher": (
1522
    WatcherOps,
1523
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
1524
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
1525
    [],
1526
    "{pause <timespec>|continue|info}", "Change watcher properties"),
1527
  "modify": (
1528
    SetClusterParams, ARGS_NONE,
1529
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
1530
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
1531
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
1532
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
1533
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
1534
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
1535
     DISK_STATE_OPT] +
1536
    INSTANCE_POLICY_OPTS,
1537
    "[opts...]",
1538
    "Alters the parameters of the cluster"),
1539
  "renew-crypto": (
1540
    RenewCrypto, ARGS_NONE,
1541
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
1542
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
1543
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
1544
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
1545
    "[opts...]",
1546
    "Renews cluster certificates, keys and secrets"),
1547
  "epo": (
1548
    Epo, [ArgUnknown()],
1549
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
1550
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
1551
    "[opts...] [args]",
1552
    "Performs an emergency power-off on given args"),
1553
  "activate-master-ip": (
1554
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
1555
  "deactivate-master-ip": (
1556
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
1557
    "Deactivates the master IP"),
1558
  }
1559

    
1560

    
1561
#: dictionary with aliases for commands
1562
aliases = {
1563
  "masterfailover": "master-failover",
1564
}
1565

    
1566

    
1567
def Main():
1568
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
1569
                     aliases=aliases)