Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ c121d42f

History | View | Annotate | Download (54.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
from cStringIO import StringIO
30
import os.path
31
import time
32
import OpenSSL
33
import itertools
34

    
35
from ganeti.cli import *
36
from ganeti import opcodes
37
from ganeti import constants
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import bootstrap
41
from ganeti import ssh
42
from ganeti import objects
43
from ganeti import uidpool
44
from ganeti import compat
45
from ganeti import netutils
46
from ganeti import pathutils
47

    
48

    
49
ON_OPT = cli_option("--on", default=False,
50
                    action="store_true", dest="on",
51
                    help="Recover from an EPO")
52

    
53
GROUPS_OPT = cli_option("--groups", default=False,
54
                        action="store_true", dest="groups",
55
                        help="Arguments are node groups instead of nodes")
56

    
57
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
58
                            help="Override interactive check for --no-voting",
59
                            default=False, action="store_true")
60

    
61
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
62
_EPO_PING_TIMEOUT = 1 # 1 second
63
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
64

    
65

    
66
def _CheckNoLvmStorageOptDeprecated(opts):
  """Checks if the legacy option '--no-lvm-storage' is used.

  @param opts: the command line options selected by the user
  @rtype: int or None
  @return: 1 if the deprecated option was given, None otherwise

  """
  if not opts.lvm_storage:
    # FIX: the message previously read "lvm-base disk   templates" (typo
    # plus a stray double space from the string continuation).
    ToStderr("The option --no-lvm-storage is no longer supported. If you want"
             " to disable lvm-based storage cluster-wide, use the option"
             " --enabled-disk-templates to disable all of these lvm-based"
             " disk templates: %s" %
             utils.CommaJoin(utils.GetLvmDiskTemplates()))
    return 1
77

    
78

    
79
def _InitEnabledDiskTemplates(opts):
  """Initialize the list of enabled disk templates.

  @param opts: the command line options selected by the user
  @rtype: list of strings
  @return: the templates given via --enabled-disk-templates, or the
      built-in default list when the option was not used

  """
  requested = opts.enabled_disk_templates
  if not requested:
    return constants.DEFAULT_ENABLED_DISK_TEMPLATES
  return requested.split(",")
87

    
88

    
89
def _InitVgName(opts, enabled_disk_templates):
  """Initialize the volume group name.

  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates

  """
  lvm_needed = utils.IsLvmEnabled(enabled_disk_templates)

  if opts.vg_name is None:
    # No --vg-name given: fall back to the default vg only if some
    # lvm-based disk template is enabled.
    if lvm_needed:
      return constants.DEFAULT_VG
    return None

  if opts.vg_name:
    # A non-empty vg name was given; warn if it will never be used.
    if not lvm_needed:
      ToStdout("You specified a volume group with --vg-name, but you did not"
               " enable any disk template that uses lvm.")
    return opts.vg_name

  # An explicitly empty vg name was given, which conflicts with lvm use.
  if lvm_needed:
    raise errors.OpPrereqError(
        "LVM disk templates are enabled, but vg name not set.")
  return opts.vg_name
109

    
110

    
111
def _InitDrbdHelper(opts):
  """Initialize the DRBD usermode helper.

  @param opts: the command line options selected by the user
  @return: the helper given on the command line, the default helper when
      drbd storage is enabled but none was given, or None otherwise
  @raise errors.OpPrereqError: if a helper is given while drbd storage
      is disabled

  """
  helper = opts.drbd_helper

  if helper and not opts.drbd_storage:
    raise errors.OpPrereqError(
        "Options --no-drbd-storage and --drbd-usermode-helper conflict.")

  if not helper and opts.drbd_storage:
    return constants.DEFAULT_DRBD_HELPER

  return helper
123

    
124

    
125
@UsesRPC
def InitCluster(opts, args):
  """Initialize the cluster.

  Validates and normalizes all command line options, fills in defaults
  from L{constants}, and then hands the assembled parameters to
  L{bootstrap.InitCluster} before submitting an OpClusterPostInit opcode.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the desired
      cluster name
  @rtype: int
  @return: the desired exit code

  """
  # Abort early if the removed --no-lvm-storage option was used
  if _CheckNoLvmStorageOptDeprecated(opts):
    return 1

  enabled_disk_templates = _InitEnabledDiskTemplates(opts)

  try:
    vg_name = _InitVgName(opts, enabled_disk_templates)
    drbd_helper = _InitDrbdHelper(opts)
  except errors.OpPrereqError, e:
    ToStderr(str(e))
    return 1

  master_netdev = opts.master_netdev
  if master_netdev is None:
    # Derive the master network device from the default NIC mode
    nic_mode = opts.nicparams.get(constants.NIC_MODE, None)
    if not nic_mode:
      # default case, use bridging
      master_netdev = constants.DEFAULT_BRIDGE
    elif nic_mode == constants.NIC_MODE_OVS:
      # default ovs is different from default bridge
      master_netdev = constants.DEFAULT_OVS
      opts.nicparams[constants.NIC_LINK] = constants.DEFAULT_OVS

  # --enabled-hypervisors is a comma-separated string; normalize to a list
  hvlist = opts.enabled_hypervisors
  if hvlist is None:
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
  hvlist = hvlist.split(",")

  hvparams = dict(opts.hvparams)
  beparams = opts.beparams
  nicparams = opts.nicparams

  diskparams = dict(opts.diskparams)

  # check the disk template types here, as we cannot rely on the type check done
  # by the opcode parameter types
  diskparams_keys = set(diskparams.keys())
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
    return 1

  # prepare beparams dict: overlay user values on defaults, then type-check
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  # prepare nicparams dict
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  # prepare ndparams dict
  if opts.ndparams is None:
    ndparams = dict(constants.NDC_DEFAULTS)
  else:
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  # prepare hvparams dict: one (possibly empty) entry per known hypervisor
  for hv in constants.HYPER_TYPES:
    if hv not in hvparams:
      hvparams[hv] = {}
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)

  # prepare diskparams dict: one entry per known disk template
  for templ in constants.DISK_TEMPLATES:
    if templ not in diskparams:
      diskparams[templ] = {}
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
                                         diskparams[templ])
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)

  # prepare ipolicy dict
  ipolicy = CreateIPolicyFromOpts(
    ispecs_mem_size=opts.ispecs_mem_size,
    ispecs_cpu_count=opts.ispecs_cpu_count,
    ispecs_disk_count=opts.ispecs_disk_count,
    ispecs_disk_size=opts.ispecs_disk_size,
    ispecs_nic_count=opts.ispecs_nic_count,
    minmax_ispecs=opts.ipolicy_bounds_specs,
    std_ispecs=opts.ipolicy_std_specs,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    fill_all=True)

  if opts.candidate_pool_size is None:
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT

  if opts.mac_prefix is None:
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX

  # uid pool is given as a string on the command line; parse into ranges
  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  if opts.prealloc_wipe_disks is None:
    opts.prealloc_wipe_disks = False

  external_ip_setup_script = opts.use_external_mip_script
  if external_ip_setup_script is None:
    external_ip_setup_script = False

  try:
    primary_ip_version = int(opts.primary_ip_version)
  except (ValueError, TypeError), err:
    ToStderr("Invalid primary ip version value: %s" % str(err))
    return 1

  master_netmask = opts.master_netmask
  try:
    if master_netmask is not None:
      master_netmask = int(master_netmask)
  except (ValueError, TypeError), err:
    ToStderr("Invalid master netmask value: %s" % str(err))
    return 1

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  # All parameters validated; actually bootstrap the cluster
  bootstrap.InitCluster(cluster_name=args[0],
                        secondary_ip=opts.secondary_ip,
                        vg_name=vg_name,
                        mac_prefix=opts.mac_prefix,
                        master_netmask=master_netmask,
                        master_netdev=master_netdev,
                        file_storage_dir=opts.file_storage_dir,
                        shared_file_storage_dir=opts.shared_file_storage_dir,
                        enabled_hypervisors=hvlist,
                        hvparams=hvparams,
                        beparams=beparams,
                        nicparams=nicparams,
                        ndparams=ndparams,
                        diskparams=diskparams,
                        ipolicy=ipolicy,
                        candidate_pool_size=opts.candidate_pool_size,
                        modify_etc_hosts=opts.modify_etc_hosts,
                        modify_ssh_setup=opts.modify_ssh_setup,
                        maintain_node_health=opts.maintain_node_health,
                        drbd_helper=drbd_helper,
                        uid_pool=uid_pool,
                        default_iallocator=opts.default_iallocator,
                        primary_ip_version=primary_ip_version,
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
                        use_external_mip_script=external_ip_setup_script,
                        hv_state=hv_state,
                        disk_state=disk_state,
                        enabled_disk_templates=enabled_disk_templates,
                        )
  op = opcodes.OpClusterPostInit()
  SubmitOpCode(op, opts=opts)
  return 0
293

    
294

    
295
@UsesRPC
def DestroyCluster(opts, args):
  """Destroy the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if not opts.yes_do_it:
    # FIX: message previously read "really want destroy" (missing "to")
    ToStderr("Destroying a cluster is irreversible. If you really want to"
             " destroy this cluster, supply the --yes-do-it option.")
    return 1

  op = opcodes.OpClusterDestroy()
  master_uuid = SubmitOpCode(op, opts=opts)
  # if we reached this, the opcode didn't fail; we can proceed to
  # shutdown all the daemons
  bootstrap.FinalizeClusterDestroy(master_uuid)
  return 0
317

    
318

    
319
def RenameCluster(opts, args):
  """Rename the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new cluster name
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  (old_name, ) = cl.QueryConfigValues(["cluster_name"])
  new_name = args[0]

  # Renaming drops the old master IP, so confirm unless forced
  if not opts.force:
    prompt = ("This will rename the cluster from '%s' to '%s'. If you are"
              " connected over the network to the cluster name, the"
              " operation is very dangerous as the IP address will be"
              " removed from the node and the change may not go through."
              " Continue?") % (old_name, new_name)
    if not AskUser(prompt):
      return 1

  result = SubmitOpCode(opcodes.OpClusterRename(name=new_name),
                        opts=opts, cl=cl)
  if result:
    ToStdout("Cluster renamed from '%s' to '%s'", old_name, result)

  return 0
350

    
351

    
352
def ActivateMasterIp(opts, args):
  """Activates the master IP.

  """
  SubmitOpCode(opcodes.OpClusterActivateMasterIp())
  return 0
359

    
360

    
361
def DeactivateMasterIp(opts, args):
  """Deactivates the master IP.

  """
  # Dropping the master IP kills existing connections, so confirm first
  if not opts.confirm:
    prompt = ("This will disable the master IP. All the open connections to"
              " the master IP will be closed. To reach the master you will"
              " need to use its node IP."
              " Continue?")
    if not AskUser(prompt):
      return 1

  SubmitOpCode(opcodes.OpClusterDeactivateMasterIp())
  return 0
376

    
377

    
378
def RedistributeConfig(opts, args):
  """Forces push of the cluster configuration.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: empty list
  @rtype: int
  @return: the desired exit code

  """
  SubmitOrSend(opcodes.OpClusterRedistConf(), opts)
  return 0
391

    
392

    
393
def ShowClusterVersion(opts, args):
  """Write version of ganeti software to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  info = GetClient(query=True).QueryClusterInfo()
  # (label, result key) pairs, printed in order
  for label, key in [("Software version", "software_version"),
                     ("Internode protocol", "protocol_version"),
                     ("Configuration format", "config_version"),
                     ("OS api version", "os_api_version"),
                     ("Export interface", "export_version"),
                     ("VCS version", "vcs_version")]:
    ToStdout(label + ": %s", info[key])
  return 0
412

    
413

    
414
def ShowClusterMaster(opts, args):
  """Write name of master node to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  ToStdout(bootstrap.GetMaster())
  return 0
427

    
428

    
429
def _FormatGroupedParams(paramsdict, roman=False):
  """Format Grouped parameters (be, nic, disk) by group.

  @type paramsdict: dict of dicts
  @param paramsdict: {group: {param: value, ...}, ...}
  @type roman: bool
  @param roman: whether integer values should be converted to roman numerals
  @rtype: dict of dicts
  @return: copy of the input dictionaries with strings as values

  """
  def _FormatValue(value):
    # Nested dicts are formatted recursively, integers may be romanized,
    # everything else is stringified
    if isinstance(value, dict):
      return _FormatGroupedParams(value, roman=roman)
    if roman and isinstance(value, int):
      return compat.TryToRoman(value)
    return str(value)

  return dict((key, _FormatValue(value))
              for (key, value) in paramsdict.items())
447

    
448

    
449
def ShowClusterConfig(opts, args):
  """Shows cluster information.

  Queries the master for the full cluster configuration and renders it
  as a structured list via L{PrintGenericInfo}.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient(query=True)
  result = cl.QueryClusterInfo()

  # Pre-format the optional list-valued fields
  if result["tags"]:
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
  else:
    tags = "(none)"
  if result["reserved_lvs"]:
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
  else:
    reserved_lvs = "(none)"

  # Only show hypervisor parameters for hypervisors that are enabled
  enabled_hv = result["enabled_hypervisors"]
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
                  if k in enabled_hv)

  info = [
    ("Cluster name", result["name"]),
    ("Cluster UUID", result["uuid"]),

    ("Creation time", utils.FormatTime(result["ctime"])),
    ("Modification time", utils.FormatTime(result["mtime"])),

    ("Master node", result["master"]),

    ("Architecture (this node)",
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),

    ("Tags", tags),

    ("Default hypervisor", result["default_hypervisor"]),
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),

    ("Hypervisor parameters", _FormatGroupedParams(hvparams)),

    ("OS-specific hypervisor parameters",
     _FormatGroupedParams(result["os_hvp"])),

    ("OS parameters", _FormatGroupedParams(result["osparams"])),

    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),

    # Miscellaneous cluster-wide settings, grouped under one heading
    ("Cluster parameters", [
      ("candidate pool size",
       compat.TryToRoman(result["candidate_pool_size"],
                         convert=opts.roman_integers)),
      ("master netdev", result["master_netdev"]),
      ("master netmask", result["master_netmask"]),
      ("use external master IP address setup script",
       result["use_external_mip_script"]),
      ("lvm volume group", result["volume_group_name"]),
      ("lvm reserved volumes", reserved_lvs),
      ("drbd usermode helper", result["drbd_usermode_helper"]),
      ("file storage path", result["file_storage_dir"]),
      ("shared file storage path", result["shared_file_storage_dir"]),
      ("maintenance of node health", result["maintain_node_health"]),
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
      ("default instance allocator", result["default_iallocator"]),
      ("primary ip version", result["primary_ip_version"]),
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
      ("ExtStorage Providers search path",
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
      ("enabled disk templates",
       utils.CommaJoin(result["enabled_disk_templates"])),
      ]),

    ("Default node parameters",
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),

    ("Default instance parameters",
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),

    ("Default nic parameters",
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),

    ("Default disk parameters",
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),

    ("Instance policy - limits for instances",
     FormatPolicyInfo(result["ipolicy"], None, True)),
    ]

  PrintGenericInfo(info)
  return 0
545

    
546

    
547
def ClusterCopyFile(opts, args):
  """Copy a file from master to some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the path of
      the file to be copied
  @rtype: int
  @return: the desired exit code

  """
  path = args[0]
  if not os.path.exists(path):
    raise errors.OpPrereqError("No such filename '%s'" % path,
                               errors.ECODE_INVAL)

  cl = GetClient()
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])

  # The master keeps its own copy, so it is filtered out of the targets
  online_nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
                                secondary_ips=opts.use_replication_network,
                                nodegroup=opts.nodegroup)

  runner = ssh.SshRunner(cluster_name)
  for node in online_nodes:
    # Failures are reported but do not abort the remaining copies
    if not runner.CopyFileToNode(node, path):
      ToStderr("Copy of file %s to node %s failed", path, node)

  return 0
577

    
578

    
579
def RunClusterCommand(opts, args):
  """Run a command on some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the command to be run and its arguments
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  cmd = " ".join(args)

  node_list = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)

  (cluster_name, master_node) = cl.QueryConfigValues(["cluster_name",
                                                      "master_node"])

  runner = ssh.SshRunner(cluster_name=cluster_name)

  # Run on the master last so a disruptive command does not cut us off
  # from the remaining nodes
  if master_node in node_list:
    node_list.remove(master_node)
    node_list.append(master_node)

  for node_name in node_list:
    cmd_result = runner.Run(node_name, constants.SSH_LOGIN_USER, cmd)

    if opts.failure_only and cmd_result.exit_code == constants.EXIT_SUCCESS:
      # Do not output anything for successful commands
      continue

    ToStdout("------------------------------------------------")
    if opts.show_machine_names:
      # Prefix every output line with the node name
      for line in cmd_result.output.splitlines():
        ToStdout("%s: %s", node_name, line)
    else:
      ToStdout("node: %s", node_name)
      ToStdout("%s", cmd_result.output)
    ToStdout("return code = %s", cmd_result.exit_code)

  return 0
622

    
623

    
624
def VerifyCluster(opts, args):
  """Verify integrity of cluster, performing various test on nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  skip_checks = []

  if opts.skip_nplusone_mem:
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)

  cl = GetClient()

  op = opcodes.OpClusterVerify(verbose=opts.verbose,
                               error_codes=opts.error_codes,
                               debug_simulate_errors=opts.simulate_errors,
                               skip_checks=skip_checks,
                               ignore_errors=opts.ignore_errors,
                               group_name=opts.nodegroup)
  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  # The verify opcode returns a list of (status, job id) pairs of
  # per-group verification jobs; wait for all of them
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  results = jex.GetResults()

  # Count the failures: each entry in "results" is a
  # (job_success, op_results) pair.  zip(*...) transposes these into one
  # sequence of job-success flags and one of per-job result flags (a job
  # result counts as good only if it has exactly one opcode result and
  # that result is truthy); ifilterfalse(bool, ...) keeps only the falsy
  # entries, so the two lengths are the failed-job and failed-result
  # counts respectively.
  (bad_jobs, bad_results) = \
    map(len,
        # Convert iterators to lists
        map(list,
            # Count errors
            map(compat.partial(itertools.ifilterfalse, bool),
                # Convert result to booleans in a tuple
                zip(*((job_success, len(op_results) == 1 and op_results[0])
                      for (job_success, op_results) in results)))))

  if bad_jobs == 0 and bad_results == 0:
    rcode = constants.EXIT_SUCCESS
  else:
    rcode = constants.EXIT_FAILURE
    if bad_jobs > 0:
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)

  return rcode
675

    
676

    
677
def VerifyDisks(opts, args):
  """Verify integrity of cluster disks.

  Submits an OpClusterVerifyDisks opcode, waits for the per-group jobs
  it spawns, reports broken nodes and missing logical volumes, and
  re-activates disks for instances that only need activation.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  op = opcodes.OpClusterVerifyDisks()

  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  retcode = constants.EXIT_SUCCESS

  for (status, result) in jex.GetResults():
    if not status:
      ToStdout("Job failed: %s", result)
      continue

    # Each job result is a single tuple of (bad nodes, instances needing
    # disk activation, instances with missing LVs)
    ((bad_nodes, instances, missing), ) = result

    for node, text in bad_nodes.items():
      # Only the tail of the error text is shown, to keep output bounded
      ToStdout("Error gathering data on node %s: %s",
               node, utils.SafeEncode(text[-400:]))
      retcode = constants.EXIT_FAILURE
      ToStdout("You need to fix these nodes first before fixing instances")

    for iname in instances:
      if iname in missing:
        # Instances with missing LVs are reported below instead
        continue
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
      try:
        ToStdout("Activating disks for instance '%s'", iname)
        SubmitOpCode(op, opts=opts, cl=cl)
      except errors.GenericError, err:
        nret, msg = FormatError(err)
        retcode |= nret
        ToStderr("Error activating disks for instance %s: %s", iname, msg)

    if missing:
      # "missing" maps instance name -> list of (node, volume) pairs
      for iname, ival in missing.iteritems():
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
        if all_missing:
          ToStdout("Instance %s cannot be verified as it lives on"
                   " broken nodes", iname)
        else:
          ToStdout("Instance %s has missing logical volumes:", iname)
          ival.sort()
          for node, vol in ival:
            if node in bad_nodes:
              ToStdout("\tbroken node %s /dev/%s", node, vol)
            else:
              ToStdout("\t%s /dev/%s", node, vol)

      ToStdout("You need to replace or recreate disks for all the above"
               " instances if this message persists after fixing broken nodes.")
      retcode = constants.EXIT_FAILURE
    elif not instances:
      ToStdout("No disks need to be activated.")

  return retcode
748

    
749

    
750
def RepairDiskSizes(opts, args):
  """Verify sizes of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: optional list of instances to restrict check to
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
  SubmitOpCode(op, opts=opts)
  # CONSISTENCY FIX: return an explicit success code like the sibling
  # command handlers (previously fell off the end, returning None)
  return 0
762

    
763

    
764
@UsesRPC
def MasterFailover(opts, args):
  """Failover the master node.

  This command, when run on a non-master node, will cause the current
  master to cease being master, and the non-master to become new
  master.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # --no-voting is dangerous; require confirmation unless --yes-do-it
  needs_confirmation = opts.no_voting and not opts.yes_do_it
  if needs_confirmation:
    prompt = ("This will perform the failover even if most other nodes"
              " are down, or if this node is outdated. This is dangerous"
              " as it can lead to a non-consistent cluster. Check the"
              " gnt-cluster(8) man page before proceeding. Continue?")
    if not AskUser(prompt):
      return 1

  return bootstrap.MasterFailover(no_voting=opts.no_voting)
788

    
789

    
790
def MasterPing(opts, args):
  """Checks if the master is alive.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  try:
    GetClient().QueryClusterInfo()
  except Exception: # pylint: disable=W0703
    # Any failure to reach/query the master means it is not alive
    return 1
  return 0
806

    
807

    
808
def SearchTags(opts, args):
  """Searches the tags on all the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the tag pattern
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpTagsSearch(pattern=args[0])
  result = SubmitOpCode(op, opts=opts)
  if not result:
    return 1
  # sorted() replaces the list()/sort() pair; tuples sort by path then tag
  for path, tag in sorted(result):
    ToStdout("%s %s", path, tag)
  # CONSISTENCY FIX: return an explicit success code like the sibling
  # command handlers (previously fell off the end, returning None)
  return 0
826

    
827

    
828
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
  """Reads and verifies an X509 certificate.

  @type cert_filename: string
  @param cert_filename: the path of the file containing the certificate to
                        verify encoded in PEM format
  @type verify_private_key: bool
  @param verify_private_key: whether to verify the private key in addition to
                             the public certificate
  @rtype: string
  @return: a string containing the PEM-encoded certificate.

  """
  try:
    pem = utils.ReadFile(cert_filename)
  except IOError as err:
    raise errors.X509CertError(cert_filename,
                               "Unable to read certificate: %s" % str(err))

  def _Parse(loader, what):
    # Parse the PEM data with the given pyOpenSSL loader, wrapping any
    # failure into an X509CertError naming the file
    try:
      loader(OpenSSL.crypto.FILETYPE_PEM, pem)
    except Exception as err: # pylint: disable=W0703
      raise errors.X509CertError(cert_filename,
                                 "Unable to load %s: %s" % (what, str(err)))

  _Parse(OpenSSL.crypto.load_certificate, "certificate")

  if verify_private_key:
    _Parse(OpenSSL.crypto.load_privatekey, "private key")

  return pem
861

    
862

    
863
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
864
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
865
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
866
                 cds_filename, force):
867
  """Renews cluster certificates, keys and secrets.
868

869
  @type new_cluster_cert: bool
870
  @param new_cluster_cert: Whether to generate a new cluster certificate
871
  @type new_rapi_cert: bool
872
  @param new_rapi_cert: Whether to generate a new RAPI certificate
873
  @type rapi_cert_filename: string
874
  @param rapi_cert_filename: Path to file containing new RAPI certificate
875
  @type new_spice_cert: bool
876
  @param new_spice_cert: Whether to generate a new SPICE certificate
877
  @type spice_cert_filename: string
878
  @param spice_cert_filename: Path to file containing new SPICE certificate
879
  @type spice_cacert_filename: string
880
  @param spice_cacert_filename: Path to file containing the certificate of the
881
                                CA that signed the SPICE certificate
882
  @type new_confd_hmac_key: bool
883
  @param new_confd_hmac_key: Whether to generate a new HMAC key
884
  @type new_cds: bool
885
  @param new_cds: Whether to generate a new cluster domain secret
886
  @type cds_filename: string
887
  @param cds_filename: Path to file containing new cluster domain secret
888
  @type force: bool
889
  @param force: Whether to ask user for confirmation
890

891
  """
892
  if new_rapi_cert and rapi_cert_filename:
893
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
894
             " options can be specified at the same time.")
895
    return 1
896

    
897
  if new_cds and cds_filename:
898
    ToStderr("Only one of the --new-cluster-domain-secret and"
899
             " --cluster-domain-secret options can be specified at"
900
             " the same time.")
901
    return 1
902

    
903
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
904
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
905
             " and --spice-ca-certificate must not be used.")
906
    return 1
907

    
908
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
909
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
910
             " specified.")
911
    return 1
912

    
913
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
914
  try:
915
    if rapi_cert_filename:
916
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
917
    if spice_cert_filename:
918
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
919
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
920
  except errors.X509CertError, err:
921
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
922
    return 1
923

    
924
  if cds_filename:
925
    try:
926
      cds = utils.ReadFile(cds_filename)
927
    except Exception, err: # pylint: disable=W0703
928
      ToStderr("Can't load new cluster domain secret from %s: %s" %
929
               (cds_filename, str(err)))
930
      return 1
931
  else:
932
    cds = None
933

    
934
  if not force:
935
    usertext = ("This requires all daemons on all nodes to be restarted and"
936
                " may take some time. Continue?")
937
    if not AskUser(usertext):
938
      return 1
939

    
940
  def _RenewCryptoInner(ctx):
941
    ctx.feedback_fn("Updating certificates and keys")
942
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
943
                                    new_rapi_cert,
944
                                    new_spice_cert,
945
                                    new_confd_hmac_key,
946
                                    new_cds,
947
                                    rapi_cert_pem=rapi_cert_pem,
948
                                    spice_cert_pem=spice_cert_pem,
949
                                    spice_cacert_pem=spice_cacert_pem,
950
                                    cds=cds)
951

    
952
    files_to_copy = []
953

    
954
    if new_cluster_cert:
955
      files_to_copy.append(pathutils.NODED_CERT_FILE)
956

    
957
    if new_rapi_cert or rapi_cert_pem:
958
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
959

    
960
    if new_spice_cert or spice_cert_pem:
961
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
962
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
963

    
964
    if new_confd_hmac_key:
965
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
966

    
967
    if new_cds or cds:
968
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
969

    
970
    if files_to_copy:
971
      for node_name in ctx.nonmaster_nodes:
972
        ctx.feedback_fn("Copying %s to %s" %
973
                        (", ".join(files_to_copy), node_name))
974
        for file_name in files_to_copy:
975
          ctx.ssh.CopyFileToNode(node_name, file_name)
976

    
977
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
978

    
979
  ToStdout("All requested certificates and keys have been replaced."
980
           " Running \"gnt-cluster verify\" now is recommended.")
981

    
982
  return 0
983

    
984

    
985
def RenewCrypto(opts, args):
  """Renews cluster certificates, keys and secrets.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # Thin CLI wrapper: unpack the options into the worker function
  return _RenewCrypto(opts.new_cluster_cert,
                      opts.new_rapi_cert,
                      opts.rapi_cert,
                      opts.new_spice_cert,
                      opts.spice_cert,
                      opts.spice_cacert,
                      opts.new_confd_hmac_key,
                      opts.new_cluster_domain_secret,
                      opts.cluster_domain_secret,
                      opts.force)
def _GetEnabledDiskTemplates(opts):
1002
  """Determine the list of enabled disk templates.
1003

1004
  """
1005
  if opts.enabled_disk_templates:
1006
    return opts.enabled_disk_templates.split(",")
1007
  else:
1008
    return None
1009

    
1010

    
1011
def _GetVgName(opts, enabled_disk_templates):
1012
  """Determine the volume group name.
1013

1014
  @type enabled_disk_templates: list of strings
1015
  @param enabled_disk_templates: cluster-wide enabled disk-templates
1016

1017
  """
1018
  # consistency between vg name and enabled disk templates
1019
  vg_name = None
1020
  if opts.vg_name is not None:
1021
    vg_name = opts.vg_name
1022
  if enabled_disk_templates:
1023
    if vg_name and not utils.IsLvmEnabled(enabled_disk_templates):
1024
      ToStdout("You specified a volume group with --vg-name, but you did not"
1025
               " enable any of the following lvm-based disk templates: %s" %
1026
               utils.CommaJoin(utils.GetLvmDiskTemplates()))
1027
  return vg_name
1028

    
1029

    
1030
def _GetDrbdHelper(opts):
1031
  """Determine the DRBD usermode helper.
1032

1033
  """
1034
  drbd_helper = opts.drbd_helper
1035
  if not opts.drbd_storage and opts.drbd_helper:
1036
    raise errors.OpPrereqError(
1037
        "Options --no-drbd-storage and --drbd-usermode-helper conflict.")
1038

    
1039
  if not opts.drbd_storage:
1040
    drbd_helper = ""
1041
  return drbd_helper
1042

    
1043

    
1044
def SetClusterParams(opts, args):
1045
  """Modify the cluster.
1046

1047
  @param opts: the command line options selected by the user
1048
  @type args: list
1049
  @param args: should be an empty list
1050
  @rtype: int
1051
  @return: the desired exit code
1052

1053
  """
1054
  if not (opts.vg_name is not None or opts.drbd_helper or
1055
          opts.enabled_hypervisors or opts.hvparams or
1056
          opts.beparams or opts.nicparams or
1057
          opts.ndparams or opts.diskparams or
1058
          opts.candidate_pool_size is not None or
1059
          opts.uid_pool is not None or
1060
          opts.maintain_node_health is not None or
1061
          opts.add_uids is not None or
1062
          opts.remove_uids is not None or
1063
          opts.default_iallocator is not None or
1064
          opts.reserved_lvs is not None or
1065
          opts.master_netdev is not None or
1066
          opts.master_netmask is not None or
1067
          opts.use_external_mip_script is not None or
1068
          opts.prealloc_wipe_disks is not None or
1069
          opts.hv_state or
1070
          opts.enabled_disk_templates or
1071
          opts.disk_state or
1072
          opts.ipolicy_bounds_specs is not None or
1073
          opts.ipolicy_std_specs is not None or
1074
          opts.ipolicy_disk_templates is not None or
1075
          opts.ipolicy_vcpu_ratio is not None or
1076
          opts.ipolicy_spindle_ratio is not None or
1077
          opts.modify_etc_hosts is not None or
1078
          opts.file_storage_dir is not None):
1079
    ToStderr("Please give at least one of the parameters.")
1080
    return 1
1081

    
1082
  if _CheckNoLvmStorageOptDeprecated(opts):
1083
    return 1
1084

    
1085
  enabled_disk_templates = _GetEnabledDiskTemplates(opts)
1086
  vg_name = _GetVgName(opts, enabled_disk_templates)
1087

    
1088
  try:
1089
    drbd_helper = _GetDrbdHelper(opts)
1090
  except errors.OpPrereqError, e:
1091
    ToStderr(str(e))
1092
    return 1
1093

    
1094
  hvlist = opts.enabled_hypervisors
1095
  if hvlist is not None:
1096
    hvlist = hvlist.split(",")
1097

    
1098
  # a list of (name, dict) we can pass directly to dict() (or [])
1099
  hvparams = dict(opts.hvparams)
1100
  for hv_params in hvparams.values():
1101
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1102

    
1103
  diskparams = dict(opts.diskparams)
1104

    
1105
  for dt_params in diskparams.values():
1106
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1107

    
1108
  beparams = opts.beparams
1109
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1110

    
1111
  nicparams = opts.nicparams
1112
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1113

    
1114
  ndparams = opts.ndparams
1115
  if ndparams is not None:
1116
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1117

    
1118
  ipolicy = CreateIPolicyFromOpts(
1119
    minmax_ispecs=opts.ipolicy_bounds_specs,
1120
    std_ispecs=opts.ipolicy_std_specs,
1121
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1122
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1123
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1124
    )
1125

    
1126
  mnh = opts.maintain_node_health
1127

    
1128
  uid_pool = opts.uid_pool
1129
  if uid_pool is not None:
1130
    uid_pool = uidpool.ParseUidPool(uid_pool)
1131

    
1132
  add_uids = opts.add_uids
1133
  if add_uids is not None:
1134
    add_uids = uidpool.ParseUidPool(add_uids)
1135

    
1136
  remove_uids = opts.remove_uids
1137
  if remove_uids is not None:
1138
    remove_uids = uidpool.ParseUidPool(remove_uids)
1139

    
1140
  if opts.reserved_lvs is not None:
1141
    if opts.reserved_lvs == "":
1142
      opts.reserved_lvs = []
1143
    else:
1144
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1145

    
1146
  if opts.master_netmask is not None:
1147
    try:
1148
      opts.master_netmask = int(opts.master_netmask)
1149
    except ValueError:
1150
      ToStderr("The --master-netmask option expects an int parameter.")
1151
      return 1
1152

    
1153
  ext_ip_script = opts.use_external_mip_script
1154

    
1155
  if opts.disk_state:
1156
    disk_state = utils.FlatToDict(opts.disk_state)
1157
  else:
1158
    disk_state = {}
1159

    
1160
  hv_state = dict(opts.hv_state)
1161

    
1162
  op = opcodes.OpClusterSetParams(
1163
    vg_name=vg_name,
1164
    drbd_helper=drbd_helper,
1165
    enabled_hypervisors=hvlist,
1166
    hvparams=hvparams,
1167
    os_hvp=None,
1168
    beparams=beparams,
1169
    nicparams=nicparams,
1170
    ndparams=ndparams,
1171
    diskparams=diskparams,
1172
    ipolicy=ipolicy,
1173
    candidate_pool_size=opts.candidate_pool_size,
1174
    maintain_node_health=mnh,
1175
    modify_etc_hosts=opts.modify_etc_hosts,
1176
    uid_pool=uid_pool,
1177
    add_uids=add_uids,
1178
    remove_uids=remove_uids,
1179
    default_iallocator=opts.default_iallocator,
1180
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1181
    master_netdev=opts.master_netdev,
1182
    master_netmask=opts.master_netmask,
1183
    reserved_lvs=opts.reserved_lvs,
1184
    use_external_mip_script=ext_ip_script,
1185
    hv_state=hv_state,
1186
    disk_state=disk_state,
1187
    enabled_disk_templates=enabled_disk_templates,
1188
    force=opts.force,
1189
    file_storage_dir=opts.file_storage_dir,
1190
    )
1191
  SubmitOrSend(op, opts)
1192
  return 0
1193

    
1194

    
1195
def QueueOps(opts, args):
  """Queue operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: if the subcommand is not one of
      "drain", "undrain" or "info"

  """
  command = args[0]
  client = GetClient()
  if command in ("drain", "undrain"):
    drain_flag = command == "drain"
    client.SetQueueDrainFlag(drain_flag)
  elif command == "info":
    result = client.QueryConfigValues(["drain_flag"])
    if result[0]:
      val = "set"
    else:
      val = "unset"
    ToStdout("The drain flag is %s" % val)
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0
def _ShowWatcherPause(until):
  """Prints the watcher pause state.

  @param until: Unix timestamp until which the watcher is paused, or None

  """
  # A pause time in the past is treated the same as no pause at all
  if until is None or until < time.time():
    ToStdout("The watcher is not paused.")
  else:
    ToStdout("The watcher is paused until %s.", time.ctime(until))
def WatcherOps(opts, args):
  """Watcher operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: if the subcommand is not one of
      "continue", "pause" or "info", or a pause duration is missing

  """
  command = args[0]
  client = GetClient()

  if command == "continue":
    client.SetWatcherPause(None)
    ToStdout("The watcher is no longer paused.")

  elif command == "pause":
    if len(args) < 2:
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)

    # Pause is expressed as an absolute deadline, relative to "now"
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
    _ShowWatcherPause(result)

  elif command == "info":
    result = client.QueryConfigValues(["watcher_pause"])
    _ShowWatcherPause(result[0])

  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0
def _OobPower(opts, node_list, power):
  """Puts the node in the list to desired power state.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on
  @param power: True if they should be powered on, False otherwise
  @return: The success of the operation (none failed)

  """
  if power:
    command = constants.OOB_POWER_ON
  else:
    command = constants.OOB_POWER_OFF

  op = opcodes.OpOobCommand(node_names=node_list,
                            command=command,
                            ignore_status=True,
                            timeout=opts.oob_timeout,
                            power_delay=opts.power_delay)
  result = SubmitOpCode(op, opts=opts)
  errs = 0
  for node_result in result:
    # Each entry is ((status, node_name), (data_status, data))
    (node_tuple, data_tuple) = node_result
    (_, node_name) = node_tuple
    (data_status, _) = data_tuple
    if data_status != constants.RS_NORMAL:
      assert data_status != constants.RS_UNAVAIL
      errs += 1
      ToStderr("There was a problem changing power for %s, please investigate",
               node_name)

  if errs > 0:
    return False

  return True
def _InstanceStart(opts, inst_list, start, no_remember=False):
  """Puts the instances in the list to desired state.

  @param opts: The command line options selected by the user
  @param inst_list: The list of instances to operate on
  @param start: True if they should be started, False for shutdown
  @param no_remember: If the instance state should be remembered
  @return: The success of the operation (none failed)

  """
  if start:
    opcls = opcodes.OpInstanceStartup
    text_submit, text_success, text_failed = ("startup", "started", "starting")
  else:
    opcls = compat.partial(opcodes.OpInstanceShutdown,
                           timeout=opts.shutdown_timeout,
                           no_remember=no_remember)
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")

  # Submit all operations as parallel jobs and collect results afterwards
  jex = JobExecutor(opts=opts)

  for inst in inst_list:
    ToStdout("Submit %s of instance %s", text_submit, inst)
    op = opcls(instance_name=inst)
    jex.QueueJob(inst, op)

  results = jex.GetResults()
  bad_cnt = len([1 for (success, _) in results if not success])

  if bad_cnt == 0:
    ToStdout("All instances have been %s successfully", text_success)
  else:
    ToStderr("There were errors while %s instances:\n"
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
             len(results))
    return False

  return True
class _RunWhenNodesReachableHelper:
  """Helper class to make shared internal state sharing easier.

  @ivar success: Indicates if all action_cb calls were successful

  """
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
    """Init the object.

    @param node_list: The list of nodes to be reachable
    @param action_cb: Callback called when a new host is reachable
    @type node2ip: dict
    @param node2ip: Node to ip mapping
    @param port: The port to use for the TCP ping
    @param feedback_fn: The function used for feedback
    @param _ping_fn: Function to check reachabilty (for unittest use only)
    @param _sleep_fn: Function to sleep (for unittest use only)

    """
    self.down = set(node_list)
    self.up = set()
    self.node2ip = node2ip
    self.success = True
    self.action_cb = action_cb
    self.port = port
    self.feedback_fn = feedback_fn
    self._ping_fn = _ping_fn
    self._sleep_fn = _sleep_fn

  def __call__(self):
    """When called we run action_cb.

    @raises utils.RetryAgain: When there are still down nodes

    """
    # Run the callback even for a partial set of nodes; one failed call
    # makes the whole operation unsuccessful
    if not self.action_cb(self.up):
      self.success = False

    if self.down:
      raise utils.RetryAgain()
    else:
      return self.success

  def Wait(self, secs):
    """Checks if a host is up or waits remaining seconds.

    @param secs: The secs remaining

    """
    start = time.time()
    for node in self.down:
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
                       live_port_needed=True):
        self.feedback_fn("Node %s became available" % node)
        self.up.add(node)
        self.down -= self.up
        # If we have a node available there is the possibility to run the
        # action callback successfully, therefore we don't wait and return
        return

    self._sleep_fn(max(0.0, start + secs - time.time()))
def _RunWhenNodesReachable(node_list, action_cb, interval):
  """Run action_cb when nodes become reachable.

  @param node_list: The list of nodes to be reachable
  @param action_cb: Callback called when a new host is reachable
  @param interval: The earliest time to retry
  @return: False on timeout, otherwise the result of the helper's last call

  """
  client = GetClient()
  cluster_info = client.QueryClusterInfo()
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
    # Bugfix: use the concrete IPv4 address class; the IPAddress base class
    # does not carry a usable address family
    family = netutils.IP4Address.family
  else:
    family = netutils.IP6Address.family

  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
                 for node in node_list)

  # Ping the node daemon port to decide whether a node is back
  port = netutils.GetDaemonPort(constants.NODED)
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
                                        ToStdout)

  try:
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
                       wait_fn=helper.Wait)
  except utils.RetryTimeout:
    ToStderr("Time exceeded while waiting for nodes to become reachable"
             " again:\n  - %s", "  - ".join(helper.down))
    return False
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
                          _instance_start_fn=_InstanceStart):
  """Start the instances conditional based on node_states.

  @param opts: The command line options selected by the user
  @param inst_map: A dict of inst -> nodes mapping
  @param nodes_online: A list of nodes online
  @param _instance_start_fn: Callback to start instances (unittest use only)
  @return: Success of the operation on all instances

  """
  start_inst_list = []
  for (inst, nodes) in inst_map.items():
    if not (nodes - nodes_online):
      # All nodes the instance lives on are back online
      start_inst_list.append(inst)

  # Remove scheduled instances so a later call will not start them again
  for inst in start_inst_list:
    del inst_map[inst]

  if start_inst_list:
    return _instance_start_fn(opts, start_inst_list, True)

  return True
def _EpoOn(opts, full_node_list, node_list, inst_map):
  """Does the actual power on.

  @param opts: The command line options selected by the user
  @param full_node_list: All nodes to operate on (includes nodes not supporting
                         OOB)
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # Bugfix: power the OOB-capable nodes *on* (power=True); passing False
  # would issue a power-off during EPO recovery
  if node_list and not _OobPower(opts, node_list, True):
    ToStderr("Not all nodes seem to get back up, investigate and start"
             " manually if needed")

  # Wait for the nodes to be back up
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))

  ToStdout("Waiting until all nodes are available again")
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
    ToStderr("Please investigate and start stopped instances manually")
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
def _EpoOff(opts, node_list, inst_map):
  """Does the actual power off.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # Instances are shut down first; no_remember keeps their configured
  # run state so they come back up after the EPO is reverted
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
    ToStderr("Please investigate and stop instances manually before continuing")
    return constants.EXIT_FAILURE

  if not node_list:
    return constants.EXIT_SUCCESS

  if _OobPower(opts, node_list, False):
    return constants.EXIT_SUCCESS
  else:
    return constants.EXIT_FAILURE
def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
        _confirm_fn=ConfirmOperation,
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
  """EPO operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  if opts.groups and opts.show_all:
    _stderr_fn("Only one of --groups or --all are allowed")
    return constants.EXIT_FAILURE
  elif args and opts.show_all:
    _stderr_fn("Arguments in combination with --all are not allowed")
    return constants.EXIT_FAILURE

  if cl is None:
    cl = GetClient()

  if opts.groups:
    # Expand group names to their member node lists
    node_query_list = \
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
  else:
    node_query_list = args

  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
                                           "sinst_list", "powered", "offline"],
                         False)

  all_nodes = map(compat.fst, result)
  node_list = []
  inst_map = {}
  for (node, master, pinsts, sinsts, powered, offline) in result:
    if not offline:
      for inst in (pinsts + sinsts):
        if inst in inst_map:
          if not master:
            inst_map[inst].add(node)
        elif master:
          # Instances on the master node are not tracked for restart
          inst_map[inst] = set()
        else:
          inst_map[inst] = set([node])

    if master and opts.on:
      # We ignore the master for turning on the machines, in fact we are
      # already operating on the master at this point :)
      continue
    elif master and not opts.show_all:
      _stderr_fn("%s is the master node, please do a master-failover to another"
                 " node not affected by the EPO or use --all if you intend to"
                 " shutdown the whole cluster", node)
      return constants.EXIT_FAILURE
    elif powered is None:
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
                 " handled in a fully automated manner", node)
    elif powered == opts.on:
      _stdout_fn("Node %s is already in desired power state, skipping", node)
    elif not offline or (offline and powered):
      node_list.append(node)

  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
    return constants.EXIT_FAILURE

  if opts.on:
    return _on_fn(opts, all_nodes, node_list, inst_map)
  else:
    return _off_fn(opts, node_list, inst_map)
def _GetCreateCommand(info):
  """Builds a "gnt-cluster init" command line from cluster info.

  @type info: dict
  @param info: cluster information as returned by L{QueryClusterInfo};
      the "ipolicy" and "name" keys are used
  @rtype: string
  @return: the re-creation command line

  """
  buf = StringIO()
  buf.write("gnt-cluster init")
  PrintIPolicyCommand(buf, info["ipolicy"], False)
  buf.write(" ")
  buf.write(info["name"])
  return buf.getvalue()
def ShowCreateCommand(opts, args):
  """Shows the command that can be used to re-create the cluster.

  Currently it works only for ipolicy specs.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: None

  """
  cl = GetClient(query=True)
  result = cl.QueryClusterInfo()
  ToStdout(_GetCreateCommand(result))
#: dispatch table mapping each gnt-cluster subcommand to its
#: (handler, argument spec, options, usage, description) tuple
commands = {
  "init": (
    InitCluster, [ArgHost(min=1, max=1)],
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
     IPOLICY_STD_SPECS_OPT] + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
  "destroy": (
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
    "", "Destroy cluster"),
  "rename": (
    RenameCluster, [ArgHost(min=1, max=1)],
    [FORCE_OPT, DRY_RUN_OPT],
    "<new_name>",
    "Renames the cluster"),
  "redist-conf": (
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS + [DRY_RUN_OPT, PRIORITY_OPT],
    "", "Forces a push of the configuration file and ssconf files"
    " to the nodes in the cluster"),
  "verify": (
    VerifyCluster, ARGS_NONE,
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
    "", "Does a check on the cluster configuration"),
  "verify-disks": (
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
    "", "Does a check on the cluster disk status"),
  "repair-disk-sizes": (
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
    "[instance...]", "Updates mismatches in recorded disk sizes"),
  "master-failover": (
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
    "", "Makes the current node the master"),
  "master-ping": (
    MasterPing, ARGS_NONE, [],
    "", "Checks if the master is alive"),
  "version": (
    ShowClusterVersion, ARGS_NONE, [],
    "", "Shows the cluster version"),
  "getmaster": (
    ShowClusterMaster, ARGS_NONE, [],
    "", "Shows the cluster master"),
  "copyfile": (
    ClusterCopyFile, [ArgFile(min=1, max=1)],
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
  "command": (
    RunClusterCommand, [ArgCommand(min=1)],
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
  "info": (
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
    "[--roman]", "Show cluster configuration"),
  "list-tags": (
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
  "add-tags": (
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Add tags to the cluster"),
  "remove-tags": (
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Remove tags from the cluster"),
  "search-tags": (
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
    "Searches the tags on all objects on"
    " the cluster for a given pattern (regex)"),
  "queue": (
    QueueOps,
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
    [], "drain|undrain|info", "Change queue properties"),
  "watcher": (
    WatcherOps,
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
    [],
    "{pause <timespec>|continue|info}", "Change watcher properties"),
  "modify": (
    SetClusterParams, ARGS_NONE,
    [FORCE_OPT,
     BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
     DISK_STATE_OPT] + SUBMIT_OPTS +
     [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT] +
     INSTANCE_POLICY_OPTS + [GLOBAL_FILEDIR_OPT],
    "[opts...]",
    "Alters the parameters of the cluster"),
  "renew-crypto": (
    RenewCrypto, ARGS_NONE,
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
    "[opts...]",
    "Renews cluster certificates, keys and secrets"),
  "epo": (
    Epo, [ArgUnknown()],
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
    "[opts...] [args]",
    "Performs an emergency power-off on given args"),
  "activate-master-ip": (
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
  "deactivate-master-ip": (
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
    "Deactivates the master IP"),
  "show-ispecs-cmd": (
    ShowCreateCommand, ARGS_NONE, [], "",
    "Show the command line to re-create the cluster"),
  }
#: dictionary with aliases for commands
aliases = {
  "masterfailover": "master-failover",
  "show": "info",
}
def Main():
  """Entry point for the gnt-cluster script.

  @rtype: int
  @return: the exit code produced by L{GenericMain}

  """
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
                     aliases=aliases)