root / lib / client / gnt_cluster.py @ d9fdd354
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
from cStringIO import StringIO
30
import os.path
31
import time
32
import OpenSSL
33
import itertools
34

    
35
from ganeti.cli import *
36
from ganeti import opcodes
37
from ganeti import constants
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import bootstrap
41
from ganeti import ssh
42
from ganeti import objects
43
from ganeti import uidpool
44
from ganeti import compat
45
from ganeti import netutils
46
from ganeti import pathutils
47

    
48

    
49
ON_OPT = cli_option("--on", default=False,
50
                    action="store_true", dest="on",
51
                    help="Recover from an EPO")
52

    
53
GROUPS_OPT = cli_option("--groups", default=False,
54
                        action="store_true", dest="groups",
55
                        help="Arguments are node groups instead of nodes")
56

    
57
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
58
                            help="Override interactive check for --no-voting",
59
                            default=False, action="store_true")
60

    
61
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
62
_EPO_PING_TIMEOUT = 1 # 1 second
63
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
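# Taken together (see _RunWhenNodesReachableHelper and _RunWhenNodesReachable
# below): during EPO recovery each still-down node is TCP-pinged with a
# 1 second timeout, the set is re-checked roughly every 30 seconds, and the
# wait is given up after 15 minutes.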
64

    
65

    
66
def _CheckNoLvmStorageOptDeprecated(opts):
67
  """Checks if the legacy option '--no-lvm-storage' is used.
68

69
  """
70
  if not opts.lvm_storage:
71
    ToStderr("The option --no-lvm-storage is no longer supported. If you want"
72
             " to disable lvm-based storage cluster-wide, use the option"
73
             " --enabled-disk-templates to disable all of these lvm-base disk "
74
             "  templates: %s" %
75
             utils.CommaJoin(utils.GetLvmDiskTemplates()))
76
    return 1
77

    
78

    
79
@UsesRPC
80
def InitCluster(opts, args):
81
  """Initialize the cluster.
82

83
  @param opts: the command line options selected by the user
84
  @type args: list
85
  @param args: should contain only one element, the desired
86
      cluster name
87
  @rtype: int
88
  @return: the desired exit code
89

90
  """
91
  if _CheckNoLvmStorageOptDeprecated(opts):
92
    return 1
93
  enabled_disk_templates = opts.enabled_disk_templates
94
  if enabled_disk_templates:
95
    enabled_disk_templates = enabled_disk_templates.split(",")
96
  else:
97
    enabled_disk_templates = constants.DEFAULT_ENABLED_DISK_TEMPLATES
98

    
99
  vg_name = None
100
  if opts.vg_name is not None:
101
    vg_name = opts.vg_name
102
    if vg_name:
103
      if not utils.IsLvmEnabled(enabled_disk_templates):
104
        ToStdout("You specified a volume group with --vg-name, but you did not"
105
                 " enable any disk template that uses lvm.")
106
    else:
107
      if utils.IsLvmEnabled(enabled_disk_templates):
108
        ToStderr("LVM disk templates are enabled, but vg name not set.")
109
        return 1
110
  else:
111
    if utils.IsLvmEnabled(enabled_disk_templates):
112
      vg_name = constants.DEFAULT_VG
113

    
114
  if not opts.drbd_storage and opts.drbd_helper:
115
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
116
    return 1
117

    
118
  drbd_helper = opts.drbd_helper
119
  if opts.drbd_storage and not opts.drbd_helper:
120
    drbd_helper = constants.DEFAULT_DRBD_HELPER
121

    
122
  master_netdev = opts.master_netdev
123
  if master_netdev is None:
124
    master_netdev = constants.DEFAULT_BRIDGE
125

    
126
  hvlist = opts.enabled_hypervisors
127
  if hvlist is None:
128
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
129
  hvlist = hvlist.split(",")
130

    
131
  hvparams = dict(opts.hvparams)
132
  beparams = opts.beparams
133
  nicparams = opts.nicparams
134

    
135
  diskparams = dict(opts.diskparams)
136

    
137
  # check the disk template types here, as we cannot rely on the type check done
138
  # by the opcode parameter types
139
  diskparams_keys = set(diskparams.keys())
140
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
141
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
142
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
143
    return 1
144

    
145
  # prepare beparams dict
146
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
147
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
148

    
149
  # prepare nicparams dict
150
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
151
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
152

    
153
  # prepare ndparams dict
154
  if opts.ndparams is None:
155
    ndparams = dict(constants.NDC_DEFAULTS)
156
  else:
157
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
158
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
159

    
160
  # prepare hvparams dict
161
  for hv in constants.HYPER_TYPES:
162
    if hv not in hvparams:
163
      hvparams[hv] = {}
164
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
165
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
166

    
167
  # prepare diskparams dict
168
  for templ in constants.DISK_TEMPLATES:
169
    if templ not in diskparams:
170
      diskparams[templ] = {}
171
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
172
                                         diskparams[templ])
173
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
174

    
175
  # prepare ipolicy dict
176
  ipolicy = CreateIPolicyFromOpts(
177
    ispecs_mem_size=opts.ispecs_mem_size,
178
    ispecs_cpu_count=opts.ispecs_cpu_count,
179
    ispecs_disk_count=opts.ispecs_disk_count,
180
    ispecs_disk_size=opts.ispecs_disk_size,
181
    ispecs_nic_count=opts.ispecs_nic_count,
182
    minmax_ispecs=opts.ipolicy_bounds_specs,
183
    std_ispecs=opts.ipolicy_std_specs,
184
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
185
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
186
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
187
    fill_all=True)
188

    
189
  if opts.candidate_pool_size is None:
190
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
191

    
192
  if opts.mac_prefix is None:
193
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
194

    
195
  uid_pool = opts.uid_pool
196
  if uid_pool is not None:
197
    uid_pool = uidpool.ParseUidPool(uid_pool)
198

    
199
  if opts.prealloc_wipe_disks is None:
200
    opts.prealloc_wipe_disks = False
201

    
202
  external_ip_setup_script = opts.use_external_mip_script
203
  if external_ip_setup_script is None:
204
    external_ip_setup_script = False
205

    
206
  try:
207
    primary_ip_version = int(opts.primary_ip_version)
208
  except (ValueError, TypeError), err:
209
    ToStderr("Invalid primary ip version value: %s" % str(err))
210
    return 1
211

    
212
  master_netmask = opts.master_netmask
213
  try:
214
    if master_netmask is not None:
215
      master_netmask = int(master_netmask)
216
  except (ValueError, TypeError), err:
217
    ToStderr("Invalid master netmask value: %s" % str(err))
218
    return 1
219

    
220
  if opts.disk_state:
221
    disk_state = utils.FlatToDict(opts.disk_state)
222
  else:
223
    disk_state = {}
224

    
225
  hv_state = dict(opts.hv_state)
226

    
227
  bootstrap.InitCluster(cluster_name=args[0],
228
                        secondary_ip=opts.secondary_ip,
229
                        vg_name=vg_name,
230
                        mac_prefix=opts.mac_prefix,
231
                        master_netmask=master_netmask,
232
                        master_netdev=master_netdev,
233
                        file_storage_dir=opts.file_storage_dir,
234
                        shared_file_storage_dir=opts.shared_file_storage_dir,
235
                        enabled_hypervisors=hvlist,
236
                        hvparams=hvparams,
237
                        beparams=beparams,
238
                        nicparams=nicparams,
239
                        ndparams=ndparams,
240
                        diskparams=diskparams,
241
                        ipolicy=ipolicy,
242
                        candidate_pool_size=opts.candidate_pool_size,
243
                        modify_etc_hosts=opts.modify_etc_hosts,
244
                        modify_ssh_setup=opts.modify_ssh_setup,
245
                        maintain_node_health=opts.maintain_node_health,
246
                        drbd_helper=drbd_helper,
247
                        uid_pool=uid_pool,
248
                        default_iallocator=opts.default_iallocator,
249
                        primary_ip_version=primary_ip_version,
250
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
251
                        use_external_mip_script=external_ip_setup_script,
252
                        hv_state=hv_state,
253
                        disk_state=disk_state,
254
                        enabled_disk_templates=enabled_disk_templates,
255
                        )
256
  op = opcodes.OpClusterPostInit()
257
  SubmitOpCode(op, opts=opts)
258
  return 0
259

    
260

    
261
@UsesRPC
262
def DestroyCluster(opts, args):
263
  """Destroy the cluster.
264

265
  @param opts: the command line options selected by the user
266
  @type args: list
267
  @param args: should be an empty list
268
  @rtype: int
269
  @return: the desired exit code
270

271
  """
272
  if not opts.yes_do_it:
273
    ToStderr("Destroying a cluster is irreversible. If you really want"
274
             " destroy this cluster, supply the --yes-do-it option.")
275
    return 1
276

    
277
  op = opcodes.OpClusterDestroy()
278
  master_uuid = SubmitOpCode(op, opts=opts)
279
  # if we reached this, the opcode didn't fail; we can proceed to
280
  # shutdown all the daemons
281
  bootstrap.FinalizeClusterDestroy(master_uuid)
282
  return 0
283

    
284

    
285
def RenameCluster(opts, args):
286
  """Rename the cluster.
287

288
  @param opts: the command line options selected by the user
289
  @type args: list
290
  @param args: should contain only one element, the new cluster name
291
  @rtype: int
292
  @return: the desired exit code
293

294
  """
295
  cl = GetClient()
296

    
297
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
298

    
299
  new_name = args[0]
300
  if not opts.force:
301
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
302
                " connected over the network to the cluster name, the"
303
                " operation is very dangerous as the IP address will be"
304
                " removed from the node and the change may not go through."
305
                " Continue?") % (cluster_name, new_name)
306
    if not AskUser(usertext):
307
      return 1
308

    
309
  op = opcodes.OpClusterRename(name=new_name)
310
  result = SubmitOpCode(op, opts=opts, cl=cl)
311

    
312
  if result:
313
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
314

    
315
  return 0
316

    
317

    
318
def ActivateMasterIp(opts, args):
319
  """Activates the master IP.
320

321
  """
322
  op = opcodes.OpClusterActivateMasterIp()
323
  SubmitOpCode(op)
324
  return 0
325

    
326

    
327
def DeactivateMasterIp(opts, args):
328
  """Deactivates the master IP.
329

330
  """
331
  if not opts.confirm:
332
    usertext = ("This will disable the master IP. All the open connections to"
333
                " the master IP will be closed. To reach the master you will"
334
                " need to use its node IP."
335
                " Continue?")
336
    if not AskUser(usertext):
337
      return 1
338

    
339
  op = opcodes.OpClusterDeactivateMasterIp()
340
  SubmitOpCode(op)
341
  return 0
342

    
343

    
344
def RedistributeConfig(opts, args):
345
  """Forces push of the cluster configuration.
346

347
  @param opts: the command line options selected by the user
348
  @type args: list
349
  @param args: empty list
350
  @rtype: int
351
  @return: the desired exit code
352

353
  """
354
  op = opcodes.OpClusterRedistConf()
355
  SubmitOrSend(op, opts)
356
  return 0
357

    
358

    
359
def ShowClusterVersion(opts, args):
360
  """Write version of ganeti software to the standard output.
361

362
  @param opts: the command line options selected by the user
363
  @type args: list
364
  @param args: should be an empty list
365
  @rtype: int
366
  @return: the desired exit code
367

368
  """
369
  cl = GetClient(query=True)
370
  result = cl.QueryClusterInfo()
371
  ToStdout("Software version: %s", result["software_version"])
372
  ToStdout("Internode protocol: %s", result["protocol_version"])
373
  ToStdout("Configuration format: %s", result["config_version"])
374
  ToStdout("OS api version: %s", result["os_api_version"])
375
  ToStdout("Export interface: %s", result["export_version"])
376
  ToStdout("VCS version: %s", result["vcs_version"])
377
  return 0
378

    
379

    
380
def ShowClusterMaster(opts, args):
381
  """Write name of master node to the standard output.
382

383
  @param opts: the command line options selected by the user
384
  @type args: list
385
  @param args: should be an empty list
386
  @rtype: int
387
  @return: the desired exit code
388

389
  """
390
  master = bootstrap.GetMaster()
391
  ToStdout(master)
392
  return 0
393

    
394

    
395
def _FormatGroupedParams(paramsdict, roman=False):
396
  """Format Grouped parameters (be, nic, disk) by group.
397

398
  @type paramsdict: dict of dicts
399
  @param paramsdict: {group: {param: value, ...}, ...}
400
  @rtype: dict of dicts
401
  @return: copy of the input dictionaries with strings as values
402

403
  """
404
  ret = {}
405
  for (item, val) in paramsdict.items():
406
    if isinstance(val, dict):
407
      ret[item] = _FormatGroupedParams(val, roman=roman)
408
    elif roman and isinstance(val, int):
409
      ret[item] = compat.TryToRoman(val)
410
    else:
411
      ret[item] = str(val)
412
  return ret
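# _FormatGroupedParams example (illustrative values): a nested dict such as
# {"default": {"memory": 128, "auto_balance": True}} is returned as
# {"default": {"memory": "128", "auto_balance": "True"}}; with roman=True,
# integer leaves are passed through compat.TryToRoman instead of str().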
413

    
414

    
415
def ShowClusterConfig(opts, args):
416
  """Shows cluster information.
417

418
  @param opts: the command line options selected by the user
419
  @type args: list
420
  @param args: should be an empty list
421
  @rtype: int
422
  @return: the desired exit code
423

424
  """
425
  cl = GetClient(query=True)
426
  result = cl.QueryClusterInfo()
427

    
428
  if result["tags"]:
429
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
430
  else:
431
    tags = "(none)"
432
  if result["reserved_lvs"]:
433
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
434
  else:
435
    reserved_lvs = "(none)"
436

    
437
  enabled_hv = result["enabled_hypervisors"]
438
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
439
                  if k in enabled_hv)
440

    
441
  info = [
442
    ("Cluster name", result["name"]),
443
    ("Cluster UUID", result["uuid"]),
444

    
445
    ("Creation time", utils.FormatTime(result["ctime"])),
446
    ("Modification time", utils.FormatTime(result["mtime"])),
447

    
448
    ("Master node", result["master"]),
449

    
450
    ("Architecture (this node)",
451
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),
452

    
453
    ("Tags", tags),
454

    
455
    ("Default hypervisor", result["default_hypervisor"]),
456
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),
457

    
458
    ("Hypervisor parameters", _FormatGroupedParams(hvparams)),
459

    
460
    ("OS-specific hypervisor parameters",
461
     _FormatGroupedParams(result["os_hvp"])),
462

    
463
    ("OS parameters", _FormatGroupedParams(result["osparams"])),
464

    
465
    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
466
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),
467

    
468
    ("Cluster parameters", [
469
      ("candidate pool size",
470
       compat.TryToRoman(result["candidate_pool_size"],
471
                         convert=opts.roman_integers)),
472
      ("master netdev", result["master_netdev"]),
473
      ("master netmask", result["master_netmask"]),
474
      ("use external master IP address setup script",
475
       result["use_external_mip_script"]),
476
      ("lvm volume group", result["volume_group_name"]),
477
      ("lvm reserved volumes", reserved_lvs),
478
      ("drbd usermode helper", result["drbd_usermode_helper"]),
479
      ("file storage path", result["file_storage_dir"]),
480
      ("shared file storage path", result["shared_file_storage_dir"]),
481
      ("maintenance of node health", result["maintain_node_health"]),
482
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
483
      ("default instance allocator", result["default_iallocator"]),
484
      ("primary ip version", result["primary_ip_version"]),
485
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
486
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
487
      ("ExtStorage Providers search path",
488
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
489
      ("enabled disk templates",
490
       utils.CommaJoin(result["enabled_disk_templates"])),
491
      ]),
492

    
493
    ("Default node parameters",
494
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),
495

    
496
    ("Default instance parameters",
497
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),
498

    
499
    ("Default nic parameters",
500
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),
501

    
502
    ("Default disk parameters",
503
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),
504

    
505
    ("Instance policy - limits for instances",
506
     FormatPolicyInfo(result["ipolicy"], None, True)),
507
    ]
508

    
509
  PrintGenericInfo(info)
510
  return 0
511

    
512

    
513
def ClusterCopyFile(opts, args):
514
  """Copy a file from master to some nodes.
515

516
  @param opts: the command line options selected by the user
517
  @type args: list
518
  @param args: should contain only one element, the path of
519
      the file to be copied
520
  @rtype: int
521
  @return: the desired exit code
522

523
  """
524
  filename = args[0]
525
  if not os.path.exists(filename):
526
    raise errors.OpPrereqError("No such filename '%s'" % filename,
527
                               errors.ECODE_INVAL)
528

    
529
  cl = GetClient()
530

    
531
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
532

    
533
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
534
                           secondary_ips=opts.use_replication_network,
535
                           nodegroup=opts.nodegroup)
536

    
537
  srun = ssh.SshRunner(cluster_name)
538
  for node in results:
539
    if not srun.CopyFileToNode(node, filename):
540
      ToStderr("Copy of file %s to node %s failed", filename, node)
541

    
542
  return 0
543

    
544

    
545
def RunClusterCommand(opts, args):
546
  """Run a command on some nodes.
547

548
  @param opts: the command line options selected by the user
549
  @type args: list
550
  @param args: should contain the command to be run and its arguments
551
  @rtype: int
552
  @return: the desired exit code
553

554
  """
555
  cl = GetClient()
556

    
557
  command = " ".join(args)
558

    
559
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
560

    
561
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
562
                                                    "master_node"])
563

    
564
  srun = ssh.SshRunner(cluster_name=cluster_name)
565

    
566
  # Make sure the master node is at the end of the list, so it is run last
567
  if master_node in nodes:
568
    nodes.remove(master_node)
569
    nodes.append(master_node)
570

    
571
  for name in nodes:
572
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
573

    
574
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
575
      # Do not output anything for successful commands
576
      continue
577

    
578
    ToStdout("------------------------------------------------")
579
    if opts.show_machine_names:
580
      for line in result.output.splitlines():
581
        ToStdout("%s: %s", name, line)
582
    else:
583
      ToStdout("node: %s", name)
584
      ToStdout("%s", result.output)
585
    ToStdout("return code = %s", result.exit_code)
586

    
587
  return 0
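# Typical invocation (matching the "[-n node...] <command>" usage string in
# the commands table below), with illustrative node names:
#   gnt-cluster command -n node1.example.com -n node2.example.com uptime
# The command is run over SSH on the selected nodes, master node last.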
588

    
589

    
590
def VerifyCluster(opts, args):
591
  """Verify integrity of cluster, performing various test on nodes.
592

593
  @param opts: the command line options selected by the user
594
  @type args: list
595
  @param args: should be an empty list
596
  @rtype: int
597
  @return: the desired exit code
598

599
  """
600
  skip_checks = []
601

    
602
  if opts.skip_nplusone_mem:
603
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
604

    
605
  cl = GetClient()
606

    
607
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
608
                               error_codes=opts.error_codes,
609
                               debug_simulate_errors=opts.simulate_errors,
610
                               skip_checks=skip_checks,
611
                               ignore_errors=opts.ignore_errors,
612
                               group_name=opts.nodegroup)
613
  result = SubmitOpCode(op, cl=cl, opts=opts)
614

    
615
  # Keep track of submitted jobs
616
  jex = JobExecutor(cl=cl, opts=opts)
617

    
618
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
619
    jex.AddJobId(None, status, job_id)
620

    
621
  results = jex.GetResults()
622

    
623
  (bad_jobs, bad_results) = \
624
    map(len,
625
        # Convert iterators to lists
626
        map(list,
627
            # Count errors
628
            map(compat.partial(itertools.ifilterfalse, bool),
629
                # Convert result to booleans in a tuple
630
                zip(*((job_success, len(op_results) == 1 and op_results[0])
631
                      for (job_success, op_results) in results)))))
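  # The functional pipeline above is equivalent to the following, more
  # explicit counting (shown for readability only, not executed):
  #   bad_jobs = sum(1 for (job_success, _) in results if not job_success)
  #   bad_results = sum(1 for (_, op_results) in results
  #                     if not (len(op_results) == 1 and op_results[0]))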
632

    
633
  if bad_jobs == 0 and bad_results == 0:
634
    rcode = constants.EXIT_SUCCESS
635
  else:
636
    rcode = constants.EXIT_FAILURE
637
    if bad_jobs > 0:
638
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
639

    
640
  return rcode
641

    
642

    
643
def VerifyDisks(opts, args):
644
  """Verify integrity of cluster disks.
645

646
  @param opts: the command line options selected by the user
647
  @type args: list
648
  @param args: should be an empty list
649
  @rtype: int
650
  @return: the desired exit code
651

652
  """
653
  cl = GetClient()
654

    
655
  op = opcodes.OpClusterVerifyDisks()
656

    
657
  result = SubmitOpCode(op, cl=cl, opts=opts)
658

    
659
  # Keep track of submitted jobs
660
  jex = JobExecutor(cl=cl, opts=opts)
661

    
662
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
663
    jex.AddJobId(None, status, job_id)
664

    
665
  retcode = constants.EXIT_SUCCESS
666

    
667
  for (status, result) in jex.GetResults():
668
    if not status:
669
      ToStdout("Job failed: %s", result)
670
      continue
671

    
672
    ((bad_nodes, instances, missing), ) = result
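    # As consumed below: bad_nodes maps node name -> error text, instances is
    # a list of instance names, and missing maps instance name -> a list of
    # (node, logical volume) pairs.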
673

    
674
    for node, text in bad_nodes.items():
675
      ToStdout("Error gathering data on node %s: %s",
676
               node, utils.SafeEncode(text[-400:]))
677
      retcode = constants.EXIT_FAILURE
678
      ToStdout("You need to fix these nodes first before fixing instances")
679

    
680
    for iname in instances:
681
      if iname in missing:
682
        continue
683
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
684
      try:
685
        ToStdout("Activating disks for instance '%s'", iname)
686
        SubmitOpCode(op, opts=opts, cl=cl)
687
      except errors.GenericError, err:
688
        nret, msg = FormatError(err)
689
        retcode |= nret
690
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
691

    
692
    if missing:
693
      for iname, ival in missing.iteritems():
694
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
695
        if all_missing:
696
          ToStdout("Instance %s cannot be verified as it lives on"
697
                   " broken nodes", iname)
698
        else:
699
          ToStdout("Instance %s has missing logical volumes:", iname)
700
          ival.sort()
701
          for node, vol in ival:
702
            if node in bad_nodes:
703
              ToStdout("\tbroken node %s /dev/%s", node, vol)
704
            else:
705
              ToStdout("\t%s /dev/%s", node, vol)
706

    
707
      ToStdout("You need to replace or recreate disks for all the above"
708
               " instances if this message persists after fixing broken nodes.")
709
      retcode = constants.EXIT_FAILURE
710
    elif not instances:
711
      ToStdout("No disks need to be activated.")
712

    
713
  return retcode
714

    
715

    
716
def RepairDiskSizes(opts, args):
717
  """Verify sizes of cluster disks.
718

719
  @param opts: the command line options selected by the user
720
  @type args: list
721
  @param args: optional list of instances to restrict check to
722
  @rtype: int
723
  @return: the desired exit code
724

725
  """
726
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
727
  SubmitOpCode(op, opts=opts)
728

    
729

    
730
@UsesRPC
731
def MasterFailover(opts, args):
732
  """Failover the master node.
733

734
  This command, when run on a non-master node, will cause the current
735
  master to cease being master, and the non-master to become new
736
  master.
737

738
  @param opts: the command line options selected by the user
739
  @type args: list
740
  @param args: should be an empty list
741
  @rtype: int
742
  @return: the desired exit code
743

744
  """
745
  if opts.no_voting and not opts.yes_do_it:
746
    usertext = ("This will perform the failover even if most other nodes"
747
                " are down, or if this node is outdated. This is dangerous"
748
                " as it can lead to a non-consistent cluster. Check the"
749
                " gnt-cluster(8) man page before proceeding. Continue?")
750
    if not AskUser(usertext):
751
      return 1
752

    
753
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
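# Note: the interactive confirmation above is only triggered for --no-voting
# without --yes-do-it; a regular master failover (with voting) proceeds
# straight to bootstrap.MasterFailover.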
754

    
755

    
756
def MasterPing(opts, args):
757
  """Checks if the master is alive.
758

759
  @param opts: the command line options selected by the user
760
  @type args: list
761
  @param args: should be an empty list
762
  @rtype: int
763
  @return: the desired exit code
764

765
  """
766
  try:
767
    cl = GetClient()
768
    cl.QueryClusterInfo()
769
    return 0
770
  except Exception: # pylint: disable=W0703
771
    return 1
772

    
773

    
774
def SearchTags(opts, args):
775
  """Searches the tags on all the cluster.
776

777
  @param opts: the command line options selected by the user
778
  @type args: list
779
  @param args: should contain only one element, the tag pattern
780
  @rtype: int
781
  @return: the desired exit code
782

783
  """
784
  op = opcodes.OpTagsSearch(pattern=args[0])
785
  result = SubmitOpCode(op, opts=opts)
786
  if not result:
787
    return 1
788
  result = list(result)
789
  result.sort()
790
  for path, tag in result:
791
    ToStdout("%s %s", path, tag)
792

    
793

    
794
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
795
  """Reads and verifies an X509 certificate.
796

797
  @type cert_filename: string
798
  @param cert_filename: the path of the file containing the certificate to
799
                        verify encoded in PEM format
800
  @type verify_private_key: bool
801
  @param verify_private_key: whether to verify the private key in addition to
802
                             the public certificate
803
  @rtype: string
804
  @return: a string containing the PEM-encoded certificate.
805

806
  """
807
  try:
808
    pem = utils.ReadFile(cert_filename)
809
  except IOError, err:
810
    raise errors.X509CertError(cert_filename,
811
                               "Unable to read certificate: %s" % str(err))
812

    
813
  try:
814
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
815
  except Exception, err:
816
    raise errors.X509CertError(cert_filename,
817
                               "Unable to load certificate: %s" % str(err))
818

    
819
  if verify_private_key:
820
    try:
821
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
822
    except Exception, err:
823
      raise errors.X509CertError(cert_filename,
824
                                 "Unable to load private key: %s" % str(err))
825

    
826
  return pem
827

    
828

    
829
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
830
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
831
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
832
                 cds_filename, force):
833
  """Renews cluster certificates, keys and secrets.
834

835
  @type new_cluster_cert: bool
836
  @param new_cluster_cert: Whether to generate a new cluster certificate
837
  @type new_rapi_cert: bool
838
  @param new_rapi_cert: Whether to generate a new RAPI certificate
839
  @type rapi_cert_filename: string
840
  @param rapi_cert_filename: Path to file containing new RAPI certificate
841
  @type new_spice_cert: bool
842
  @param new_spice_cert: Whether to generate a new SPICE certificate
843
  @type spice_cert_filename: string
844
  @param spice_cert_filename: Path to file containing new SPICE certificate
845
  @type spice_cacert_filename: string
846
  @param spice_cacert_filename: Path to file containing the certificate of the
847
                                CA that signed the SPICE certificate
848
  @type new_confd_hmac_key: bool
849
  @param new_confd_hmac_key: Whether to generate a new HMAC key
850
  @type new_cds: bool
851
  @param new_cds: Whether to generate a new cluster domain secret
852
  @type cds_filename: string
853
  @param cds_filename: Path to file containing new cluster domain secret
854
  @type force: bool
855
  @param force: Whether to skip asking the user for confirmation
856

857
  """
858
  if new_rapi_cert and rapi_cert_filename:
859
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
860
             " options can be specified at the same time.")
861
    return 1
862

    
863
  if new_cds and cds_filename:
864
    ToStderr("Only one of the --new-cluster-domain-secret and"
865
             " --cluster-domain-secret options can be specified at"
866
             " the same time.")
867
    return 1
868

    
869
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
870
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
871
             " and --spice-ca-certificate must not be used.")
872
    return 1
873

    
874
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
875
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
876
             " specified.")
877
    return 1
878

    
879
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
880
  try:
881
    if rapi_cert_filename:
882
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
883
    if spice_cert_filename:
884
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
885
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
886
  except errors.X509CertError, err:
887
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
888
    return 1
889

    
890
  if cds_filename:
891
    try:
892
      cds = utils.ReadFile(cds_filename)
893
    except Exception, err: # pylint: disable=W0703
894
      ToStderr("Can't load new cluster domain secret from %s: %s" %
895
               (cds_filename, str(err)))
896
      return 1
897
  else:
898
    cds = None
899

    
900
  if not force:
901
    usertext = ("This requires all daemons on all nodes to be restarted and"
902
                " may take some time. Continue?")
903
    if not AskUser(usertext):
904
      return 1
905

    
906
  def _RenewCryptoInner(ctx):
907
    ctx.feedback_fn("Updating certificates and keys")
908
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
909
                                    new_rapi_cert,
910
                                    new_spice_cert,
911
                                    new_confd_hmac_key,
912
                                    new_cds,
913
                                    rapi_cert_pem=rapi_cert_pem,
914
                                    spice_cert_pem=spice_cert_pem,
915
                                    spice_cacert_pem=spice_cacert_pem,
916
                                    cds=cds)
917

    
918
    files_to_copy = []
919

    
920
    if new_cluster_cert:
921
      files_to_copy.append(pathutils.NODED_CERT_FILE)
922

    
923
    if new_rapi_cert or rapi_cert_pem:
924
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
925

    
926
    if new_spice_cert or spice_cert_pem:
927
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
928
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
929

    
930
    if new_confd_hmac_key:
931
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
932

    
933
    if new_cds or cds:
934
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
935

    
936
    if files_to_copy:
937
      for node_name in ctx.nonmaster_nodes:
938
        ctx.feedback_fn("Copying %s to %s" %
939
                        (", ".join(files_to_copy), node_name))
940
        for file_name in files_to_copy:
941
          ctx.ssh.CopyFileToNode(node_name, file_name)
942

    
943
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
944

    
945
  ToStdout("All requested certificates and keys have been replaced."
946
           " Running \"gnt-cluster verify\" now is recommended.")
947

    
948
  return 0
949

    
950

    
951
def RenewCrypto(opts, args):
952
  """Renews cluster certificates, keys and secrets.
953

954
  """
955
  return _RenewCrypto(opts.new_cluster_cert,
956
                      opts.new_rapi_cert,
957
                      opts.rapi_cert,
958
                      opts.new_spice_cert,
959
                      opts.spice_cert,
960
                      opts.spice_cacert,
961
                      opts.new_confd_hmac_key,
962
                      opts.new_cluster_domain_secret,
963
                      opts.cluster_domain_secret,
964
                      opts.force)
965

    
966

    
967
def SetClusterParams(opts, args):
968
  """Modify the cluster.
969

970
  @param opts: the command line options selected by the user
971
  @type args: list
972
  @param args: should be an empty list
973
  @rtype: int
974
  @return: the desired exit code
975

976
  """
977
  if not (opts.vg_name is not None or opts.drbd_helper or
978
          opts.enabled_hypervisors or opts.hvparams or
979
          opts.beparams or opts.nicparams or
980
          opts.ndparams or opts.diskparams or
981
          opts.candidate_pool_size is not None or
982
          opts.uid_pool is not None or
983
          opts.maintain_node_health is not None or
984
          opts.add_uids is not None or
985
          opts.remove_uids is not None or
986
          opts.default_iallocator is not None or
987
          opts.reserved_lvs is not None or
988
          opts.master_netdev is not None or
989
          opts.master_netmask is not None or
990
          opts.use_external_mip_script is not None or
991
          opts.prealloc_wipe_disks is not None or
992
          opts.hv_state or
993
          opts.enabled_disk_templates or
994
          opts.disk_state or
995
          opts.ipolicy_bounds_specs is not None or
996
          opts.ipolicy_std_specs is not None or
997
          opts.ipolicy_disk_templates is not None or
998
          opts.ipolicy_vcpu_ratio is not None or
999
          opts.ipolicy_spindle_ratio is not None or
1000
          opts.modify_etc_hosts is not None or
1001
          opts.file_storage_dir is not None):
1002
    ToStderr("Please give at least one of the parameters.")
1003
    return 1
1004

    
1005
  if _CheckNoLvmStorageOptDeprecated(opts):
1006
    return 1
1007

    
1008
  enabled_disk_templates = None
1009
  if opts.enabled_disk_templates:
1010
    enabled_disk_templates = opts.enabled_disk_templates.split(",")
1011

    
1012
  # consistency between vg name and enabled disk templates
1013
  vg_name = None
1014
  if opts.vg_name is not None:
1015
    vg_name = opts.vg_name
1016
  if enabled_disk_templates:
1017
    if vg_name and not utils.IsLvmEnabled(enabled_disk_templates):
1018
      ToStdout("You specified a volume group with --vg-name, but you did not"
1019
               " enable any of the following lvm-based disk templates: %s" %
1020
               utils.CommaJoin(utils.GetLvmDiskTemplates()))
1021

    
1022
  drbd_helper = opts.drbd_helper
1023
  if not opts.drbd_storage and opts.drbd_helper:
1024
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
1025
    return 1
1026

    
1027
  if not opts.drbd_storage:
1028
    drbd_helper = ""
1029

    
1030
  hvlist = opts.enabled_hypervisors
1031
  if hvlist is not None:
1032
    hvlist = hvlist.split(",")
1033

    
1034
  # a list of (name, dict) we can pass directly to dict() (or [])
1035
  hvparams = dict(opts.hvparams)
1036
  for hv_params in hvparams.values():
1037
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1038

    
1039
  diskparams = dict(opts.diskparams)
1040

    
1041
  for dt_params in diskparams.values():
1042
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1043

    
1044
  beparams = opts.beparams
1045
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1046

    
1047
  nicparams = opts.nicparams
1048
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1049

    
1050
  ndparams = opts.ndparams
1051
  if ndparams is not None:
1052
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1053

    
1054
  ipolicy = CreateIPolicyFromOpts(
1055
    minmax_ispecs=opts.ipolicy_bounds_specs,
1056
    std_ispecs=opts.ipolicy_std_specs,
1057
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1058
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1059
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1060
    )
1061

    
1062
  mnh = opts.maintain_node_health
1063

    
1064
  uid_pool = opts.uid_pool
1065
  if uid_pool is not None:
1066
    uid_pool = uidpool.ParseUidPool(uid_pool)
1067

    
1068
  add_uids = opts.add_uids
1069
  if add_uids is not None:
1070
    add_uids = uidpool.ParseUidPool(add_uids)
1071

    
1072
  remove_uids = opts.remove_uids
1073
  if remove_uids is not None:
1074
    remove_uids = uidpool.ParseUidPool(remove_uids)
1075

    
1076
  if opts.reserved_lvs is not None:
1077
    if opts.reserved_lvs == "":
1078
      opts.reserved_lvs = []
1079
    else:
1080
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
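  # Illustrative: an empty string for the reserved-LVs option clears the
  # list, while a comma-separated value such as "vg/lv1,vg/lv2" (names made
  # up here) reserves each listed volume; UnescapeAndSplit allows escaping a
  # comma that is part of a name.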
1081

    
1082
  if opts.master_netmask is not None:
1083
    try:
1084
      opts.master_netmask = int(opts.master_netmask)
1085
    except ValueError:
1086
      ToStderr("The --master-netmask option expects an int parameter.")
1087
      return 1
1088

    
1089
  ext_ip_script = opts.use_external_mip_script
1090

    
1091
  if opts.disk_state:
1092
    disk_state = utils.FlatToDict(opts.disk_state)
1093
  else:
1094
    disk_state = {}
1095

    
1096
  hv_state = dict(opts.hv_state)
1097

    
1098
  op = opcodes.OpClusterSetParams(
1099
    vg_name=vg_name,
1100
    drbd_helper=drbd_helper,
1101
    enabled_hypervisors=hvlist,
1102
    hvparams=hvparams,
1103
    os_hvp=None,
1104
    beparams=beparams,
1105
    nicparams=nicparams,
1106
    ndparams=ndparams,
1107
    diskparams=diskparams,
1108
    ipolicy=ipolicy,
1109
    candidate_pool_size=opts.candidate_pool_size,
1110
    maintain_node_health=mnh,
1111
    modify_etc_hosts=opts.modify_etc_hosts,
1112
    uid_pool=uid_pool,
1113
    add_uids=add_uids,
1114
    remove_uids=remove_uids,
1115
    default_iallocator=opts.default_iallocator,
1116
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1117
    master_netdev=opts.master_netdev,
1118
    master_netmask=opts.master_netmask,
1119
    reserved_lvs=opts.reserved_lvs,
1120
    use_external_mip_script=ext_ip_script,
1121
    hv_state=hv_state,
1122
    disk_state=disk_state,
1123
    enabled_disk_templates=enabled_disk_templates,
1124
    force=opts.force,
1125
    file_storage_dir=opts.file_storage_dir,
1126
    )
1127
  SubmitOrSend(op, opts)
1128
  return 0
1129

    
1130

    
1131
def QueueOps(opts, args):
1132
  """Queue operations.
1133

1134
  @param opts: the command line options selected by the user
1135
  @type args: list
1136
  @param args: should contain only one element, the subcommand
1137
  @rtype: int
1138
  @return: the desired exit code
1139

1140
  """
1141
  command = args[0]
1142
  client = GetClient()
1143
  if command in ("drain", "undrain"):
1144
    drain_flag = command == "drain"
1145
    client.SetQueueDrainFlag(drain_flag)
1146
  elif command == "info":
1147
    result = client.QueryConfigValues(["drain_flag"])
1148
    if result[0]:
1149
      val = "set"
1150
    else:
1151
      val = "unset"
1152
    ToStdout("The drain flag is %s" % val)
1153
  else:
1154
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1155
                               errors.ECODE_INVAL)
1156

    
1157
  return 0
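# Typical invocations (matching the "queue" entry in the commands table
# below):
#   gnt-cluster queue drain     - set the queue drain flag
#   gnt-cluster queue undrain   - clear the queue drain flag
#   gnt-cluster queue info      - report whether the drain flag is set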
1158

    
1159

    
1160
def _ShowWatcherPause(until):
1161
  if until is None or until < time.time():
1162
    ToStdout("The watcher is not paused.")
1163
  else:
1164
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1165

    
1166

    
1167
def WatcherOps(opts, args):
1168
  """Watcher operations.
1169

1170
  @param opts: the command line options selected by the user
1171
  @type args: list
1172
  @param args: should contain only one element, the subcommand
1173
  @rtype: int
1174
  @return: the desired exit code
1175

1176
  """
1177
  command = args[0]
1178
  client = GetClient()
1179

    
1180
  if command == "continue":
1181
    client.SetWatcherPause(None)
1182
    ToStdout("The watcher is no longer paused.")
1183

    
1184
  elif command == "pause":
1185
    if len(args) < 2:
1186
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1187

    
1188
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1189
    _ShowWatcherPause(result)
1190

    
1191
  elif command == "info":
1192
    result = client.QueryConfigValues(["watcher_pause"])
1193
    _ShowWatcherPause(result[0])
1194

    
1195
  else:
1196
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1197
                               errors.ECODE_INVAL)
1198

    
1199
  return 0
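# Typical invocations (matching the "watcher" entry in the commands table
# below):
#   gnt-cluster watcher pause 1h   - pause the watcher (duration parsed via
#                                    ParseTimespec, "1h" is one suggestion)
#   gnt-cluster watcher continue   - clear the watcher pause
#   gnt-cluster watcher info       - show until when the watcher is paused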
1200

    
1201

    
1202
def _OobPower(opts, node_list, power):
1203
  """Puts the node in the list to desired power state.
1204

1205
  @param opts: The command line options selected by the user
1206
  @param node_list: The list of nodes to operate on
1207
  @param power: True if they should be powered on, False otherwise
1208
  @return: The success of the operation (none failed)
1209

1210
  """
1211
  if power:
1212
    command = constants.OOB_POWER_ON
1213
  else:
1214
    command = constants.OOB_POWER_OFF
1215

    
1216
  op = opcodes.OpOobCommand(node_names=node_list,
1217
                            command=command,
1218
                            ignore_status=True,
1219
                            timeout=opts.oob_timeout,
1220
                            power_delay=opts.power_delay)
1221
  result = SubmitOpCode(op, opts=opts)
1222
  errs = 0
1223
  for node_result in result:
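    # Each OOB result entry is a pair of tuples; as unpacked below, the second
    # field of the first tuple is the node name and the first field of the
    # second tuple is the RS_* status of the out-of-band call.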
1224
    (node_tuple, data_tuple) = node_result
1225
    (_, node_name) = node_tuple
1226
    (data_status, _) = data_tuple
1227
    if data_status != constants.RS_NORMAL:
1228
      assert data_status != constants.RS_UNAVAIL
1229
      errs += 1
1230
      ToStderr("There was a problem changing power for %s, please investigate",
1231
               node_name)
1232

    
1233
  if errs > 0:
1234
    return False
1235

    
1236
  return True
1237

    
1238

    
1239
def _InstanceStart(opts, inst_list, start, no_remember=False):
1240
  """Puts the instances in the list to desired state.
1241

1242
  @param opts: The command line options selected by the user
1243
  @param inst_list: The list of instances to operate on
1244
  @param start: True if they should be started, False for shutdown
1245
  @param no_remember: If True, the instance state is not remembered
1246
  @return: The success of the operation (none failed)
1247

1248
  """
1249
  if start:
1250
    opcls = opcodes.OpInstanceStartup
1251
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1252
  else:
1253
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1254
                           timeout=opts.shutdown_timeout,
1255
                           no_remember=no_remember)
1256
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1257

    
1258
  jex = JobExecutor(opts=opts)
1259

    
1260
  for inst in inst_list:
1261
    ToStdout("Submit %s of instance %s", text_submit, inst)
1262
    op = opcls(instance_name=inst)
1263
    jex.QueueJob(inst, op)
1264

    
1265
  results = jex.GetResults()
1266
  bad_cnt = len([1 for (success, _) in results if not success])
1267

    
1268
  if bad_cnt == 0:
1269
    ToStdout("All instances have been %s successfully", text_success)
1270
  else:
1271
    ToStderr("There were errors while %s instances:\n"
1272
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1273
             len(results))
1274
    return False
1275

    
1276
  return True
1277

    
1278

    
1279
class _RunWhenNodesReachableHelper:
1280
  """Helper class to make shared internal state sharing easier.
1281

1282
  @ivar success: Indicates if all action_cb calls were successful
1283

1284
  """
1285
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1286
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1287
    """Init the object.
1288

1289
    @param node_list: The list of nodes to be reachable
1290
    @param action_cb: Callback called when a new host is reachable
1291
    @type node2ip: dict
1292
    @param node2ip: Node to ip mapping
1293
    @param port: The port to use for the TCP ping
1294
    @param feedback_fn: The function used for feedback
1295
    @param _ping_fn: Function to check reachability (for unittest use only)
1296
    @param _sleep_fn: Function to sleep (for unittest use only)
1297

1298
    """
1299
    self.down = set(node_list)
1300
    self.up = set()
1301
    self.node2ip = node2ip
1302
    self.success = True
1303
    self.action_cb = action_cb
1304
    self.port = port
1305
    self.feedback_fn = feedback_fn
1306
    self._ping_fn = _ping_fn
1307
    self._sleep_fn = _sleep_fn
1308

    
1309
  def __call__(self):
1310
    """When called we run action_cb.
1311

1312
    @raises utils.RetryAgain: When there are still down nodes
1313

1314
    """
1315
    if not self.action_cb(self.up):
1316
      self.success = False
1317

    
1318
    if self.down:
1319
      raise utils.RetryAgain()
1320
    else:
1321
      return self.success
1322

    
1323
  def Wait(self, secs):
1324
    """Checks if a host is up or waits remaining seconds.
1325

1326
    @param secs: The secs remaining
1327

1328
    """
1329
    start = time.time()
1330
    for node in self.down:
1331
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1332
                       live_port_needed=True):
1333
        self.feedback_fn("Node %s became available" % node)
1334
        self.up.add(node)
1335
        self.down -= self.up
1336
        # If we have a node available there is the possibility to run the
1337
        # action callback successfully, therefore we don't wait and return
1338
        return
1339

    
1340
    self._sleep_fn(max(0.0, start + secs - time.time()))
1341

    
1342

    
1343
def _RunWhenNodesReachable(node_list, action_cb, interval):
1344
  """Run action_cb when nodes become reachable.
1345

1346
  @param node_list: The list of nodes to be reachable
1347
  @param action_cb: Callback called when a new host is reachable
1348
  @param interval: The earliest time to retry
1349

1350
  """
1351
  client = GetClient()
1352
  cluster_info = client.QueryClusterInfo()
1353
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1354
    family = netutils.IPAddress.family
1355
  else:
1356
    family = netutils.IP6Address.family
1357

    
1358
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1359
                 for node in node_list)
1360

    
1361
  port = netutils.GetDaemonPort(constants.NODED)
1362
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1363
                                        ToStdout)
1364

    
1365
  try:
1366
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1367
                       wait_fn=helper.Wait)
1368
  except utils.RetryTimeout:
1369
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1370
             " again:\n  - %s", "  - ".join(helper.down))
1371
    return False
1372

    
1373

    
1374
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1375
                          _instance_start_fn=_InstanceStart):
1376
  """Start the instances conditional based on node_states.
1377

1378
  @param opts: The command line options selected by the user
1379
  @param inst_map: A dict of inst -> nodes mapping
1380
  @param nodes_online: A list of nodes online
1381
  @param _instance_start_fn: Callback to start instances (unittest use only)
1382
  @return: Success of the operation on all instances
1383

1384
  """
1385
  start_inst_list = []
1386
  for (inst, nodes) in inst_map.items():
1387
    if not (nodes - nodes_online):
1388
      # All nodes the instance lives on are back online
1389
      start_inst_list.append(inst)
1390

    
1391
  for inst in start_inst_list:
1392
    del inst_map[inst]
1393

    
1394
  if start_inst_list:
1395
    return _instance_start_fn(opts, start_inst_list, True)
1396

    
1397
  return True
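# _MaybeInstanceStartup example (illustrative names): with
# inst_map == {"inst1": set(["nodeA", "nodeB"])} and
# nodes_online == set(["nodeA"]), nothing is started because "nodeB" is still
# down; once both nodes are in nodes_online, "inst1" is removed from inst_map
# and submitted for startup via _InstanceStart.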
1398

    
1399

    
1400
def _EpoOn(opts, full_node_list, node_list, inst_map):
1401
  """Does the actual power on.
1402

1403
  @param opts: The command line options selected by the user
1404
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1405
                         OOB)
1406
  @param node_list: The list of nodes to operate on (all need to support OOB)
1407
  @param inst_map: A dict of inst -> nodes mapping
1408
  @return: The desired exit status
1409

1410
  """
1411
  # Power the nodes back on (power=True selects OOB_POWER_ON in _OobPower)
  if node_list and not _OobPower(opts, node_list, True):
    ToStderr("Not all nodes seem to get back up, investigate and start"
             " manually if needed")
1414

    
1415
  # Wait for the nodes to be back up
1416
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1417

    
1418
  ToStdout("Waiting until all nodes are available again")
1419
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1420
    ToStderr("Please investigate and start stopped instances manually")
1421
    return constants.EXIT_FAILURE
1422

    
1423
  return constants.EXIT_SUCCESS
1424

    
1425

    
1426
def _EpoOff(opts, node_list, inst_map):
1427
  """Does the actual power off.
1428

1429
  @param opts: The command line options selected by the user
1430
  @param node_list: The list of nodes to operate on (all need to support OOB)
1431
  @param inst_map: A dict of inst -> nodes mapping
1432
  @return: The desired exit status
1433

1434
  """
1435
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1436
    ToStderr("Please investigate and stop instances manually before continuing")
1437
    return constants.EXIT_FAILURE
1438

    
1439
  if not node_list:
1440
    return constants.EXIT_SUCCESS
1441

    
1442
  if _OobPower(opts, node_list, False):
1443
    return constants.EXIT_SUCCESS
1444
  else:
1445
    return constants.EXIT_FAILURE
1446

    
1447

    
1448
def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
1449
        _confirm_fn=ConfirmOperation,
1450
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
1451
  """EPO operations.
1452

1453
  @param opts: the command line options selected by the user
1454
  @type args: list
1455
  @param args: should contain only one element, the subcommand
1456
  @rtype: int
1457
  @return: the desired exit code
1458

1459
  """
1460
  if opts.groups and opts.show_all:
1461
    _stderr_fn("Only one of --groups or --all are allowed")
1462
    return constants.EXIT_FAILURE
1463
  elif args and opts.show_all:
1464
    _stderr_fn("Arguments in combination with --all are not allowed")
1465
    return constants.EXIT_FAILURE
1466

    
1467
  if cl is None:
1468
    cl = GetClient()
1469

    
1470
  if opts.groups:
1471
    node_query_list = \
1472
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
1473
  else:
1474
    node_query_list = args
1475

    
1476
  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
1477
                                           "sinst_list", "powered", "offline"],
1478
                         False)
1479

    
1480
  all_nodes = map(compat.fst, result)
1481
  node_list = []
1482
  inst_map = {}
1483
  for (node, master, pinsts, sinsts, powered, offline) in result:
1484
    if not offline:
1485
      for inst in (pinsts + sinsts):
1486
        if inst in inst_map:
1487
          if not master:
1488
            inst_map[inst].add(node)
1489
        elif master:
1490
          inst_map[inst] = set()
1491
        else:
1492
          inst_map[inst] = set([node])
1493

    
1494
    if master and opts.on:
1495
      # We ignore the master for turning on the machines, in fact we are
1496
      # already operating on the master at this point :)
1497
      continue
1498
    elif master and not opts.show_all:
1499
      _stderr_fn("%s is the master node, please do a master-failover to another"
1500
                 " node not affected by the EPO or use --all if you intend to"
1501
                 " shutdown the whole cluster", node)
1502
      return constants.EXIT_FAILURE
1503
    elif powered is None:
1504
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
1505
                 " handled in a fully automated manner", node)
1506
    elif powered == opts.on:
1507
      _stdout_fn("Node %s is already in desired power state, skipping", node)
1508
    elif not offline or (offline and powered):
1509
      node_list.append(node)
1510

    
1511
  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
1512
    return constants.EXIT_FAILURE
1513

    
1514
  if opts.on:
1515
    return _on_fn(opts, all_nodes, node_list, inst_map)
1516
  else:
1517
    return _off_fn(opts, node_list, inst_map)
1518

    
1519

    
1520
def _GetCreateCommand(info):
1521
  buf = StringIO()
1522
  buf.write("gnt-cluster init")
1523
  PrintIPolicyCommand(buf, info["ipolicy"], False)
1524
  buf.write(" ")
1525
  buf.write(info["name"])
1526
  return buf.getvalue()
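# Illustrative note: the exact ipolicy options emitted depend on
# PrintIPolicyCommand (defined in ganeti.cli), but the result is always of
# the form "gnt-cluster init <ipolicy options> <cluster name>", with the
# cluster name taken from the "name" field of QueryClusterInfo.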
1527

    
1528

    
1529
def ShowCreateCommand(opts, args):
1530
  """Shows the command that can be used to re-create the cluster.
1531

1532
  Currently it works only for ipolicy specs.
1533

1534
  """
1535
  cl = GetClient(query=True)
1536
  result = cl.QueryClusterInfo()
1537
  ToStdout(_GetCreateCommand(result))
1538

    
1539

    
1540
commands = {
1541
  "init": (
1542
    InitCluster, [ArgHost(min=1, max=1)],
1543
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
1544
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
1545
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
1546
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
1547
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
1548
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
1549
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
1550
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
1551
     IPOLICY_STD_SPECS_OPT] + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
1552
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
1553
  "destroy": (
1554
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
1555
    "", "Destroy cluster"),
1556
  "rename": (
1557
    RenameCluster, [ArgHost(min=1, max=1)],
1558
    [FORCE_OPT, DRY_RUN_OPT],
1559
    "<new_name>",
1560
    "Renames the cluster"),
1561
  "redist-conf": (
1562
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS + [DRY_RUN_OPT, PRIORITY_OPT],
1563
    "", "Forces a push of the configuration file and ssconf files"
1564
    " to the nodes in the cluster"),
1565
  "verify": (
1566
    VerifyCluster, ARGS_NONE,
1567
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
1568
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
1569
    "", "Does a check on the cluster configuration"),
1570
  "verify-disks": (
1571
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
1572
    "", "Does a check on the cluster disk status"),
1573
  "repair-disk-sizes": (
1574
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
1575
    "[instance...]", "Updates mismatches in recorded disk sizes"),
1576
  "master-failover": (
1577
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
1578
    "", "Makes the current node the master"),
1579
  "master-ping": (
1580
    MasterPing, ARGS_NONE, [],
1581
    "", "Checks if the master is alive"),
1582
  "version": (
1583
    ShowClusterVersion, ARGS_NONE, [],
1584
    "", "Shows the cluster version"),
1585
  "getmaster": (
1586
    ShowClusterMaster, ARGS_NONE, [],
1587
    "", "Shows the cluster master"),
1588
  "copyfile": (
1589
    ClusterCopyFile, [ArgFile(min=1, max=1)],
1590
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
1591
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
1592
  "command": (
1593
    RunClusterCommand, [ArgCommand(min=1)],
1594
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
1595
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
1596
  "info": (
1597
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
1598
    "[--roman]", "Show cluster configuration"),
1599
  "list-tags": (
1600
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
1601
  "add-tags": (
1602
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
1603
    "tag...", "Add tags to the cluster"),
1604
  "remove-tags": (
1605
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
1606
    "tag...", "Remove tags from the cluster"),
1607
  "search-tags": (
1608
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
1609
    "Searches the tags on all objects on"
1610
    " the cluster for a given pattern (regex)"),
1611
  "queue": (
1612
    QueueOps,
1613
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
1614
    [], "drain|undrain|info", "Change queue properties"),
1615
  "watcher": (
1616
    WatcherOps,
1617
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
1618
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
1619
    [],
1620
    "{pause <timespec>|continue|info}", "Change watcher properties"),
1621
  "modify": (
1622
    SetClusterParams, ARGS_NONE,
1623
    [FORCE_OPT,
1624
     BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
1625
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
1626
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
1627
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
1628
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
1629
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
1630
     DISK_STATE_OPT] + SUBMIT_OPTS +
1631
     [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT] +
1632
     INSTANCE_POLICY_OPTS + [GLOBAL_FILEDIR_OPT],
1633
    "[opts...]",
1634
    "Alters the parameters of the cluster"),
1635
  "renew-crypto": (
1636
    RenewCrypto, ARGS_NONE,
1637
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
1638
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
1639
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
1640
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
1641
    "[opts...]",
1642
    "Renews cluster certificates, keys and secrets"),
1643
  "epo": (
1644
    Epo, [ArgUnknown()],
1645
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
1646
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
1647
    "[opts...] [args]",
1648
    "Performs an emergency power-off on given args"),
1649
  "activate-master-ip": (
1650
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
1651
  "deactivate-master-ip": (
1652
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
1653
    "Deactivates the master IP"),
1654
  "show-ispecs-cmd": (
1655
    ShowCreateCommand, ARGS_NONE, [], "",
1656
    "Show the command line to re-create the cluster"),
1657
  }
1658

    
1659

    
1660
#: dictionary with aliases for commands
1661
aliases = {
1662
  "masterfailover": "master-failover",
1663
  "show": "info",
1664
}
1665

    
1666

    
1667
def Main():
1668
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
1669
                     aliases=aliases)