1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
from cStringIO import StringIO
30
import os.path
31
import time
32
import OpenSSL
33
import itertools
34

    
35
from ganeti.cli import *
36
from ganeti import opcodes
37
from ganeti import constants
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import bootstrap
41
from ganeti import ssh
42
from ganeti import objects
43
from ganeti import uidpool
44
from ganeti import compat
45
from ganeti import netutils
46
from ganeti import pathutils
47

    
48

    
49
ON_OPT = cli_option("--on", default=False,
50
                    action="store_true", dest="on",
51
                    help="Recover from an EPO")
52

    
53
GROUPS_OPT = cli_option("--groups", default=False,
54
                        action="store_true", dest="groups",
55
                        help="Arguments are node groups instead of nodes")
56

    
57
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
58
                            help="Override interactive check for --no-voting",
59
                            default=False, action="store_true")
60

    
61
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
62
_EPO_PING_TIMEOUT = 1 # 1 second
63
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
64

    
65

    
66
def _CheckNoLvmStorageOptDeprecated(opts):
67
  """Checks if the legacy option '--no-lvm-storage' is used.
68

69
  """
70
  if not opts.lvm_storage:
71
    ToStderr("The option --no-lvm-storage is no longer supported. If you want"
72
             " to disable lvm-based storage cluster-wide, use the option"
73
             " --enabled-disk-templates to disable all of these lvm-base disk "
74
             "  templates: %s" %
75
             utils.CommaJoin(utils.GetLvmDiskTemplates()))
76
    return 1
77

    
78

    
79
def _InitEnabledDiskTemplates(opts):
80
  """Initialize the list of enabled disk templates.
81

82
  """
83
  if opts.enabled_disk_templates:
84
    return opts.enabled_disk_templates.split(",")
85
  else:
86
    return constants.DEFAULT_ENABLED_DISK_TEMPLATES
87

    
88

    
89
def _InitVgName(opts, enabled_disk_templates):
90
  """Initialize the volume group name.
91

92
  @type enabled_disk_templates: list of strings
93
  @param enabled_disk_templates: cluster-wide enabled disk templates
94

95
  """
96
  vg_name = None
97
  if opts.vg_name is not None:
98
    vg_name = opts.vg_name
99
    if vg_name:
100
      if not utils.IsLvmEnabled(enabled_disk_templates):
101
        ToStdout("You specified a volume group with --vg-name, but you did not"
102
                 " enable any disk template that uses lvm.")
103
    elif utils.IsLvmEnabled(enabled_disk_templates):
104
      raise errors.OpPrereqError(
105
          "LVM disk templates are enabled, but vg name not set.")
106
  elif utils.IsLvmEnabled(enabled_disk_templates):
107
    vg_name = constants.DEFAULT_VG
108
  return vg_name
109

    
110

    
111
def _InitDrbdHelper(opts, enabled_disk_templates):
112
  """Initialize the DRBD usermode helper.
113

114
  """
115
  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
116

    
117
  if not drbd_enabled and opts.drbd_helper is not None:
118
    ToStdout("Note: You specified a DRBD usermode helper, while DRBD storage"
119
             " is not enabled.")
120

    
121
  if drbd_enabled:
122
    if opts.drbd_helper is None:
123
      return constants.DEFAULT_DRBD_HELPER
124
    if opts.drbd_helper == '':
125
      raise errors.OpPrereqError(
126
          "Unsetting the drbd usermode helper while enabling DRBD is not"
127
          " allowed.")
128

    
129
  return opts.drbd_helper
130

    
131

    
132
@UsesRPC
133
def InitCluster(opts, args):
134
  """Initialize the cluster.
135

136
  @param opts: the command line options selected by the user
137
  @type args: list
138
  @param args: should contain only one element, the desired
139
      cluster name
140
  @rtype: int
141
  @return: the desired exit code
142

143
  """
144
  if _CheckNoLvmStorageOptDeprecated(opts):
145
    return 1
146

    
147
  enabled_disk_templates = _InitEnabledDiskTemplates(opts)
148

    
149
  try:
150
    vg_name = _InitVgName(opts, enabled_disk_templates)
151
    drbd_helper = _InitDrbdHelper(opts, enabled_disk_templates)
152
  except errors.OpPrereqError, e:
153
    ToStderr(str(e))
154
    return 1
155

    
156
  master_netdev = opts.master_netdev
157
  if master_netdev is None:
158
    nic_mode = opts.nicparams.get(constants.NIC_MODE, None)
159
    if not nic_mode:
160
      # default case, use bridging
161
      master_netdev = constants.DEFAULT_BRIDGE
162
    elif nic_mode == constants.NIC_MODE_OVS:
163
      # default ovs is different from default bridge
164
      master_netdev = constants.DEFAULT_OVS
165
      opts.nicparams[constants.NIC_LINK] = constants.DEFAULT_OVS
166

    
167
  hvlist = opts.enabled_hypervisors
168
  if hvlist is None:
169
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
170
  hvlist = hvlist.split(",")
171

    
172
  hvparams = dict(opts.hvparams)
173
  beparams = opts.beparams
174
  nicparams = opts.nicparams
175

    
176
  diskparams = dict(opts.diskparams)
177

    
178
  # check the disk template types here, as we cannot rely on the type check done
179
  # by the opcode parameter types
180
  diskparams_keys = set(diskparams.keys())
181
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
182
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
183
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
184
    return 1
185

    
186
  # prepare beparams dict
187
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
188
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
189

    
190
  # prepare nicparams dict
191
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
192
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
193

    
194
  # prepare ndparams dict
195
  if opts.ndparams is None:
196
    ndparams = dict(constants.NDC_DEFAULTS)
197
  else:
198
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
199
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
200

    
201
  # prepare hvparams dict
202
  for hv in constants.HYPER_TYPES:
203
    if hv not in hvparams:
204
      hvparams[hv] = {}
205
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
206
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
207

    
208
  # prepare diskparams dict
209
  for templ in constants.DISK_TEMPLATES:
210
    if templ not in diskparams:
211
      diskparams[templ] = {}
212
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
213
                                         diskparams[templ])
214
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
215

    
216
  # prepare ipolicy dict
217
  ipolicy = CreateIPolicyFromOpts(
218
    ispecs_mem_size=opts.ispecs_mem_size,
219
    ispecs_cpu_count=opts.ispecs_cpu_count,
220
    ispecs_disk_count=opts.ispecs_disk_count,
221
    ispecs_disk_size=opts.ispecs_disk_size,
222
    ispecs_nic_count=opts.ispecs_nic_count,
223
    minmax_ispecs=opts.ipolicy_bounds_specs,
224
    std_ispecs=opts.ipolicy_std_specs,
225
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
226
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
227
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
228
    fill_all=True)
229

    
230
  if opts.candidate_pool_size is None:
231
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
232

    
233
  if opts.mac_prefix is None:
234
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
235

    
236
  uid_pool = opts.uid_pool
237
  if uid_pool is not None:
238
    uid_pool = uidpool.ParseUidPool(uid_pool)
239

    
240
  if opts.prealloc_wipe_disks is None:
241
    opts.prealloc_wipe_disks = False
242

    
243
  external_ip_setup_script = opts.use_external_mip_script
244
  if external_ip_setup_script is None:
245
    external_ip_setup_script = False
246

    
247
  try:
248
    primary_ip_version = int(opts.primary_ip_version)
249
  except (ValueError, TypeError), err:
250
    ToStderr("Invalid primary ip version value: %s" % str(err))
251
    return 1
252

    
253
  master_netmask = opts.master_netmask
254
  try:
255
    if master_netmask is not None:
256
      master_netmask = int(master_netmask)
257
  except (ValueError, TypeError), err:
258
    ToStderr("Invalid master netmask value: %s" % str(err))
259
    return 1
260

    
261
  if opts.disk_state:
262
    disk_state = utils.FlatToDict(opts.disk_state)
263
  else:
264
    disk_state = {}
265

    
266
  hv_state = dict(opts.hv_state)
267

    
268
  bootstrap.InitCluster(cluster_name=args[0],
269
                        secondary_ip=opts.secondary_ip,
270
                        vg_name=vg_name,
271
                        mac_prefix=opts.mac_prefix,
272
                        master_netmask=master_netmask,
273
                        master_netdev=master_netdev,
274
                        file_storage_dir=opts.file_storage_dir,
275
                        shared_file_storage_dir=opts.shared_file_storage_dir,
276
                        enabled_hypervisors=hvlist,
277
                        hvparams=hvparams,
278
                        beparams=beparams,
279
                        nicparams=nicparams,
280
                        ndparams=ndparams,
281
                        diskparams=diskparams,
282
                        ipolicy=ipolicy,
283
                        candidate_pool_size=opts.candidate_pool_size,
284
                        modify_etc_hosts=opts.modify_etc_hosts,
285
                        modify_ssh_setup=opts.modify_ssh_setup,
286
                        maintain_node_health=opts.maintain_node_health,
287
                        drbd_helper=drbd_helper,
288
                        uid_pool=uid_pool,
289
                        default_iallocator=opts.default_iallocator,
290
                        primary_ip_version=primary_ip_version,
291
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
292
                        use_external_mip_script=external_ip_setup_script,
293
                        hv_state=hv_state,
294
                        disk_state=disk_state,
295
                        enabled_disk_templates=enabled_disk_templates,
296
                        )
297
  op = opcodes.OpClusterPostInit()
298
  SubmitOpCode(op, opts=opts)
299
  return 0
300

    
301

    
302
@UsesRPC
303
def DestroyCluster(opts, args):
304
  """Destroy the cluster.
305

306
  @param opts: the command line options selected by the user
307
  @type args: list
308
  @param args: should be an empty list
309
  @rtype: int
310
  @return: the desired exit code
311

312
  """
313
  if not opts.yes_do_it:
314
    ToStderr("Destroying a cluster is irreversible. If you really want"
315
             " destroy this cluster, supply the --yes-do-it option.")
316
    return 1
317

    
318
  op = opcodes.OpClusterDestroy()
319
  master_uuid = SubmitOpCode(op, opts=opts)
320
  # if we reached this, the opcode didn't fail; we can proceed to
321
  # shutdown all the daemons
322
  bootstrap.FinalizeClusterDestroy(master_uuid)
323
  return 0
324

    
325

    
326
def RenameCluster(opts, args):
327
  """Rename the cluster.
328

329
  @param opts: the command line options selected by the user
330
  @type args: list
331
  @param args: should contain only one element, the new cluster name
332
  @rtype: int
333
  @return: the desired exit code
334

335
  """
336
  cl = GetClient()
337

    
338
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
339

    
340
  new_name = args[0]
341
  if not opts.force:
342
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
343
                " connected over the network to the cluster name, the"
344
                " operation is very dangerous as the IP address will be"
345
                " removed from the node and the change may not go through."
346
                " Continue?") % (cluster_name, new_name)
347
    if not AskUser(usertext):
348
      return 1
349

    
350
  op = opcodes.OpClusterRename(name=new_name)
351
  result = SubmitOpCode(op, opts=opts, cl=cl)
352

    
353
  if result:
354
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
355

    
356
  return 0
357

    
358

    
359
def ActivateMasterIp(opts, args):
360
  """Activates the master IP.
361

362
  """
363
  op = opcodes.OpClusterActivateMasterIp()
364
  SubmitOpCode(op)
365
  return 0
366

    
367

    
368
def DeactivateMasterIp(opts, args):
369
  """Deactivates the master IP.
370

371
  """
372
  if not opts.confirm:
373
    usertext = ("This will disable the master IP. All the open connections to"
374
                " the master IP will be closed. To reach the master you will"
375
                " need to use its node IP."
376
                " Continue?")
377
    if not AskUser(usertext):
378
      return 1
379

    
380
  op = opcodes.OpClusterDeactivateMasterIp()
381
  SubmitOpCode(op)
382
  return 0
383

    
384

    
385
def RedistributeConfig(opts, args):
386
  """Forces push of the cluster configuration.
387

388
  @param opts: the command line options selected by the user
389
  @type args: list
390
  @param args: empty list
391
  @rtype: int
392
  @return: the desired exit code
393

394
  """
395
  op = opcodes.OpClusterRedistConf()
396
  SubmitOrSend(op, opts)
397
  return 0
398

    
399

    
400
def ShowClusterVersion(opts, args):
401
  """Write version of ganeti software to the standard output.
402

403
  @param opts: the command line options selected by the user
404
  @type args: list
405
  @param args: should be an empty list
406
  @rtype: int
407
  @return: the desired exit code
408

409
  """
410
  cl = GetClient(query=True)
411
  result = cl.QueryClusterInfo()
412
  ToStdout("Software version: %s", result["software_version"])
413
  ToStdout("Internode protocol: %s", result["protocol_version"])
414
  ToStdout("Configuration format: %s", result["config_version"])
415
  ToStdout("OS api version: %s", result["os_api_version"])
416
  ToStdout("Export interface: %s", result["export_version"])
417
  ToStdout("VCS version: %s", result["vcs_version"])
418
  return 0
419

    
420

    
421
def ShowClusterMaster(opts, args):
422
  """Write name of master node to the standard output.
423

424
  @param opts: the command line options selected by the user
425
  @type args: list
426
  @param args: should be an empty list
427
  @rtype: int
428
  @return: the desired exit code
429

430
  """
431
  master = bootstrap.GetMaster()
432
  ToStdout(master)
433
  return 0
434

    
435

    
436
def _FormatGroupedParams(paramsdict, roman=False):
437
  """Format Grouped parameters (be, nic, disk) by group.
438

439
  @type paramsdict: dict of dicts
440
  @param paramsdict: {group: {param: value, ...}, ...}
441
  @rtype: dict of dicts
442
  @return: copy of the input dictionaries with strings as values
443

444
  """
445
  ret = {}
446
  for (item, val) in paramsdict.items():
447
    if isinstance(val, dict):
448
      ret[item] = _FormatGroupedParams(val, roman=roman)
449
    elif roman and isinstance(val, int):
450
      ret[item] = compat.TryToRoman(val)
451
    else:
452
      ret[item] = str(val)
453
  return ret
454
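# Illustrative example (not part of the original module): without roman
# conversion, _FormatGroupedParams({"default": {"memory": 128, "vcpus": 1}})
# returns {"default": {"memory": "128", "vcpus": "1"}}; every non-dict leaf is
# stringified so the info list printed by ShowClusterConfig renders uniformly.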

    
455

    
456
def ShowClusterConfig(opts, args):
457
  """Shows cluster information.
458

459
  @param opts: the command line options selected by the user
460
  @type args: list
461
  @param args: should be an empty list
462
  @rtype: int
463
  @return: the desired exit code
464

465
  """
466
  cl = GetClient(query=True)
467
  result = cl.QueryClusterInfo()
468

    
469
  if result["tags"]:
470
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
471
  else:
472
    tags = "(none)"
473
  if result["reserved_lvs"]:
474
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
475
  else:
476
    reserved_lvs = "(none)"
477

    
478
  enabled_hv = result["enabled_hypervisors"]
479
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
480
                  if k in enabled_hv)
481

    
482
  info = [
483
    ("Cluster name", result["name"]),
484
    ("Cluster UUID", result["uuid"]),
485

    
486
    ("Creation time", utils.FormatTime(result["ctime"])),
487
    ("Modification time", utils.FormatTime(result["mtime"])),
488

    
489
    ("Master node", result["master"]),
490

    
491
    ("Architecture (this node)",
492
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),
493

    
494
    ("Tags", tags),
495

    
496
    ("Default hypervisor", result["default_hypervisor"]),
497
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),
498

    
499
    ("Hypervisor parameters", _FormatGroupedParams(hvparams)),
500

    
501
    ("OS-specific hypervisor parameters",
502
     _FormatGroupedParams(result["os_hvp"])),
503

    
504
    ("OS parameters", _FormatGroupedParams(result["osparams"])),
505

    
506
    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
507
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),
508

    
509
    ("Cluster parameters", [
510
      ("candidate pool size",
511
       compat.TryToRoman(result["candidate_pool_size"],
512
                         convert=opts.roman_integers)),
513
      ("master netdev", result["master_netdev"]),
514
      ("master netmask", result["master_netmask"]),
515
      ("use external master IP address setup script",
516
       result["use_external_mip_script"]),
517
      ("lvm volume group", result["volume_group_name"]),
518
      ("lvm reserved volumes", reserved_lvs),
519
      ("drbd usermode helper", result["drbd_usermode_helper"]),
520
      ("file storage path", result["file_storage_dir"]),
521
      ("shared file storage path", result["shared_file_storage_dir"]),
522
      ("maintenance of node health", result["maintain_node_health"]),
523
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
524
      ("default instance allocator", result["default_iallocator"]),
525
      ("primary ip version", result["primary_ip_version"]),
526
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
527
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
528
      ("ExtStorage Providers search path",
529
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
530
      ("enabled disk templates",
531
       utils.CommaJoin(result["enabled_disk_templates"])),
532
      ]),
533

    
534
    ("Default node parameters",
535
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),
536

    
537
    ("Default instance parameters",
538
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),
539

    
540
    ("Default nic parameters",
541
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),
542

    
543
    ("Default disk parameters",
544
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),
545

    
546
    ("Instance policy - limits for instances",
547
     FormatPolicyInfo(result["ipolicy"], None, True)),
548
    ]
549

    
550
  PrintGenericInfo(info)
551
  return 0
552

    
553

    
554
def ClusterCopyFile(opts, args):
555
  """Copy a file from master to some nodes.
556

557
  @param opts: the command line options selected by the user
558
  @type args: list
559
  @param args: should contain only one element, the path of
560
      the file to be copied
561
  @rtype: int
562
  @return: the desired exit code
563

564
  """
565
  filename = args[0]
566
  if not os.path.exists(filename):
567
    raise errors.OpPrereqError("No such filename '%s'" % filename,
568
                               errors.ECODE_INVAL)
569

    
570
  cl = GetClient()
571

    
572
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
573

    
574
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
575
                           secondary_ips=opts.use_replication_network,
576
                           nodegroup=opts.nodegroup)
577

    
578
  srun = ssh.SshRunner(cluster_name)
579
  for node in results:
580
    if not srun.CopyFileToNode(node, filename):
581
      ToStderr("Copy of file %s to node %s failed", filename, node)
582

    
583
  return 0
584

    
585

    
586
def RunClusterCommand(opts, args):
587
  """Run a command on some nodes.
588

589
  @param opts: the command line options selected by the user
590
  @type args: list
591
  @param args: should contain the command to be run and its arguments
592
  @rtype: int
593
  @return: the desired exit code
594

595
  """
596
  cl = GetClient()
597

    
598
  command = " ".join(args)
599

    
600
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
601

    
602
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
603
                                                    "master_node"])
604

    
605
  srun = ssh.SshRunner(cluster_name=cluster_name)
606

    
607
  # Make sure master node is at list end
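  # (presumably so that a command which disrupts connectivity or shuts nodes
  # down affects the node we are running from last)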
608
  if master_node in nodes:
609
    nodes.remove(master_node)
610
    nodes.append(master_node)
611

    
612
  for name in nodes:
613
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
614

    
615
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
616
      # Do not output anything for successful commands
617
      continue
618

    
619
    ToStdout("------------------------------------------------")
620
    if opts.show_machine_names:
621
      for line in result.output.splitlines():
622
        ToStdout("%s: %s", name, line)
623
    else:
624
      ToStdout("node: %s", name)
625
      ToStdout("%s", result.output)
626
    ToStdout("return code = %s", result.exit_code)
627

    
628
  return 0
629

    
630

    
631
def VerifyCluster(opts, args):
632
  """Verify integrity of cluster, performing various test on nodes.
633

634
  @param opts: the command line options selected by the user
635
  @type args: list
636
  @param args: should be an empty list
637
  @rtype: int
638
  @return: the desired exit code
639

640
  """
641
  skip_checks = []
642

    
643
  if opts.skip_nplusone_mem:
644
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
645

    
646
  cl = GetClient()
647

    
648
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
649
                               error_codes=opts.error_codes,
650
                               debug_simulate_errors=opts.simulate_errors,
651
                               skip_checks=skip_checks,
652
                               ignore_errors=opts.ignore_errors,
653
                               group_name=opts.nodegroup)
654
  result = SubmitOpCode(op, cl=cl, opts=opts)
655

    
656
  # Keep track of submitted jobs
657
  jex = JobExecutor(cl=cl, opts=opts)
658

    
659
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
660
    jex.AddJobId(None, status, job_id)
661

    
662
  results = jex.GetResults()
663

    
664
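  # The expression below counts failures along two axes: 'bad_jobs' is the
  # number of verification jobs that failed outright, 'bad_results' the number
  # of jobs whose single opcode result evaluated to false. An equivalent, more
  # explicit form (readability sketch only, not part of the original code):
  #   bad_jobs = len([1 for (ok, _) in results if not ok])
  #   bad_results = len([1 for (ok, op_results) in results
  #                      if not (len(op_results) == 1 and op_results[0])])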
  (bad_jobs, bad_results) = \
665
    map(len,
666
        # Convert iterators to lists
667
        map(list,
668
            # Count errors
669
            map(compat.partial(itertools.ifilterfalse, bool),
670
                # Convert result to booleans in a tuple
671
                zip(*((job_success, len(op_results) == 1 and op_results[0])
672
                      for (job_success, op_results) in results)))))
673

    
674
  if bad_jobs == 0 and bad_results == 0:
675
    rcode = constants.EXIT_SUCCESS
676
  else:
677
    rcode = constants.EXIT_FAILURE
678
    if bad_jobs > 0:
679
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
680

    
681
  return rcode
682

    
683

    
684
def VerifyDisks(opts, args):
685
  """Verify integrity of cluster disks.
686

687
  @param opts: the command line options selected by the user
688
  @type args: list
689
  @param args: should be an empty list
690
  @rtype: int
691
  @return: the desired exit code
692

693
  """
694
  cl = GetClient()
695

    
696
  op = opcodes.OpClusterVerifyDisks()
697

    
698
  result = SubmitOpCode(op, cl=cl, opts=opts)
699

    
700
  # Keep track of submitted jobs
701
  jex = JobExecutor(cl=cl, opts=opts)
702

    
703
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
704
    jex.AddJobId(None, status, job_id)
705

    
706
  retcode = constants.EXIT_SUCCESS
707

    
708
  for (status, result) in jex.GetResults():
709
    if not status:
710
      ToStdout("Job failed: %s", result)
711
      continue
712

    
713
    ((bad_nodes, instances, missing), ) = result
714

    
715
    for node, text in bad_nodes.items():
716
      ToStdout("Error gathering data on node %s: %s",
717
               node, utils.SafeEncode(text[-400:]))
718
      retcode = constants.EXIT_FAILURE
719
      ToStdout("You need to fix these nodes first before fixing instances")
720

    
721
    for iname in instances:
722
      if iname in missing:
723
        continue
724
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
725
      try:
726
        ToStdout("Activating disks for instance '%s'", iname)
727
        SubmitOpCode(op, opts=opts, cl=cl)
728
      except errors.GenericError, err:
729
        nret, msg = FormatError(err)
730
        retcode |= nret
731
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
732

    
733
    if missing:
734
      for iname, ival in missing.iteritems():
735
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
736
        if all_missing:
737
          ToStdout("Instance %s cannot be verified as it lives on"
738
                   " broken nodes", iname)
739
        else:
740
          ToStdout("Instance %s has missing logical volumes:", iname)
741
          ival.sort()
742
          for node, vol in ival:
743
            if node in bad_nodes:
744
              ToStdout("\tbroken node %s /dev/%s", node, vol)
745
            else:
746
              ToStdout("\t%s /dev/%s", node, vol)
747

    
748
      ToStdout("You need to replace or recreate disks for all the above"
749
               " instances if this message persists after fixing broken nodes.")
750
      retcode = constants.EXIT_FAILURE
751
    elif not instances:
752
      ToStdout("No disks need to be activated.")
753

    
754
  return retcode
755

    
756

    
757
def RepairDiskSizes(opts, args):
758
  """Verify sizes of cluster disks.
759

760
  @param opts: the command line options selected by the user
761
  @type args: list
762
  @param args: optional list of instances to restrict check to
763
  @rtype: int
764
  @return: the desired exit code
765

766
  """
767
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
768
  SubmitOpCode(op, opts=opts)
769

    
770

    
771
@UsesRPC
772
def MasterFailover(opts, args):
773
  """Failover the master node.
774

775
  This command, when run on a non-master node, will cause the current
776
  master to cease being master, and the non-master to become new
777
  master.
778

779
  @param opts: the command line options selected by the user
780
  @type args: list
781
  @param args: should be an empty list
782
  @rtype: int
783
  @return: the desired exit code
784

785
  """
786
  if opts.no_voting and not opts.yes_do_it:
787
    usertext = ("This will perform the failover even if most other nodes"
788
                " are down, or if this node is outdated. This is dangerous"
789
                " as it can lead to a non-consistent cluster. Check the"
790
                " gnt-cluster(8) man page before proceeding. Continue?")
791
    if not AskUser(usertext):
792
      return 1
793

    
794
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
795

    
796

    
797
def MasterPing(opts, args):
798
  """Checks if the master is alive.
799

800
  @param opts: the command line options selected by the user
801
  @type args: list
802
  @param args: should be an empty list
803
  @rtype: int
804
  @return: the desired exit code
805

806
  """
807
  try:
808
    cl = GetClient()
809
    cl.QueryClusterInfo()
810
    return 0
811
  except Exception: # pylint: disable=W0703
812
    return 1
813

    
814

    
815
def SearchTags(opts, args):
816
  """Searches the tags on all the cluster.
817

818
  @param opts: the command line options selected by the user
819
  @type args: list
820
  @param args: should contain only one element, the tag pattern
821
  @rtype: int
822
  @return: the desired exit code
823

824
  """
825
  op = opcodes.OpTagsSearch(pattern=args[0])
826
  result = SubmitOpCode(op, opts=opts)
827
  if not result:
828
    return 1
829
  result = list(result)
830
  result.sort()
831
  for path, tag in result:
832
    ToStdout("%s %s", path, tag)
833

    
834

    
835
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
836
  """Reads and verifies an X509 certificate.
837

838
  @type cert_filename: string
839
  @param cert_filename: the path of the file containing the certificate to
840
                        verify encoded in PEM format
841
  @type verify_private_key: bool
842
  @param verify_private_key: whether to verify the private key in addition to
843
                             the public certificate
844
  @rtype: string
845
  @return: a string containing the PEM-encoded certificate.
846

847
  """
848
  try:
849
    pem = utils.ReadFile(cert_filename)
850
  except IOError, err:
851
    raise errors.X509CertError(cert_filename,
852
                               "Unable to read certificate: %s" % str(err))
853

    
854
  try:
855
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
856
  except Exception, err:
857
    raise errors.X509CertError(cert_filename,
858
                               "Unable to load certificate: %s" % str(err))
859

    
860
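  # The private key, when requested, is expected to be in the same PEM file as
  # the certificate; loading it is sufficient to check that it parses.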
  if verify_private_key:
861
    try:
862
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
863
    except Exception, err:
864
      raise errors.X509CertError(cert_filename,
865
                                 "Unable to load private key: %s" % str(err))
866

    
867
  return pem
868

    
869

    
870
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
871
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
872
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
873
                 cds_filename, force):
874
  """Renews cluster certificates, keys and secrets.
875

876
  @type new_cluster_cert: bool
877
  @param new_cluster_cert: Whether to generate a new cluster certificate
878
  @type new_rapi_cert: bool
879
  @param new_rapi_cert: Whether to generate a new RAPI certificate
880
  @type rapi_cert_filename: string
881
  @param rapi_cert_filename: Path to file containing new RAPI certificate
882
  @type new_spice_cert: bool
883
  @param new_spice_cert: Whether to generate a new SPICE certificate
884
  @type spice_cert_filename: string
885
  @param spice_cert_filename: Path to file containing new SPICE certificate
886
  @type spice_cacert_filename: string
887
  @param spice_cacert_filename: Path to file containing the certificate of the
888
                                CA that signed the SPICE certificate
889
  @type new_confd_hmac_key: bool
890
  @param new_confd_hmac_key: Whether to generate a new HMAC key
891
  @type new_cds: bool
892
  @param new_cds: Whether to generate a new cluster domain secret
893
  @type cds_filename: string
894
  @param cds_filename: Path to file containing new cluster domain secret
895
  @type force: bool
896
  @param force: Whether to ask user for confirmation
897

898
  """
899
  if new_rapi_cert and rapi_cert_filename:
900
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
901
             " options can be specified at the same time.")
902
    return 1
903

    
904
  if new_cds and cds_filename:
905
    ToStderr("Only one of the --new-cluster-domain-secret and"
906
             " --cluster-domain-secret options can be specified at"
907
             " the same time.")
908
    return 1
909

    
910
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
911
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
912
             " and --spice-ca-certificate must not be used.")
913
    return 1
914

    
915
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
916
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
917
             " specified.")
918
    return 1
919

    
920
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
921
  try:
922
    if rapi_cert_filename:
923
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
924
    if spice_cert_filename:
925
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
926
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
927
  except errors.X509CertError, err:
928
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
929
    return 1
930

    
931
  if cds_filename:
932
    try:
933
      cds = utils.ReadFile(cds_filename)
934
    except Exception, err: # pylint: disable=W0703
935
      ToStderr("Can't load new cluster domain secret from %s: %s" %
936
               (cds_filename, str(err)))
937
      return 1
938
  else:
939
    cds = None
940

    
941
  if not force:
942
    usertext = ("This requires all daemons on all nodes to be restarted and"
943
                " may take some time. Continue?")
944
    if not AskUser(usertext):
945
      return 1
946

    
947
  def _RenewCryptoInner(ctx):
948
    ctx.feedback_fn("Updating certificates and keys")
949
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
950
                                    new_rapi_cert,
951
                                    new_spice_cert,
952
                                    new_confd_hmac_key,
953
                                    new_cds,
954
                                    rapi_cert_pem=rapi_cert_pem,
955
                                    spice_cert_pem=spice_cert_pem,
956
                                    spice_cacert_pem=spice_cacert_pem,
957
                                    cds=cds)
958

    
959
    files_to_copy = []
960

    
961
    if new_cluster_cert:
962
      files_to_copy.append(pathutils.NODED_CERT_FILE)
963

    
964
    if new_rapi_cert or rapi_cert_pem:
965
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
966

    
967
    if new_spice_cert or spice_cert_pem:
968
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
969
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
970

    
971
    if new_confd_hmac_key:
972
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
973

    
974
    if new_cds or cds:
975
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
976

    
977
    if files_to_copy:
978
      for node_name in ctx.nonmaster_nodes:
979
        ctx.feedback_fn("Copying %s to %s" %
980
                        (", ".join(files_to_copy), node_name))
981
        for file_name in files_to_copy:
982
          ctx.ssh.CopyFileToNode(node_name, file_name)
983

    
984
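  # The inner function runs while all Ganeti daemons on all nodes are stopped,
  # so no daemon keeps serving with the old certificates while the new ones
  # are generated and copied to the non-master nodes.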
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
985

    
986
  ToStdout("All requested certificates and keys have been replaced."
987
           " Running \"gnt-cluster verify\" now is recommended.")
988

    
989
  return 0
990

    
991

    
992
def RenewCrypto(opts, args):
993
  """Renews cluster certificates, keys and secrets.
994

995
  """
996
  return _RenewCrypto(opts.new_cluster_cert,
997
                      opts.new_rapi_cert,
998
                      opts.rapi_cert,
999
                      opts.new_spice_cert,
1000
                      opts.spice_cert,
1001
                      opts.spice_cacert,
1002
                      opts.new_confd_hmac_key,
1003
                      opts.new_cluster_domain_secret,
1004
                      opts.cluster_domain_secret,
1005
                      opts.force)
1006

    
1007

    
1008
def _GetEnabledDiskTemplates(opts):
1009
  """Determine the list of enabled disk templates.
1010

1011
  """
1012
  if opts.enabled_disk_templates:
1013
    return opts.enabled_disk_templates.split(",")
1014
  else:
1015
    return None
1016

    
1017

    
1018
def _GetVgName(opts, enabled_disk_templates):
1019
  """Determine the volume group name.
1020

1021
  @type enabled_disk_templates: list of strings
1022
  @param enabled_disk_templates: cluster-wide enabled disk-templates
1023

1024
  """
1025
  # consistency between vg name and enabled disk templates
1026
  vg_name = None
1027
  if opts.vg_name is not None:
1028
    vg_name = opts.vg_name
1029
  if enabled_disk_templates:
1030
    if vg_name and not utils.IsLvmEnabled(enabled_disk_templates):
1031
      ToStdout("You specified a volume group with --vg-name, but you did not"
1032
               " enable any of the following lvm-based disk templates: %s" %
1033
               utils.CommaJoin(utils.GetLvmDiskTemplates()))
1034
  return vg_name
1035

    
1036

    
1037
def _GetDrbdHelper(opts, enabled_disk_templates):
1038
  """Determine the DRBD usermode helper.
1039

1040
  """
1041
  drbd_helper = opts.drbd_helper
1042
  if enabled_disk_templates:
1043
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1044
    if not drbd_enabled and opts.drbd_helper:
1045
      ToStdout("You specified a DRBD usermode helper with "
1046
               " --drbd-usermode-helper while DRBD is not enabled.")
1047
  return drbd_helper
1048

    
1049

    
1050
def SetClusterParams(opts, args):
1051
  """Modify the cluster.
1052

1053
  @param opts: the command line options selected by the user
1054
  @type args: list
1055
  @param args: should be an empty list
1056
  @rtype: int
1057
  @return: the desired exit code
1058

1059
  """
1060
  if not (opts.vg_name is not None or
1061
          opts.drbd_helper is not None or
1062
          opts.enabled_hypervisors or opts.hvparams or
1063
          opts.beparams or opts.nicparams or
1064
          opts.ndparams or opts.diskparams or
1065
          opts.candidate_pool_size is not None or
1066
          opts.uid_pool is not None or
1067
          opts.maintain_node_health is not None or
1068
          opts.add_uids is not None or
1069
          opts.remove_uids is not None or
1070
          opts.default_iallocator is not None or
1071
          opts.reserved_lvs is not None or
1072
          opts.master_netdev is not None or
1073
          opts.master_netmask is not None or
1074
          opts.use_external_mip_script is not None or
1075
          opts.prealloc_wipe_disks is not None or
1076
          opts.hv_state or
1077
          opts.enabled_disk_templates or
1078
          opts.disk_state or
1079
          opts.ipolicy_bounds_specs is not None or
1080
          opts.ipolicy_std_specs is not None or
1081
          opts.ipolicy_disk_templates is not None or
1082
          opts.ipolicy_vcpu_ratio is not None or
1083
          opts.ipolicy_spindle_ratio is not None or
1084
          opts.modify_etc_hosts is not None or
1085
          opts.file_storage_dir is not None):
1086
    ToStderr("Please give at least one of the parameters.")
1087
    return 1
1088

    
1089
  if _CheckNoLvmStorageOptDeprecated(opts):
1090
    return 1
1091

    
1092
  enabled_disk_templates = _GetEnabledDiskTemplates(opts)
1093
  vg_name = _GetVgName(opts, enabled_disk_templates)
1094

    
1095
  try:
1096
    drbd_helper = _GetDrbdHelper(opts, enabled_disk_templates)
1097
  except errors.OpPrereqError, e:
1098
    ToStderr(str(e))
1099
    return 1
1100

    
1101
  hvlist = opts.enabled_hypervisors
1102
  if hvlist is not None:
1103
    hvlist = hvlist.split(",")
1104

    
1105
  # a list of (name, dict) we can pass directly to dict() (or [])
1106
  hvparams = dict(opts.hvparams)
1107
  for hv_params in hvparams.values():
1108
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1109

    
1110
  diskparams = dict(opts.diskparams)
1111

    
1112
  for dt_params in diskparams.values():
1113
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1114

    
1115
  beparams = opts.beparams
1116
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1117

    
1118
  nicparams = opts.nicparams
1119
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1120

    
1121
  ndparams = opts.ndparams
1122
  if ndparams is not None:
1123
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1124

    
1125
  ipolicy = CreateIPolicyFromOpts(
1126
    minmax_ispecs=opts.ipolicy_bounds_specs,
1127
    std_ispecs=opts.ipolicy_std_specs,
1128
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1129
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1130
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1131
    )
1132

    
1133
  mnh = opts.maintain_node_health
1134

    
1135
  uid_pool = opts.uid_pool
1136
  if uid_pool is not None:
1137
    uid_pool = uidpool.ParseUidPool(uid_pool)
1138

    
1139
  add_uids = opts.add_uids
1140
  if add_uids is not None:
1141
    add_uids = uidpool.ParseUidPool(add_uids)
1142

    
1143
  remove_uids = opts.remove_uids
1144
  if remove_uids is not None:
1145
    remove_uids = uidpool.ParseUidPool(remove_uids)
1146

    
1147
  if opts.reserved_lvs is not None:
1148
    if opts.reserved_lvs == "":
1149
      opts.reserved_lvs = []
1150
    else:
1151
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1152

    
1153
  if opts.master_netmask is not None:
1154
    try:
1155
      opts.master_netmask = int(opts.master_netmask)
1156
    except ValueError:
1157
      ToStderr("The --master-netmask option expects an int parameter.")
1158
      return 1
1159

    
1160
  ext_ip_script = opts.use_external_mip_script
1161

    
1162
  if opts.disk_state:
1163
    disk_state = utils.FlatToDict(opts.disk_state)
1164
  else:
1165
    disk_state = {}
1166

    
1167
  hv_state = dict(opts.hv_state)
1168

    
1169
  op = opcodes.OpClusterSetParams(
1170
    vg_name=vg_name,
1171
    drbd_helper=drbd_helper,
1172
    enabled_hypervisors=hvlist,
1173
    hvparams=hvparams,
1174
    os_hvp=None,
1175
    beparams=beparams,
1176
    nicparams=nicparams,
1177
    ndparams=ndparams,
1178
    diskparams=diskparams,
1179
    ipolicy=ipolicy,
1180
    candidate_pool_size=opts.candidate_pool_size,
1181
    maintain_node_health=mnh,
1182
    modify_etc_hosts=opts.modify_etc_hosts,
1183
    uid_pool=uid_pool,
1184
    add_uids=add_uids,
1185
    remove_uids=remove_uids,
1186
    default_iallocator=opts.default_iallocator,
1187
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1188
    master_netdev=opts.master_netdev,
1189
    master_netmask=opts.master_netmask,
1190
    reserved_lvs=opts.reserved_lvs,
1191
    use_external_mip_script=ext_ip_script,
1192
    hv_state=hv_state,
1193
    disk_state=disk_state,
1194
    enabled_disk_templates=enabled_disk_templates,
1195
    force=opts.force,
1196
    file_storage_dir=opts.file_storage_dir,
1197
    )
1198
  SubmitOrSend(op, opts)
1199
  return 0
1200

    
1201

    
1202
def QueueOps(opts, args):
1203
  """Queue operations.
1204

1205
  @param opts: the command line options selected by the user
1206
  @type args: list
1207
  @param args: should contain only one element, the subcommand
1208
  @rtype: int
1209
  @return: the desired exit code
1210

1211
  """
1212
  command = args[0]
1213
  client = GetClient()
1214
  if command in ("drain", "undrain"):
1215
    drain_flag = command == "drain"
1216
    client.SetQueueDrainFlag(drain_flag)
1217
  elif command == "info":
1218
    result = client.QueryConfigValues(["drain_flag"])
1219
    if result[0]:
1220
      val = "set"
1221
    else:
1222
      val = "unset"
1223
    ToStdout("The drain flag is %s" % val)
1224
  else:
1225
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1226
                               errors.ECODE_INVAL)
1227

    
1228
  return 0
1229

    
1230

    
1231
def _ShowWatcherPause(until):
1232
  if until is None or until < time.time():
1233
    ToStdout("The watcher is not paused.")
1234
  else:
1235
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1236

    
1237

    
1238
def WatcherOps(opts, args):
1239
  """Watcher operations.
1240

1241
  @param opts: the command line options selected by the user
1242
  @type args: list
1243
  @param args: should contain only one element, the subcommand
1244
  @rtype: int
1245
  @return: the desired exit code
1246

1247
  """
1248
  command = args[0]
1249
  client = GetClient()
1250

    
1251
  if command == "continue":
1252
    client.SetWatcherPause(None)
1253
    ToStdout("The watcher is no longer paused.")
1254

    
1255
  elif command == "pause":
1256
    if len(args) < 2:
1257
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1258

    
1259
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1260
    _ShowWatcherPause(result)
1261

    
1262
  elif command == "info":
1263
    result = client.QueryConfigValues(["watcher_pause"])
1264
    _ShowWatcherPause(result[0])
1265

    
1266
  else:
1267
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1268
                               errors.ECODE_INVAL)
1269

    
1270
  return 0
1271

    
1272

    
1273
def _OobPower(opts, node_list, power):
1274
  """Puts the node in the list to desired power state.
1275

1276
  @param opts: The command line options selected by the user
1277
  @param node_list: The list of nodes to operate on
1278
  @param power: True if they should be powered on, False otherwise
1279
  @return: The success of the operation (none failed)
1280

1281
  """
1282
  if power:
1283
    command = constants.OOB_POWER_ON
1284
  else:
1285
    command = constants.OOB_POWER_OFF
1286

    
1287
  op = opcodes.OpOobCommand(node_names=node_list,
1288
                            command=command,
1289
                            ignore_status=True,
1290
                            timeout=opts.oob_timeout,
1291
                            power_delay=opts.power_delay)
1292
  result = SubmitOpCode(op, opts=opts)
1293
  errs = 0
1294
  for node_result in result:
1295
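    # Each result entry pairs a (status, node_name) tuple with a
    # (data_status, data) tuple; only the node name and the per-node data
    # status are needed here.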
    (node_tuple, data_tuple) = node_result
1296
    (_, node_name) = node_tuple
1297
    (data_status, _) = data_tuple
1298
    if data_status != constants.RS_NORMAL:
1299
      assert data_status != constants.RS_UNAVAIL
1300
      errs += 1
1301
      ToStderr("There was a problem changing power for %s, please investigate",
1302
               node_name)
1303

    
1304
  if errs > 0:
1305
    return False
1306

    
1307
  return True
1308

    
1309

    
1310
def _InstanceStart(opts, inst_list, start, no_remember=False):
1311
  """Puts the instances in the list to desired state.
1312

1313
  @param opts: The command line options selected by the user
1314
  @param inst_list: The list of instances to operate on
1315
  @param start: True if they should be started, False for shutdown
1316
  @param no_remember: If the instance state should be remembered
1317
  @return: The success of the operation (none failed)
1318

1319
  """
1320
  if start:
1321
    opcls = opcodes.OpInstanceStartup
1322
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1323
  else:
1324
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1325
                           timeout=opts.shutdown_timeout,
1326
                           no_remember=no_remember)
1327
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1328

    
1329
  jex = JobExecutor(opts=opts)
1330

    
1331
  for inst in inst_list:
1332
    ToStdout("Submit %s of instance %s", text_submit, inst)
1333
    op = opcls(instance_name=inst)
1334
    jex.QueueJob(inst, op)
1335

    
1336
  results = jex.GetResults()
1337
  bad_cnt = len([1 for (success, _) in results if not success])
1338

    
1339
  if bad_cnt == 0:
1340
    ToStdout("All instances have been %s successfully", text_success)
1341
  else:
1342
    ToStderr("There were errors while %s instances:\n"
1343
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1344
             len(results))
1345
    return False
1346

    
1347
  return True
1348

    
1349

    
1350
class _RunWhenNodesReachableHelper:
1351
  """Helper class to make shared internal state sharing easier.
1352

1353
  @ivar success: Indicates if all action_cb calls were successful
1354

1355
  """
1356
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1357
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1358
    """Init the object.
1359

1360
    @param node_list: The list of nodes to be reachable
1361
    @param action_cb: Callback called when a new host is reachable
1362
    @type node2ip: dict
1363
    @param node2ip: Node to ip mapping
1364
    @param port: The port to use for the TCP ping
1365
    @param feedback_fn: The function used for feedback
1366
    @param _ping_fn: Function to check reachability (for unittest use only)
1367
    @param _sleep_fn: Function to sleep (for unittest use only)
1368

1369
    """
1370
    self.down = set(node_list)
1371
    self.up = set()
1372
    self.node2ip = node2ip
1373
    self.success = True
1374
    self.action_cb = action_cb
1375
    self.port = port
1376
    self.feedback_fn = feedback_fn
1377
    self._ping_fn = _ping_fn
1378
    self._sleep_fn = _sleep_fn
1379

    
1380
  def __call__(self):
1381
    """When called we run action_cb.
1382

1383
    @raises utils.RetryAgain: When there are still down nodes
1384

1385
    """
1386
    if not self.action_cb(self.up):
1387
      self.success = False
1388

    
1389
    if self.down:
1390
      raise utils.RetryAgain()
1391
    else:
1392
      return self.success
1393

    
1394
  def Wait(self, secs):
1395
    """Checks if a host is up or waits remaining seconds.
1396

1397
    @param secs: The secs remaining
1398

1399
    """
1400
    start = time.time()
1401
    for node in self.down:
1402
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1403
                       live_port_needed=True):
1404
        self.feedback_fn("Node %s became available" % node)
1405
        self.up.add(node)
1406
        self.down -= self.up
1407
        # If we have a node available there is the possibility to run the
1408
        # action callback successfully, therefore we don't wait and return
1409
        return
1410

    
1411
    self._sleep_fn(max(0.0, start + secs - time.time()))
1412

    
1413

    
1414
def _RunWhenNodesReachable(node_list, action_cb, interval):
1415
  """Run action_cb when nodes become reachable.
1416

1417
  @param node_list: The list of nodes to be reachable
1418
  @param action_cb: Callback called when a new host is reachable
1419
  @param interval: The earliest time to retry
1420

1421
  """
1422
  client = GetClient()
1423
  cluster_info = client.QueryClusterInfo()
1424
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1425
    family = netutils.IPAddress.family
1426
  else:
1427
    family = netutils.IP6Address.family
1428

    
1429
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1430
                 for node in node_list)
1431

    
1432
  port = netutils.GetDaemonPort(constants.NODED)
1433
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1434
                                        ToStdout)
1435

    
1436
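  # utils.Retry keeps calling the helper, which raises RetryAgain while some
  # nodes are still down; between attempts helper.Wait pings the remaining
  # nodes (or sleeps out the rest of the interval).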
  try:
1437
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1438
                       wait_fn=helper.Wait)
1439
  except utils.RetryTimeout:
1440
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1441
             " again:\n  - %s", "  - ".join(helper.down))
1442
    return False
1443

    
1444

    
1445
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1446
                          _instance_start_fn=_InstanceStart):
1447
  """Start the instances conditional based on node_states.
1448

1449
  @param opts: The command line options selected by the user
1450
  @param inst_map: A dict of inst -> nodes mapping
1451
  @param nodes_online: A list of nodes online
1452
  @param _instance_start_fn: Callback to start instances (unittest use only)
1453
  @return: Success of the operation on all instances
1454

1455
  """
1456
  start_inst_list = []
1457
  for (inst, nodes) in inst_map.items():
1458
    if not (nodes - nodes_online):
1459
      # All nodes the instance lives on are back online
1460
      start_inst_list.append(inst)
1461

    
1462
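  # Drop the instances we are about to start from the shared map, so later
  # invocations of this callback do not try to start them a second time.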
  for inst in start_inst_list:
1463
    del inst_map[inst]
1464

    
1465
  if start_inst_list:
1466
    return _instance_start_fn(opts, start_inst_list, True)
1467

    
1468
  return True
1469

    
1470

    
1471
def _EpoOn(opts, full_node_list, node_list, inst_map):
1472
  """Does the actual power on.
1473

1474
  @param opts: The command line options selected by the user
1475
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1476
                         OOB)
1477
  @param node_list: The list of nodes to operate on (all need to support OOB)
1478
  @param inst_map: A dict of inst -> nodes mapping
1479
  @return: The desired exit status
1480

1481
  """
1482
  if node_list and not _OobPower(opts, node_list, True):
1483
    ToStderr("Not all nodes seem to get back up, investigate and start"
1484
             " manually if needed")
1485

    
1486
  # Wait for the nodes to be back up
1487
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1488

    
1489
  ToStdout("Waiting until all nodes are available again")
1490
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1491
    ToStderr("Please investigate and start stopped instances manually")
1492
    return constants.EXIT_FAILURE
1493

    
1494
  return constants.EXIT_SUCCESS
1495

    
1496

    
1497
def _EpoOff(opts, node_list, inst_map):
1498
  """Does the actual power off.
1499

1500
  @param opts: The command line options selected by the user
1501
  @param node_list: The list of nodes to operate on (all need to support OOB)
1502
  @param inst_map: A dict of inst -> nodes mapping
1503
  @return: The desired exit status
1504

1505
  """
1506
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1507
    ToStderr("Please investigate and stop instances manually before continuing")
1508
    return constants.EXIT_FAILURE
1509

    
1510
  if not node_list:
1511
    return constants.EXIT_SUCCESS
1512

    
1513
  if _OobPower(opts, node_list, False):
1514
    return constants.EXIT_SUCCESS
1515
  else:
1516
    return constants.EXIT_FAILURE
1517

    
1518

    
1519
def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
1520
        _confirm_fn=ConfirmOperation,
1521
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
1522
  """EPO operations.
1523

1524
  @param opts: the command line options selected by the user
1525
  @type args: list
1526
  @param args: should contain only one element, the subcommand
1527
  @rtype: int
1528
  @return: the desired exit code
1529

1530
  """
1531
  if opts.groups and opts.show_all:
1532
    _stderr_fn("Only one of --groups or --all are allowed")
1533
    return constants.EXIT_FAILURE
1534
  elif args and opts.show_all:
1535
    _stderr_fn("Arguments in combination with --all are not allowed")
1536
    return constants.EXIT_FAILURE
1537

    
1538
  if cl is None:
1539
    cl = GetClient()
1540

    
1541
  if opts.groups:
1542
    node_query_list = \
1543
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
1544
  else:
1545
    node_query_list = args
1546

    
1547
  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
1548
                                           "sinst_list", "powered", "offline"],
1549
                         False)
1550

    
1551
  all_nodes = map(compat.fst, result)
1552
  node_list = []
1553
  inst_map = {}
1554
  for (node, master, pinsts, sinsts, powered, offline) in result:
1555
    if not offline:
1556
      for inst in (pinsts + sinsts):
1557
        if inst in inst_map:
1558
          if not master:
1559
            inst_map[inst].add(node)
1560
        elif master:
1561
          inst_map[inst] = set()
1562
        else:
1563
          inst_map[inst] = set([node])
1564

    
1565
    if master and opts.on:
1566
      # We ignore the master for turning on the machines, in fact we are
1567
      # already operating on the master at this point :)
1568
      continue
1569
    elif master and not opts.show_all:
1570
      _stderr_fn("%s is the master node, please do a master-failover to another"
1571
                 " node not affected by the EPO or use --all if you intend to"
1572
                 " shutdown the whole cluster", node)
1573
      return constants.EXIT_FAILURE
1574
    elif powered is None:
1575
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
1576
                 " handled in a fully automated manner", node)
1577
    elif powered == opts.on:
1578
      _stdout_fn("Node %s is already in desired power state, skipping", node)
1579
    elif not offline or (offline and powered):
1580
      node_list.append(node)
1581

    
1582
  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
1583
    return constants.EXIT_FAILURE
1584

    
1585
  if opts.on:
1586
    return _on_fn(opts, all_nodes, node_list, inst_map)
1587
  else:
1588
    return _off_fn(opts, node_list, inst_map)
1589

    
1590

    
1591
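

# Typical invocations (illustrative; node names are placeholders):
#   gnt-cluster epo --all              power off the whole cluster
#   gnt-cluster epo --on node1 node2   power the given nodes (and their
#                                      instances) back on
# See the "epo" entry in the commands dict below for the accepted options.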


def _GetCreateCommand(info):
  buf = StringIO()
  buf.write("gnt-cluster init")
  PrintIPolicyCommand(buf, info["ipolicy"], False)
  buf.write(" ")
  buf.write(info["name"])
  return buf.getvalue()
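

# The string built by _GetCreateCommand has the form
# "gnt-cluster init <ipolicy options> <cluster name>", where the ipolicy
# options are whatever PrintIPolicyCommand emits for the cluster's instance
# policy and the name comes from the cluster info (e.g. a hypothetical
# "cluster.example.com").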


def ShowCreateCommand(opts, args):
  """Shows the command that can be used to re-create the cluster.

  Currently it works only for ipolicy specs.

  """
  cl = GetClient(query=True)
  result = cl.QueryClusterInfo()
  ToStdout(_GetCreateCommand(result))
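

# Each entry in the commands dict below maps a sub-command name to a tuple of
# (handler function, positional-argument specification, option list, usage
# synopsis, help text); GenericMain (see Main below) dispatches on these
# entries.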


commands = {
  "init": (
    InitCluster, [ArgHost(min=1, max=1)],
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
     IPOLICY_STD_SPECS_OPT] + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
  "destroy": (
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
    "", "Destroy cluster"),
  "rename": (
    RenameCluster, [ArgHost(min=1, max=1)],
    [FORCE_OPT, DRY_RUN_OPT],
    "<new_name>",
    "Renames the cluster"),
  "redist-conf": (
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS + [DRY_RUN_OPT, PRIORITY_OPT],
    "", "Forces a push of the configuration file and ssconf files"
    " to the nodes in the cluster"),
  "verify": (
    VerifyCluster, ARGS_NONE,
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
    "", "Does a check on the cluster configuration"),
  "verify-disks": (
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
    "", "Does a check on the cluster disk status"),
  "repair-disk-sizes": (
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
    "[instance...]", "Updates mismatches in recorded disk sizes"),
  "master-failover": (
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
    "", "Makes the current node the master"),
  "master-ping": (
    MasterPing, ARGS_NONE, [],
    "", "Checks if the master is alive"),
  "version": (
    ShowClusterVersion, ARGS_NONE, [],
    "", "Shows the cluster version"),
  "getmaster": (
    ShowClusterMaster, ARGS_NONE, [],
    "", "Shows the cluster master"),
  "copyfile": (
    ClusterCopyFile, [ArgFile(min=1, max=1)],
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
  "command": (
    RunClusterCommand, [ArgCommand(min=1)],
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
  "info": (
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
    "[--roman]", "Show cluster configuration"),
  "list-tags": (
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
  "add-tags": (
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Add tags to the cluster"),
  "remove-tags": (
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Remove tags from the cluster"),
  "search-tags": (
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
    "Searches the tags on all objects on"
    " the cluster for a given pattern (regex)"),
  "queue": (
    QueueOps,
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
    [], "drain|undrain|info", "Change queue properties"),
  "watcher": (
    WatcherOps,
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
    [],
    "{pause <timespec>|continue|info}", "Change watcher properties"),
  "modify": (
    SetClusterParams, ARGS_NONE,
    [FORCE_OPT,
     BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
     DISK_STATE_OPT] + SUBMIT_OPTS +
     [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT] +
     INSTANCE_POLICY_OPTS + [GLOBAL_FILEDIR_OPT],
    "[opts...]",
    "Alters the parameters of the cluster"),
  "renew-crypto": (
    RenewCrypto, ARGS_NONE,
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
    "[opts...]",
    "Renews cluster certificates, keys and secrets"),
  "epo": (
    Epo, [ArgUnknown()],
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
    "[opts...] [args]",
    "Performs an emergency power-off on given args"),
  "activate-master-ip": (
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
  "deactivate-master-ip": (
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
    "Deactivates the master IP"),
  "show-ispecs-cmd": (
    ShowCreateCommand, ARGS_NONE, [], "",
    "Show the command line to re-create the cluster"),
  }


#: dictionary with aliases for commands
aliases = {
  "masterfailover": "master-failover",
  "show": "info",
}
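

# With these aliases, e.g. "gnt-cluster show" behaves like "gnt-cluster info"
# and "gnt-cluster masterfailover" like "gnt-cluster master-failover".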


def Main():
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
                     aliases=aliases)