Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ 0500f6fd

History | View | Annotate | Download (52.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
from cStringIO import StringIO
30
import os.path
31
import time
32
import OpenSSL
33
import itertools
34

    
35
from ganeti.cli import *
36
from ganeti import opcodes
37
from ganeti import constants
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import bootstrap
41
from ganeti import ssh
42
from ganeti import objects
43
from ganeti import uidpool
44
from ganeti import compat
45
from ganeti import netutils
46
from ganeti import pathutils
47

    
48

    
49
ON_OPT = cli_option("--on", default=False,
50
                    action="store_true", dest="on",
51
                    help="Recover from an EPO")
52

    
53
GROUPS_OPT = cli_option("--groups", default=False,
54
                        action="store_true", dest="groups",
55
                        help="Arguments are node groups instead of nodes")
56

    
57
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
58
                            help="Override interactive check for --no-voting",
59
                            default=False, action="store_true")
60

    
61
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
62
_EPO_PING_TIMEOUT = 1 # 1 second
63
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
64

    
65

    
66
@UsesRPC
67
def InitCluster(opts, args):
68
  """Initialize the cluster.
69

70
  @param opts: the command line options selected by the user
71
  @type args: list
72
  @param args: should contain only one element, the desired
73
      cluster name
74
  @rtype: int
75
  @return: the desired exit code
76

77
  """
78
  if not opts.lvm_storage and opts.vg_name:
79
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
80
    return 1
81

    
82
  vg_name = opts.vg_name
83
  if opts.lvm_storage and not opts.vg_name:
84
    vg_name = constants.DEFAULT_VG
85

    
86
  if not opts.drbd_storage and opts.drbd_helper:
87
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
88
    return 1
89

    
90
  drbd_helper = opts.drbd_helper
91
  if opts.drbd_storage and not opts.drbd_helper:
92
    drbd_helper = constants.DEFAULT_DRBD_HELPER
93

    
94
  master_netdev = opts.master_netdev
95
  if master_netdev is None:
96
    master_netdev = constants.DEFAULT_BRIDGE
97

    
98
  hvlist = opts.enabled_hypervisors
99
  if hvlist is None:
100
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
101
  hvlist = hvlist.split(",")
102

    
103
  hvparams = dict(opts.hvparams)
104
  beparams = opts.beparams
105
  nicparams = opts.nicparams
106

    
107
  diskparams = dict(opts.diskparams)
108

    
109
  # check the disk template types here, as we cannot rely on the type check done
110
  # by the opcode parameter types
111
  diskparams_keys = set(diskparams.keys())
112
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
113
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
114
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
115
    return 1
116

    
117
  # prepare beparams dict
118
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
119
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
120

    
121
  # prepare nicparams dict
122
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
123
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
124

    
125
  # prepare ndparams dict
126
  if opts.ndparams is None:
127
    ndparams = dict(constants.NDC_DEFAULTS)
128
  else:
129
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
130
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
131

    
132
  # prepare hvparams dict
133
  for hv in constants.HYPER_TYPES:
134
    if hv not in hvparams:
135
      hvparams[hv] = {}
136
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
137
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
138

    
139
  # prepare diskparams dict
140
  for templ in constants.DISK_TEMPLATES:
141
    if templ not in diskparams:
142
      diskparams[templ] = {}
143
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
144
                                         diskparams[templ])
145
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
146

    
147
  # prepare ipolicy dict
148
  ipolicy = CreateIPolicyFromOpts(
149
    ispecs_mem_size=opts.ispecs_mem_size,
150
    ispecs_cpu_count=opts.ispecs_cpu_count,
151
    ispecs_disk_count=opts.ispecs_disk_count,
152
    ispecs_disk_size=opts.ispecs_disk_size,
153
    ispecs_nic_count=opts.ispecs_nic_count,
154
    minmax_ispecs=opts.ipolicy_bounds_specs,
155
    std_ispecs=opts.ipolicy_std_specs,
156
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
157
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
158
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
159
    fill_all=True)
160

    
161
  if opts.candidate_pool_size is None:
162
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
163

    
164
  if opts.mac_prefix is None:
165
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
166

    
167
  uid_pool = opts.uid_pool
168
  if uid_pool is not None:
169
    uid_pool = uidpool.ParseUidPool(uid_pool)
170

    
171
  if opts.prealloc_wipe_disks is None:
172
    opts.prealloc_wipe_disks = False
173

    
174
  external_ip_setup_script = opts.use_external_mip_script
175
  if external_ip_setup_script is None:
176
    external_ip_setup_script = False
177

    
178
  try:
179
    primary_ip_version = int(opts.primary_ip_version)
180
  except (ValueError, TypeError), err:
181
    ToStderr("Invalid primary ip version value: %s" % str(err))
182
    return 1
183

    
184
  master_netmask = opts.master_netmask
185
  try:
186
    if master_netmask is not None:
187
      master_netmask = int(master_netmask)
188
  except (ValueError, TypeError), err:
189
    ToStderr("Invalid master netmask value: %s" % str(err))
190
    return 1
191

    
192
  if opts.disk_state:
193
    disk_state = utils.FlatToDict(opts.disk_state)
194
  else:
195
    disk_state = {}
196

    
197
  hv_state = dict(opts.hv_state)
198

    
199
  enabled_disk_templates = opts.enabled_disk_templates
200
  if enabled_disk_templates:
201
    enabled_disk_templates = enabled_disk_templates.split(",")
202
  else:
203
    enabled_disk_templates = list(constants.DEFAULT_ENABLED_DISK_TEMPLATES)
204

    
205
  bootstrap.InitCluster(cluster_name=args[0],
206
                        secondary_ip=opts.secondary_ip,
207
                        vg_name=vg_name,
208
                        mac_prefix=opts.mac_prefix,
209
                        master_netmask=master_netmask,
210
                        master_netdev=master_netdev,
211
                        file_storage_dir=opts.file_storage_dir,
212
                        shared_file_storage_dir=opts.shared_file_storage_dir,
213
                        enabled_hypervisors=hvlist,
214
                        hvparams=hvparams,
215
                        beparams=beparams,
216
                        nicparams=nicparams,
217
                        ndparams=ndparams,
218
                        diskparams=diskparams,
219
                        ipolicy=ipolicy,
220
                        candidate_pool_size=opts.candidate_pool_size,
221
                        modify_etc_hosts=opts.modify_etc_hosts,
222
                        modify_ssh_setup=opts.modify_ssh_setup,
223
                        maintain_node_health=opts.maintain_node_health,
224
                        drbd_helper=drbd_helper,
225
                        uid_pool=uid_pool,
226
                        default_iallocator=opts.default_iallocator,
227
                        primary_ip_version=primary_ip_version,
228
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
229
                        use_external_mip_script=external_ip_setup_script,
230
                        hv_state=hv_state,
231
                        disk_state=disk_state,
232
                        enabled_disk_templates=enabled_disk_templates,
233
                        )
234
  op = opcodes.OpClusterPostInit()
235
  SubmitOpCode(op, opts=opts)
236
  return 0
237

    
238

    
239
@UsesRPC
240
def DestroyCluster(opts, args):
241
  """Destroy the cluster.
242

243
  @param opts: the command line options selected by the user
244
  @type args: list
245
  @param args: should be an empty list
246
  @rtype: int
247
  @return: the desired exit code
248

249
  """
250
  if not opts.yes_do_it:
251
    ToStderr("Destroying a cluster is irreversible. If you really want"
252
             " destroy this cluster, supply the --yes-do-it option.")
253
    return 1
254

    
255
  op = opcodes.OpClusterDestroy()
256
  master = SubmitOpCode(op, opts=opts)
257
  # if we reached this, the opcode didn't fail; we can proceed to
258
  # shutdown all the daemons
259
  bootstrap.FinalizeClusterDestroy(master)
260
  return 0
261

    
262

    
263
def RenameCluster(opts, args):
264
  """Rename the cluster.
265

266
  @param opts: the command line options selected by the user
267
  @type args: list
268
  @param args: should contain only one element, the new cluster name
269
  @rtype: int
270
  @return: the desired exit code
271

272
  """
273
  cl = GetClient()
274

    
275
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
276

    
277
  new_name = args[0]
278
  if not opts.force:
279
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
280
                " connected over the network to the cluster name, the"
281
                " operation is very dangerous as the IP address will be"
282
                " removed from the node and the change may not go through."
283
                " Continue?") % (cluster_name, new_name)
284
    if not AskUser(usertext):
285
      return 1
286

    
287
  op = opcodes.OpClusterRename(name=new_name)
288
  result = SubmitOpCode(op, opts=opts, cl=cl)
289

    
290
  if result:
291
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
292

    
293
  return 0
294

    
295

    
296
def ActivateMasterIp(opts, args):
297
  """Activates the master IP.
298

299
  """
300
  op = opcodes.OpClusterActivateMasterIp()
301
  SubmitOpCode(op)
302
  return 0
303

    
304

    
305
def DeactivateMasterIp(opts, args):
306
  """Deactivates the master IP.
307

308
  """
309
  if not opts.confirm:
310
    usertext = ("This will disable the master IP. All the open connections to"
311
                " the master IP will be closed. To reach the master you will"
312
                " need to use its node IP."
313
                " Continue?")
314
    if not AskUser(usertext):
315
      return 1
316

    
317
  op = opcodes.OpClusterDeactivateMasterIp()
318
  SubmitOpCode(op)
319
  return 0
320

    
321

    
322
def RedistributeConfig(opts, args):
323
  """Forces push of the cluster configuration.
324

325
  @param opts: the command line options selected by the user
326
  @type args: list
327
  @param args: empty list
328
  @rtype: int
329
  @return: the desired exit code
330

331
  """
332
  op = opcodes.OpClusterRedistConf()
333
  SubmitOrSend(op, opts)
334
  return 0
335

    
336

    
337
def ShowClusterVersion(opts, args):
338
  """Write version of ganeti software to the standard output.
339

340
  @param opts: the command line options selected by the user
341
  @type args: list
342
  @param args: should be an empty list
343
  @rtype: int
344
  @return: the desired exit code
345

346
  """
347
  cl = GetClient(query=True)
348
  result = cl.QueryClusterInfo()
349
  ToStdout("Software version: %s", result["software_version"])
350
  ToStdout("Internode protocol: %s", result["protocol_version"])
351
  ToStdout("Configuration format: %s", result["config_version"])
352
  ToStdout("OS api version: %s", result["os_api_version"])
353
  ToStdout("Export interface: %s", result["export_version"])
354
  ToStdout("VCS version: %s", result["vcs_version"])
355
  return 0
356

    
357

    
358
def ShowClusterMaster(opts, args):
359
  """Write name of master node to the standard output.
360

361
  @param opts: the command line options selected by the user
362
  @type args: list
363
  @param args: should be an empty list
364
  @rtype: int
365
  @return: the desired exit code
366

367
  """
368
  master = bootstrap.GetMaster()
369
  ToStdout(master)
370
  return 0
371

    
372

    
373
def _FormatGroupedParams(paramsdict, roman=False):
374
  """Format Grouped parameters (be, nic, disk) by group.
375

376
  @type paramsdict: dict of dicts
377
  @param paramsdict: {group: {param: value, ...}, ...}
378
  @rtype: dict of dicts
379
  @return: copy of the input dictionaries with strings as values
380

381
  """
382
  ret = {}
383
  for (item, val) in paramsdict.items():
384
    if isinstance(val, dict):
385
      ret[item] = _FormatGroupedParams(val, roman=roman)
386
    elif roman and isinstance(val, int):
387
      ret[item] = compat.TryToRoman(val)
388
    else:
389
      ret[item] = str(val)
390
  return ret
391

    
392

    
393
def ShowClusterConfig(opts, args):
394
  """Shows cluster information.
395

396
  @param opts: the command line options selected by the user
397
  @type args: list
398
  @param args: should be an empty list
399
  @rtype: int
400
  @return: the desired exit code
401

402
  """
403
  cl = GetClient(query=True)
404
  result = cl.QueryClusterInfo()
405

    
406
  if result["tags"]:
407
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
408
  else:
409
    tags = "(none)"
410
  if result["reserved_lvs"]:
411
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
412
  else:
413
    reserved_lvs = "(none)"
414

    
415
  enabled_hv = result["enabled_hypervisors"]
416
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
417
                  if k in enabled_hv)
418

    
419
  info = [
420
    ("Cluster name", result["name"]),
421
    ("Cluster UUID", result["uuid"]),
422

    
423
    ("Creation time", utils.FormatTime(result["ctime"])),
424
    ("Modification time", utils.FormatTime(result["mtime"])),
425

    
426
    ("Master node", result["master"]),
427

    
428
    ("Architecture (this node)",
429
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),
430

    
431
    ("Tags", tags),
432

    
433
    ("Default hypervisor", result["default_hypervisor"]),
434
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),
435

    
436
    ("Hypervisor parameters", _FormatGroupedParams(hvparams)),
437

    
438
    ("OS-specific hypervisor parameters",
439
     _FormatGroupedParams(result["os_hvp"])),
440

    
441
    ("OS parameters", _FormatGroupedParams(result["osparams"])),
442

    
443
    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
444
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),
445

    
446
    ("Cluster parameters", [
447
      ("candidate pool size",
448
       compat.TryToRoman(result["candidate_pool_size"],
449
                         convert=opts.roman_integers)),
450
      ("master netdev", result["master_netdev"]),
451
      ("master netmask", result["master_netmask"]),
452
      ("use external master IP address setup script",
453
       result["use_external_mip_script"]),
454
      ("lvm volume group", result["volume_group_name"]),
455
      ("lvm reserved volumes", reserved_lvs),
456
      ("drbd usermode helper", result["drbd_usermode_helper"]),
457
      ("file storage path", result["file_storage_dir"]),
458
      ("shared file storage path", result["shared_file_storage_dir"]),
459
      ("maintenance of node health", result["maintain_node_health"]),
460
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
461
      ("default instance allocator", result["default_iallocator"]),
462
      ("primary ip version", result["primary_ip_version"]),
463
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
464
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
465
      ("ExtStorage Providers search path",
466
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
467
      ("enabled disk templates",
468
       utils.CommaJoin(result["enabled_disk_templates"])),
469
      ]),
470

    
471
    ("Default node parameters",
472
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),
473

    
474
    ("Default instance parameters",
475
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),
476

    
477
    ("Default nic parameters",
478
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),
479

    
480
    ("Default disk parameters",
481
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),
482

    
483
    ("Instance policy - limits for instances",
484
     FormatPolicyInfo(result["ipolicy"], None, True)),
485
    ]
486

    
487
  PrintGenericInfo(info)
488
  return 0
489

    
490

    
491
def ClusterCopyFile(opts, args):
492
  """Copy a file from master to some nodes.
493

494
  @param opts: the command line options selected by the user
495
  @type args: list
496
  @param args: should contain only one element, the path of
497
      the file to be copied
498
  @rtype: int
499
  @return: the desired exit code
500

501
  """
502
  filename = args[0]
503
  if not os.path.exists(filename):
504
    raise errors.OpPrereqError("No such filename '%s'" % filename,
505
                               errors.ECODE_INVAL)
506

    
507
  cl = GetClient()
508

    
509
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
510

    
511
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
512
                           secondary_ips=opts.use_replication_network,
513
                           nodegroup=opts.nodegroup)
514

    
515
  srun = ssh.SshRunner(cluster_name)
516
  for node in results:
517
    if not srun.CopyFileToNode(node, filename):
518
      ToStderr("Copy of file %s to node %s failed", filename, node)
519

    
520
  return 0
521

    
522

    
523
def RunClusterCommand(opts, args):
524
  """Run a command on some nodes.
525

526
  @param opts: the command line options selected by the user
527
  @type args: list
528
  @param args: should contain the command to be run and its arguments
529
  @rtype: int
530
  @return: the desired exit code
531

532
  """
533
  cl = GetClient()
534

    
535
  command = " ".join(args)
536

    
537
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
538

    
539
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
540
                                                    "master_node"])
541

    
542
  srun = ssh.SshRunner(cluster_name=cluster_name)
543

    
544
  # Make sure master node is at list end
545
  if master_node in nodes:
546
    nodes.remove(master_node)
547
    nodes.append(master_node)
548

    
549
  for name in nodes:
550
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
551

    
552
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
553
      # Do not output anything for successful commands
554
      continue
555

    
556
    ToStdout("------------------------------------------------")
557
    if opts.show_machine_names:
558
      for line in result.output.splitlines():
559
        ToStdout("%s: %s", name, line)
560
    else:
561
      ToStdout("node: %s", name)
562
      ToStdout("%s", result.output)
563
    ToStdout("return code = %s", result.exit_code)
564

    
565
  return 0
566

    
567

    
568
def VerifyCluster(opts, args):
569
  """Verify integrity of cluster, performing various test on nodes.
570

571
  @param opts: the command line options selected by the user
572
  @type args: list
573
  @param args: should be an empty list
574
  @rtype: int
575
  @return: the desired exit code
576

577
  """
578
  skip_checks = []
579

    
580
  if opts.skip_nplusone_mem:
581
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
582

    
583
  cl = GetClient()
584

    
585
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
586
                               error_codes=opts.error_codes,
587
                               debug_simulate_errors=opts.simulate_errors,
588
                               skip_checks=skip_checks,
589
                               ignore_errors=opts.ignore_errors,
590
                               group_name=opts.nodegroup)
591
  result = SubmitOpCode(op, cl=cl, opts=opts)
592

    
593
  # Keep track of submitted jobs
594
  jex = JobExecutor(cl=cl, opts=opts)
595

    
596
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
597
    jex.AddJobId(None, status, job_id)
598

    
599
  results = jex.GetResults()
600

    
601
  (bad_jobs, bad_results) = \
602
    map(len,
603
        # Convert iterators to lists
604
        map(list,
605
            # Count errors
606
            map(compat.partial(itertools.ifilterfalse, bool),
607
                # Convert result to booleans in a tuple
608
                zip(*((job_success, len(op_results) == 1 and op_results[0])
609
                      for (job_success, op_results) in results)))))
610

    
611
  if bad_jobs == 0 and bad_results == 0:
612
    rcode = constants.EXIT_SUCCESS
613
  else:
614
    rcode = constants.EXIT_FAILURE
615
    if bad_jobs > 0:
616
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
617

    
618
  return rcode
619

    
620

    
621
def VerifyDisks(opts, args):
622
  """Verify integrity of cluster disks.
623

624
  @param opts: the command line options selected by the user
625
  @type args: list
626
  @param args: should be an empty list
627
  @rtype: int
628
  @return: the desired exit code
629

630
  """
631
  cl = GetClient()
632

    
633
  op = opcodes.OpClusterVerifyDisks()
634

    
635
  result = SubmitOpCode(op, cl=cl, opts=opts)
636

    
637
  # Keep track of submitted jobs
638
  jex = JobExecutor(cl=cl, opts=opts)
639

    
640
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
641
    jex.AddJobId(None, status, job_id)
642

    
643
  retcode = constants.EXIT_SUCCESS
644

    
645
  for (status, result) in jex.GetResults():
646
    if not status:
647
      ToStdout("Job failed: %s", result)
648
      continue
649

    
650
    ((bad_nodes, instances, missing), ) = result
651

    
652
    for node, text in bad_nodes.items():
653
      ToStdout("Error gathering data on node %s: %s",
654
               node, utils.SafeEncode(text[-400:]))
655
      retcode = constants.EXIT_FAILURE
656
      ToStdout("You need to fix these nodes first before fixing instances")
657

    
658
    for iname in instances:
659
      if iname in missing:
660
        continue
661
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
662
      try:
663
        ToStdout("Activating disks for instance '%s'", iname)
664
        SubmitOpCode(op, opts=opts, cl=cl)
665
      except errors.GenericError, err:
666
        nret, msg = FormatError(err)
667
        retcode |= nret
668
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
669

    
670
    if missing:
671
      for iname, ival in missing.iteritems():
672
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
673
        if all_missing:
674
          ToStdout("Instance %s cannot be verified as it lives on"
675
                   " broken nodes", iname)
676
        else:
677
          ToStdout("Instance %s has missing logical volumes:", iname)
678
          ival.sort()
679
          for node, vol in ival:
680
            if node in bad_nodes:
681
              ToStdout("\tbroken node %s /dev/%s", node, vol)
682
            else:
683
              ToStdout("\t%s /dev/%s", node, vol)
684

    
685
      ToStdout("You need to replace or recreate disks for all the above"
686
               " instances if this message persists after fixing broken nodes.")
687
      retcode = constants.EXIT_FAILURE
688
    elif not instances:
689
      ToStdout("No disks need to be activated.")
690

    
691
  return retcode
692

    
693

    
694
def RepairDiskSizes(opts, args):
695
  """Verify sizes of cluster disks.
696

697
  @param opts: the command line options selected by the user
698
  @type args: list
699
  @param args: optional list of instances to restrict check to
700
  @rtype: int
701
  @return: the desired exit code
702

703
  """
704
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
705
  SubmitOpCode(op, opts=opts)
706

    
707

    
708
@UsesRPC
709
def MasterFailover(opts, args):
710
  """Failover the master node.
711

712
  This command, when run on a non-master node, will cause the current
713
  master to cease being master, and the non-master to become new
714
  master.
715

716
  @param opts: the command line options selected by the user
717
  @type args: list
718
  @param args: should be an empty list
719
  @rtype: int
720
  @return: the desired exit code
721

722
  """
723
  if opts.no_voting and not opts.yes_do_it:
724
    usertext = ("This will perform the failover even if most other nodes"
725
                " are down, or if this node is outdated. This is dangerous"
726
                " as it can lead to a non-consistent cluster. Check the"
727
                " gnt-cluster(8) man page before proceeding. Continue?")
728
    if not AskUser(usertext):
729
      return 1
730

    
731
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
732

    
733

    
734
def MasterPing(opts, args):
735
  """Checks if the master is alive.
736

737
  @param opts: the command line options selected by the user
738
  @type args: list
739
  @param args: should be an empty list
740
  @rtype: int
741
  @return: the desired exit code
742

743
  """
744
  try:
745
    cl = GetClient()
746
    cl.QueryClusterInfo()
747
    return 0
748
  except Exception: # pylint: disable=W0703
749
    return 1
750

    
751

    
752
def SearchTags(opts, args):
753
  """Searches the tags on all the cluster.
754

755
  @param opts: the command line options selected by the user
756
  @type args: list
757
  @param args: should contain only one element, the tag pattern
758
  @rtype: int
759
  @return: the desired exit code
760

761
  """
762
  op = opcodes.OpTagsSearch(pattern=args[0])
763
  result = SubmitOpCode(op, opts=opts)
764
  if not result:
765
    return 1
766
  result = list(result)
767
  result.sort()
768
  for path, tag in result:
769
    ToStdout("%s %s", path, tag)
770

    
771

    
772
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
773
  """Reads and verifies an X509 certificate.
774

775
  @type cert_filename: string
776
  @param cert_filename: the path of the file containing the certificate to
777
                        verify encoded in PEM format
778
  @type verify_private_key: bool
779
  @param verify_private_key: whether to verify the private key in addition to
780
                             the public certificate
781
  @rtype: string
782
  @return: a string containing the PEM-encoded certificate.
783

784
  """
785
  try:
786
    pem = utils.ReadFile(cert_filename)
787
  except IOError, err:
788
    raise errors.X509CertError(cert_filename,
789
                               "Unable to read certificate: %s" % str(err))
790

    
791
  try:
792
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
793
  except Exception, err:
794
    raise errors.X509CertError(cert_filename,
795
                               "Unable to load certificate: %s" % str(err))
796

    
797
  if verify_private_key:
798
    try:
799
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
800
    except Exception, err:
801
      raise errors.X509CertError(cert_filename,
802
                                 "Unable to load private key: %s" % str(err))
803

    
804
  return pem
805

    
806

    
807
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
808
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
809
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
810
                 cds_filename, force):
811
  """Renews cluster certificates, keys and secrets.
812

813
  @type new_cluster_cert: bool
814
  @param new_cluster_cert: Whether to generate a new cluster certificate
815
  @type new_rapi_cert: bool
816
  @param new_rapi_cert: Whether to generate a new RAPI certificate
817
  @type rapi_cert_filename: string
818
  @param rapi_cert_filename: Path to file containing new RAPI certificate
819
  @type new_spice_cert: bool
820
  @param new_spice_cert: Whether to generate a new SPICE certificate
821
  @type spice_cert_filename: string
822
  @param spice_cert_filename: Path to file containing new SPICE certificate
823
  @type spice_cacert_filename: string
824
  @param spice_cacert_filename: Path to file containing the certificate of the
825
                                CA that signed the SPICE certificate
826
  @type new_confd_hmac_key: bool
827
  @param new_confd_hmac_key: Whether to generate a new HMAC key
828
  @type new_cds: bool
829
  @param new_cds: Whether to generate a new cluster domain secret
830
  @type cds_filename: string
831
  @param cds_filename: Path to file containing new cluster domain secret
832
  @type force: bool
833
  @param force: Whether to ask user for confirmation
834

835
  """
836
  if new_rapi_cert and rapi_cert_filename:
837
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
838
             " options can be specified at the same time.")
839
    return 1
840

    
841
  if new_cds and cds_filename:
842
    ToStderr("Only one of the --new-cluster-domain-secret and"
843
             " --cluster-domain-secret options can be specified at"
844
             " the same time.")
845
    return 1
846

    
847
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
848
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
849
             " and --spice-ca-certificate must not be used.")
850
    return 1
851

    
852
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
853
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
854
             " specified.")
855
    return 1
856

    
857
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
858
  try:
859
    if rapi_cert_filename:
860
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
861
    if spice_cert_filename:
862
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
863
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
864
  except errors.X509CertError, err:
865
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
866
    return 1
867

    
868
  if cds_filename:
869
    try:
870
      cds = utils.ReadFile(cds_filename)
871
    except Exception, err: # pylint: disable=W0703
872
      ToStderr("Can't load new cluster domain secret from %s: %s" %
873
               (cds_filename, str(err)))
874
      return 1
875
  else:
876
    cds = None
877

    
878
  if not force:
879
    usertext = ("This requires all daemons on all nodes to be restarted and"
880
                " may take some time. Continue?")
881
    if not AskUser(usertext):
882
      return 1
883

    
884
  def _RenewCryptoInner(ctx):
885
    ctx.feedback_fn("Updating certificates and keys")
886
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
887
                                    new_rapi_cert,
888
                                    new_spice_cert,
889
                                    new_confd_hmac_key,
890
                                    new_cds,
891
                                    rapi_cert_pem=rapi_cert_pem,
892
                                    spice_cert_pem=spice_cert_pem,
893
                                    spice_cacert_pem=spice_cacert_pem,
894
                                    cds=cds)
895

    
896
    files_to_copy = []
897

    
898
    if new_cluster_cert:
899
      files_to_copy.append(pathutils.NODED_CERT_FILE)
900

    
901
    if new_rapi_cert or rapi_cert_pem:
902
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
903

    
904
    if new_spice_cert or spice_cert_pem:
905
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
906
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
907

    
908
    if new_confd_hmac_key:
909
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
910

    
911
    if new_cds or cds:
912
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
913

    
914
    if files_to_copy:
915
      for node_name in ctx.nonmaster_nodes:
916
        ctx.feedback_fn("Copying %s to %s" %
917
                        (", ".join(files_to_copy), node_name))
918
        for file_name in files_to_copy:
919
          ctx.ssh.CopyFileToNode(node_name, file_name)
920

    
921
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
922

    
923
  ToStdout("All requested certificates and keys have been replaced."
924
           " Running \"gnt-cluster verify\" now is recommended.")
925

    
926
  return 0
927

    
928

    
929
def RenewCrypto(opts, args):
930
  """Renews cluster certificates, keys and secrets.
931

932
  """
933
  return _RenewCrypto(opts.new_cluster_cert,
934
                      opts.new_rapi_cert,
935
                      opts.rapi_cert,
936
                      opts.new_spice_cert,
937
                      opts.spice_cert,
938
                      opts.spice_cacert,
939
                      opts.new_confd_hmac_key,
940
                      opts.new_cluster_domain_secret,
941
                      opts.cluster_domain_secret,
942
                      opts.force)
943

    
944

    
945
def SetClusterParams(opts, args):
946
  """Modify the cluster.
947

948
  @param opts: the command line options selected by the user
949
  @type args: list
950
  @param args: should be an empty list
951
  @rtype: int
952
  @return: the desired exit code
953

954
  """
955
  if not (not opts.lvm_storage or opts.vg_name or
956
          not opts.drbd_storage or opts.drbd_helper or
957
          opts.enabled_hypervisors or opts.hvparams or
958
          opts.beparams or opts.nicparams or
959
          opts.ndparams or opts.diskparams or
960
          opts.candidate_pool_size is not None or
961
          opts.uid_pool is not None or
962
          opts.maintain_node_health is not None or
963
          opts.add_uids is not None or
964
          opts.remove_uids is not None or
965
          opts.default_iallocator is not None or
966
          opts.reserved_lvs is not None or
967
          opts.master_netdev is not None or
968
          opts.master_netmask is not None or
969
          opts.use_external_mip_script is not None or
970
          opts.prealloc_wipe_disks is not None or
971
          opts.hv_state or
972
          opts.enabled_disk_templates or
973
          opts.disk_state or
974
          opts.ipolicy_bounds_specs is not None or
975
          opts.ipolicy_std_specs is not None or
976
          opts.ipolicy_disk_templates is not None or
977
          opts.ipolicy_vcpu_ratio is not None or
978
          opts.ipolicy_spindle_ratio is not None or
979
          opts.modify_etc_hosts is not None):
980
    ToStderr("Please give at least one of the parameters.")
981
    return 1
982

    
983
  vg_name = opts.vg_name
984
  if not opts.lvm_storage and opts.vg_name:
985
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
986
    return 1
987

    
988
  if not opts.lvm_storage:
989
    vg_name = ""
990

    
991
  drbd_helper = opts.drbd_helper
992
  if not opts.drbd_storage and opts.drbd_helper:
993
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
994
    return 1
995

    
996
  if not opts.drbd_storage:
997
    drbd_helper = ""
998

    
999
  hvlist = opts.enabled_hypervisors
1000
  if hvlist is not None:
1001
    hvlist = hvlist.split(",")
1002

    
1003
  enabled_disk_templates = opts.enabled_disk_templates
1004
  if enabled_disk_templates:
1005
    enabled_disk_templates = enabled_disk_templates.split(",")
1006

    
1007
  # a list of (name, dict) we can pass directly to dict() (or [])
1008
  hvparams = dict(opts.hvparams)
1009
  for hv_params in hvparams.values():
1010
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1011

    
1012
  diskparams = dict(opts.diskparams)
1013

    
1014
  for dt_params in diskparams.values():
1015
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1016

    
1017
  beparams = opts.beparams
1018
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1019

    
1020
  nicparams = opts.nicparams
1021
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1022

    
1023
  ndparams = opts.ndparams
1024
  if ndparams is not None:
1025
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1026

    
1027
  ipolicy = CreateIPolicyFromOpts(
1028
    minmax_ispecs=opts.ipolicy_bounds_specs,
1029
    std_ispecs=opts.ipolicy_std_specs,
1030
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1031
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1032
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1033
    )
1034

    
1035
  mnh = opts.maintain_node_health
1036

    
1037
  uid_pool = opts.uid_pool
1038
  if uid_pool is not None:
1039
    uid_pool = uidpool.ParseUidPool(uid_pool)
1040

    
1041
  add_uids = opts.add_uids
1042
  if add_uids is not None:
1043
    add_uids = uidpool.ParseUidPool(add_uids)
1044

    
1045
  remove_uids = opts.remove_uids
1046
  if remove_uids is not None:
1047
    remove_uids = uidpool.ParseUidPool(remove_uids)
1048

    
1049
  if opts.reserved_lvs is not None:
1050
    if opts.reserved_lvs == "":
1051
      opts.reserved_lvs = []
1052
    else:
1053
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1054

    
1055
  if opts.master_netmask is not None:
1056
    try:
1057
      opts.master_netmask = int(opts.master_netmask)
1058
    except ValueError:
1059
      ToStderr("The --master-netmask option expects an int parameter.")
1060
      return 1
1061

    
1062
  ext_ip_script = opts.use_external_mip_script
1063

    
1064
  if opts.disk_state:
1065
    disk_state = utils.FlatToDict(opts.disk_state)
1066
  else:
1067
    disk_state = {}
1068

    
1069
  hv_state = dict(opts.hv_state)
1070

    
1071
  op = opcodes.OpClusterSetParams(
1072
    vg_name=vg_name,
1073
    drbd_helper=drbd_helper,
1074
    enabled_hypervisors=hvlist,
1075
    hvparams=hvparams,
1076
    os_hvp=None,
1077
    beparams=beparams,
1078
    nicparams=nicparams,
1079
    ndparams=ndparams,
1080
    diskparams=diskparams,
1081
    ipolicy=ipolicy,
1082
    candidate_pool_size=opts.candidate_pool_size,
1083
    maintain_node_health=mnh,
1084
    modify_etc_hosts=opts.modify_etc_hosts,
1085
    uid_pool=uid_pool,
1086
    add_uids=add_uids,
1087
    remove_uids=remove_uids,
1088
    default_iallocator=opts.default_iallocator,
1089
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1090
    master_netdev=opts.master_netdev,
1091
    master_netmask=opts.master_netmask,
1092
    reserved_lvs=opts.reserved_lvs,
1093
    use_external_mip_script=ext_ip_script,
1094
    hv_state=hv_state,
1095
    disk_state=disk_state,
1096
    enabled_disk_templates=enabled_disk_templates,
1097
    force=opts.force,
1098
    )
1099
  SubmitOrSend(op, opts)
1100
  return 0
1101

    
1102

    
1103
def QueueOps(opts, args):
1104
  """Queue operations.
1105

1106
  @param opts: the command line options selected by the user
1107
  @type args: list
1108
  @param args: should contain only one element, the subcommand
1109
  @rtype: int
1110
  @return: the desired exit code
1111

1112
  """
1113
  command = args[0]
1114
  client = GetClient()
1115
  if command in ("drain", "undrain"):
1116
    drain_flag = command == "drain"
1117
    client.SetQueueDrainFlag(drain_flag)
1118
  elif command == "info":
1119
    result = client.QueryConfigValues(["drain_flag"])
1120
    if result[0]:
1121
      val = "set"
1122
    else:
1123
      val = "unset"
1124
    ToStdout("The drain flag is %s" % val)
1125
  else:
1126
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1127
                               errors.ECODE_INVAL)
1128

    
1129
  return 0
1130

    
1131

    
1132
def _ShowWatcherPause(until):
1133
  if until is None or until < time.time():
1134
    ToStdout("The watcher is not paused.")
1135
  else:
1136
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1137

    
1138

    
1139
def WatcherOps(opts, args):
1140
  """Watcher operations.
1141

1142
  @param opts: the command line options selected by the user
1143
  @type args: list
1144
  @param args: should contain only one element, the subcommand
1145
  @rtype: int
1146
  @return: the desired exit code
1147

1148
  """
1149
  command = args[0]
1150
  client = GetClient()
1151

    
1152
  if command == "continue":
1153
    client.SetWatcherPause(None)
1154
    ToStdout("The watcher is no longer paused.")
1155

    
1156
  elif command == "pause":
1157
    if len(args) < 2:
1158
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1159

    
1160
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1161
    _ShowWatcherPause(result)
1162

    
1163
  elif command == "info":
1164
    result = client.QueryConfigValues(["watcher_pause"])
1165
    _ShowWatcherPause(result[0])
1166

    
1167
  else:
1168
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1169
                               errors.ECODE_INVAL)
1170

    
1171
  return 0
1172

    
1173

    
1174
def _OobPower(opts, node_list, power):
1175
  """Puts the node in the list to desired power state.
1176

1177
  @param opts: The command line options selected by the user
1178
  @param node_list: The list of nodes to operate on
1179
  @param power: True if they should be powered on, False otherwise
1180
  @return: The success of the operation (none failed)
1181

1182
  """
1183
  if power:
1184
    command = constants.OOB_POWER_ON
1185
  else:
1186
    command = constants.OOB_POWER_OFF
1187

    
1188
  op = opcodes.OpOobCommand(node_names=node_list,
1189
                            command=command,
1190
                            ignore_status=True,
1191
                            timeout=opts.oob_timeout,
1192
                            power_delay=opts.power_delay)
1193
  result = SubmitOpCode(op, opts=opts)
1194
  errs = 0
1195
  for node_result in result:
1196
    (node_tuple, data_tuple) = node_result
1197
    (_, node_name) = node_tuple
1198
    (data_status, _) = data_tuple
1199
    if data_status != constants.RS_NORMAL:
1200
      assert data_status != constants.RS_UNAVAIL
1201
      errs += 1
1202
      ToStderr("There was a problem changing power for %s, please investigate",
1203
               node_name)
1204

    
1205
  if errs > 0:
1206
    return False
1207

    
1208
  return True
1209

    
1210

    
1211
def _InstanceStart(opts, inst_list, start, no_remember=False):
1212
  """Puts the instances in the list to desired state.
1213

1214
  @param opts: The command line options selected by the user
1215
  @param inst_list: The list of instances to operate on
1216
  @param start: True if they should be started, False for shutdown
1217
  @param no_remember: If the instance state should be remembered
1218
  @return: The success of the operation (none failed)
1219

1220
  """
1221
  if start:
1222
    opcls = opcodes.OpInstanceStartup
1223
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1224
  else:
1225
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1226
                           timeout=opts.shutdown_timeout,
1227
                           no_remember=no_remember)
1228
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1229

    
1230
  jex = JobExecutor(opts=opts)
1231

    
1232
  for inst in inst_list:
1233
    ToStdout("Submit %s of instance %s", text_submit, inst)
1234
    op = opcls(instance_name=inst)
1235
    jex.QueueJob(inst, op)
1236

    
1237
  results = jex.GetResults()
1238
  bad_cnt = len([1 for (success, _) in results if not success])
1239

    
1240
  if bad_cnt == 0:
1241
    ToStdout("All instances have been %s successfully", text_success)
1242
  else:
1243
    ToStderr("There were errors while %s instances:\n"
1244
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1245
             len(results))
1246
    return False
1247

    
1248
  return True
1249

    
1250

    
1251
class _RunWhenNodesReachableHelper:
1252
  """Helper class to make shared internal state sharing easier.
1253

1254
  @ivar success: Indicates if all action_cb calls were successful
1255

1256
  """
1257
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1258
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1259
    """Init the object.
1260

1261
    @param node_list: The list of nodes to be reachable
1262
    @param action_cb: Callback called when a new host is reachable
1263
    @type node2ip: dict
1264
    @param node2ip: Node to ip mapping
1265
    @param port: The port to use for the TCP ping
1266
    @param feedback_fn: The function used for feedback
1267
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1268
    @param _sleep_fn: Function to sleep (for unittest use only)
1269

1270
    """
1271
    self.down = set(node_list)
1272
    self.up = set()
1273
    self.node2ip = node2ip
1274
    self.success = True
1275
    self.action_cb = action_cb
1276
    self.port = port
1277
    self.feedback_fn = feedback_fn
1278
    self._ping_fn = _ping_fn
1279
    self._sleep_fn = _sleep_fn
1280

    
1281
  def __call__(self):
1282
    """When called we run action_cb.
1283

1284
    @raises utils.RetryAgain: When there are still down nodes
1285

1286
    """
1287
    if not self.action_cb(self.up):
1288
      self.success = False
1289

    
1290
    if self.down:
1291
      raise utils.RetryAgain()
1292
    else:
1293
      return self.success
1294

    
1295
  def Wait(self, secs):
1296
    """Checks if a host is up or waits remaining seconds.
1297

1298
    @param secs: The secs remaining
1299

1300
    """
1301
    start = time.time()
1302
    for node in self.down:
1303
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1304
                       live_port_needed=True):
1305
        self.feedback_fn("Node %s became available" % node)
1306
        self.up.add(node)
1307
        self.down -= self.up
1308
        # If we have a node available there is the possibility to run the
1309
        # action callback successfully, therefore we don't wait and return
1310
        return
1311

    
1312
    self._sleep_fn(max(0.0, start + secs - time.time()))
1313

    
1314

    
1315
def _RunWhenNodesReachable(node_list, action_cb, interval):
1316
  """Run action_cb when nodes become reachable.
1317

1318
  @param node_list: The list of nodes to be reachable
1319
  @param action_cb: Callback called when a new host is reachable
1320
  @param interval: The earliest time to retry
1321

1322
  """
1323
  client = GetClient()
1324
  cluster_info = client.QueryClusterInfo()
1325
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1326
    family = netutils.IPAddress.family
1327
  else:
1328
    family = netutils.IP6Address.family
1329

    
1330
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1331
                 for node in node_list)
1332

    
1333
  port = netutils.GetDaemonPort(constants.NODED)
1334
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1335
                                        ToStdout)
1336

    
1337
  try:
1338
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1339
                       wait_fn=helper.Wait)
1340
  except utils.RetryTimeout:
1341
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1342
             " again:\n  - %s", "  - ".join(helper.down))
1343
    return False
1344

    
1345

    
1346
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1347
                          _instance_start_fn=_InstanceStart):
1348
  """Start the instances conditional based on node_states.
1349

1350
  @param opts: The command line options selected by the user
1351
  @param inst_map: A dict of inst -> nodes mapping
1352
  @param nodes_online: A list of nodes online
1353
  @param _instance_start_fn: Callback to start instances (unittest use only)
1354
  @return: Success of the operation on all instances
1355

1356
  """
1357
  start_inst_list = []
1358
  for (inst, nodes) in inst_map.items():
1359
    if not (nodes - nodes_online):
1360
      # All nodes the instance lives on are back online
1361
      start_inst_list.append(inst)
1362

    
1363
  for inst in start_inst_list:
1364
    del inst_map[inst]
1365

    
1366
  if start_inst_list:
1367
    return _instance_start_fn(opts, start_inst_list, True)
1368

    
1369
  return True
1370

    
1371

    
1372
def _EpoOn(opts, full_node_list, node_list, inst_map):
1373
  """Does the actual power on.
1374

1375
  @param opts: The command line options selected by the user
1376
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1377
                         OOB)
1378
  @param node_list: The list of nodes to operate on (all need to support OOB)
1379
  @param inst_map: A dict of inst -> nodes mapping
1380
  @return: The desired exit status
1381

1382
  """
1383
  if node_list and not _OobPower(opts, node_list, False):
1384
    ToStderr("Not all nodes seem to get back up, investigate and start"
1385
             " manually if needed")
1386

    
1387
  # Wait for the nodes to be back up
1388
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1389

    
1390
  ToStdout("Waiting until all nodes are available again")
1391
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1392
    ToStderr("Please investigate and start stopped instances manually")
1393
    return constants.EXIT_FAILURE
1394

    
1395
  return constants.EXIT_SUCCESS
1396

    
1397

    
1398
def _EpoOff(opts, node_list, inst_map):
1399
  """Does the actual power off.
1400

1401
  @param opts: The command line options selected by the user
1402
  @param node_list: The list of nodes to operate on (all need to support OOB)
1403
  @param inst_map: A dict of inst -> nodes mapping
1404
  @return: The desired exit status
1405

1406
  """
1407
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1408
    ToStderr("Please investigate and stop instances manually before continuing")
1409
    return constants.EXIT_FAILURE
1410

    
1411
  if not node_list:
1412
    return constants.EXIT_SUCCESS
1413

    
1414
  if _OobPower(opts, node_list, False):
1415
    return constants.EXIT_SUCCESS
1416
  else:
1417
    return constants.EXIT_FAILURE
1418

    
1419

    
1420
def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
1421
        _confirm_fn=ConfirmOperation,
1422
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
1423
  """EPO operations.
1424

1425
  @param opts: the command line options selected by the user
1426
  @type args: list
1427
  @param args: should contain only one element, the subcommand
1428
  @rtype: int
1429
  @return: the desired exit code
1430

1431
  """
1432
  if opts.groups and opts.show_all:
1433
    _stderr_fn("Only one of --groups or --all are allowed")
1434
    return constants.EXIT_FAILURE
1435
  elif args and opts.show_all:
1436
    _stderr_fn("Arguments in combination with --all are not allowed")
1437
    return constants.EXIT_FAILURE
1438

    
1439
  if cl is None:
1440
    cl = GetClient()
1441

    
1442
  if opts.groups:
1443
    node_query_list = \
1444
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
1445
  else:
1446
    node_query_list = args
1447

    
1448
  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
1449
                                           "sinst_list", "powered", "offline"],
1450
                         False)
1451

    
1452
  all_nodes = map(compat.fst, result)
1453
  node_list = []
1454
  inst_map = {}
1455
  for (node, master, pinsts, sinsts, powered, offline) in result:
1456
    if not offline:
1457
      for inst in (pinsts + sinsts):
1458
        if inst in inst_map:
1459
          if not master:
1460
            inst_map[inst].add(node)
1461
        elif master:
1462
          inst_map[inst] = set()
1463
        else:
1464
          inst_map[inst] = set([node])
1465

    
1466
    if master and opts.on:
1467
      # We ignore the master for turning on the machines, in fact we are
1468
      # already operating on the master at this point :)
1469
      continue
1470
    elif master and not opts.show_all:
1471
      _stderr_fn("%s is the master node, please do a master-failover to another"
1472
                 " node not affected by the EPO or use --all if you intend to"
1473
                 " shutdown the whole cluster", node)
1474
      return constants.EXIT_FAILURE
1475
    elif powered is None:
1476
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
1477
                 " handled in a fully automated manner", node)
1478
    elif powered == opts.on:
1479
      _stdout_fn("Node %s is already in desired power state, skipping", node)
1480
    elif not offline or (offline and powered):
1481
      node_list.append(node)
1482

    
1483
  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
1484
    return constants.EXIT_FAILURE
1485

    
1486
  if opts.on:
1487
    return _on_fn(opts, all_nodes, node_list, inst_map)
1488
  else:
1489
    return _off_fn(opts, node_list, inst_map)
1490

    
1491

    
1492
def _GetCreateCommand(info):
1493
  buf = StringIO()
1494
  buf.write("gnt-cluster init")
1495
  PrintIPolicyCommand(buf, info["ipolicy"], False)
1496
  buf.write(" ")
1497
  buf.write(info["name"])
1498
  return buf.getvalue()
1499

    
1500

    
1501
def ShowCreateCommand(opts, args):
1502
  """Shows the command that can be used to re-create the cluster.
1503

1504
  Currently it works only for ipolicy specs.
1505

1506
  """
1507
  cl = GetClient(query=True)
1508
  result = cl.QueryClusterInfo()
1509
  ToStdout(_GetCreateCommand(result))
1510

    
1511

    
1512
commands = {
1513
  "init": (
1514
    InitCluster, [ArgHost(min=1, max=1)],
1515
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
1516
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
1517
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
1518
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
1519
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
1520
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
1521
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
1522
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
1523
     IPOLICY_STD_SPECS_OPT] + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
1524
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
1525
  "destroy": (
1526
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
1527
    "", "Destroy cluster"),
1528
  "rename": (
1529
    RenameCluster, [ArgHost(min=1, max=1)],
1530
    [FORCE_OPT, DRY_RUN_OPT],
1531
    "<new_name>",
1532
    "Renames the cluster"),
1533
  "redist-conf": (
1534
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
1535
    "", "Forces a push of the configuration file and ssconf files"
1536
    " to the nodes in the cluster"),
1537
  "verify": (
1538
    VerifyCluster, ARGS_NONE,
1539
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
1540
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
1541
    "", "Does a check on the cluster configuration"),
1542
  "verify-disks": (
1543
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
1544
    "", "Does a check on the cluster disk status"),
1545
  "repair-disk-sizes": (
1546
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
1547
    "[instance...]", "Updates mismatches in recorded disk sizes"),
1548
  "master-failover": (
1549
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
1550
    "", "Makes the current node the master"),
1551
  "master-ping": (
1552
    MasterPing, ARGS_NONE, [],
1553
    "", "Checks if the master is alive"),
1554
  "version": (
1555
    ShowClusterVersion, ARGS_NONE, [],
1556
    "", "Shows the cluster version"),
1557
  "getmaster": (
1558
    ShowClusterMaster, ARGS_NONE, [],
1559
    "", "Shows the cluster master"),
1560
  "copyfile": (
1561
    ClusterCopyFile, [ArgFile(min=1, max=1)],
1562
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
1563
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
1564
  "command": (
1565
    RunClusterCommand, [ArgCommand(min=1)],
1566
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
1567
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
1568
  "info": (
1569
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
1570
    "[--roman]", "Show cluster configuration"),
1571
  "list-tags": (
1572
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
1573
  "add-tags": (
1574
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1575
    "tag...", "Add tags to the cluster"),
1576
  "remove-tags": (
1577
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1578
    "tag...", "Remove tags from the cluster"),
1579
  "search-tags": (
1580
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
1581
    "Searches the tags on all objects on"
1582
    " the cluster for a given pattern (regex)"),
1583
  "queue": (
1584
    QueueOps,
1585
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
1586
    [], "drain|undrain|info", "Change queue properties"),
1587
  "watcher": (
1588
    WatcherOps,
1589
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
1590
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
1591
    [],
1592
    "{pause <timespec>|continue|info}", "Change watcher properties"),
1593
  "modify": (
1594
    SetClusterParams, ARGS_NONE,
1595
    [FORCE_OPT,
1596
     BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
1597
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
1598
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
1599
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
1600
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
1601
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
1602
     DISK_STATE_OPT, SUBMIT_OPT, ENABLED_DISK_TEMPLATES_OPT,
1603
     IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT] + INSTANCE_POLICY_OPTS,
1604
    "[opts...]",
1605
    "Alters the parameters of the cluster"),
1606
  "renew-crypto": (
1607
    RenewCrypto, ARGS_NONE,
1608
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
1609
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
1610
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
1611
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
1612
    "[opts...]",
1613
    "Renews cluster certificates, keys and secrets"),
1614
  "epo": (
1615
    Epo, [ArgUnknown()],
1616
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
1617
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
1618
    "[opts...] [args]",
1619
    "Performs an emergency power-off on given args"),
1620
  "activate-master-ip": (
1621
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
1622
  "deactivate-master-ip": (
1623
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
1624
    "Deactivates the master IP"),
1625
  "show-ispecs-cmd": (
1626
    ShowCreateCommand, ARGS_NONE, [], "",
1627
    "Show the command line to re-create the cluster"),
1628
  }
1629

    
1630

    
1631
#: dictionary with aliases for commands
1632
aliases = {
1633
  "masterfailover": "master-failover",
1634
  "show": "info",
1635
}
1636

    
1637

    
1638
def Main():
1639
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
1640
                     aliases=aliases)