#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Cluster related commands"""

# pylint: disable=W0401,W0613,W0614,C0103
# W0401: Wildcard import ganeti.cli
# W0613: Unused argument, since all functions follow the same API
# W0614: Unused import %s from wildcard import (since we need cli)
# C0103: Invalid name gnt-cluster

from cStringIO import StringIO
import os.path
import time
import OpenSSL
import itertools

from ganeti.cli import *
from ganeti import opcodes
from ganeti import constants
from ganeti import errors
from ganeti import utils
from ganeti import bootstrap
from ganeti import ssh
from ganeti import objects
from ganeti import uidpool
from ganeti import compat
from ganeti import netutils
from ganeti import pathutils


ON_OPT = cli_option("--on", default=False,
                    action="store_true", dest="on",
                    help="Recover from an EPO")

GROUPS_OPT = cli_option("--groups", default=False,
                        action="store_true", dest="groups",
                        help="Arguments are node groups instead of nodes")

FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
                            help="Override interactive check for --no-voting",
                            default=False, action="store_true")

_EPO_PING_INTERVAL = 30 # 30 seconds between pings
_EPO_PING_TIMEOUT = 1 # 1 second
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes


@UsesRPC
def InitCluster(opts, args):
  """Initialize the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the desired
      cluster name
  @rtype: int
  @return: the desired exit code

  """
  if not opts.lvm_storage and opts.vg_name:
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
    return 1

  vg_name = opts.vg_name
  if opts.lvm_storage and not opts.vg_name:
    vg_name = constants.DEFAULT_VG

  if not opts.drbd_storage and opts.drbd_helper:
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
    return 1

  drbd_helper = opts.drbd_helper
  if opts.drbd_storage and not opts.drbd_helper:
    drbd_helper = constants.DEFAULT_DRBD_HELPER

  master_netdev = opts.master_netdev
  if master_netdev is None:
    master_netdev = constants.DEFAULT_BRIDGE

  hvlist = opts.enabled_hypervisors
  if hvlist is None:
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
  hvlist = hvlist.split(",")

  hvparams = dict(opts.hvparams)
  beparams = opts.beparams
  nicparams = opts.nicparams

  diskparams = dict(opts.diskparams)

  # check the disk template types here, as we cannot rely on the type check done
  # by the opcode parameter types
  diskparams_keys = set(diskparams.keys())
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
    return 1

  # prepare beparams dict
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  # prepare nicparams dict
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  # prepare ndparams dict
  if opts.ndparams is None:
    ndparams = dict(constants.NDC_DEFAULTS)
  else:
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  # prepare hvparams dict
  for hv in constants.HYPER_TYPES:
    if hv not in hvparams:
      hvparams[hv] = {}
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)

  # prepare diskparams dict
  for templ in constants.DISK_TEMPLATES:
    if templ not in diskparams:
      diskparams[templ] = {}
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
                                         diskparams[templ])
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)

  # prepare ipolicy dict
  ipolicy = CreateIPolicyFromOpts(
    ispecs_mem_size=opts.ispecs_mem_size,
    ispecs_cpu_count=opts.ispecs_cpu_count,
    ispecs_disk_count=opts.ispecs_disk_count,
    ispecs_disk_size=opts.ispecs_disk_size,
    ispecs_nic_count=opts.ispecs_nic_count,
    minmax_ispecs=opts.ipolicy_bounds_specs,
    std_ispecs=opts.ipolicy_std_specs,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    fill_all=True)

  if opts.candidate_pool_size is None:
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT

  if opts.mac_prefix is None:
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  if opts.prealloc_wipe_disks is None:
    opts.prealloc_wipe_disks = False

  external_ip_setup_script = opts.use_external_mip_script
  if external_ip_setup_script is None:
    external_ip_setup_script = False

  try:
    primary_ip_version = int(opts.primary_ip_version)
  except (ValueError, TypeError), err:
    ToStderr("Invalid primary ip version value: %s" % str(err))
    return 1

  master_netmask = opts.master_netmask
  try:
    if master_netmask is not None:
      master_netmask = int(master_netmask)
  except (ValueError, TypeError), err:
    ToStderr("Invalid master netmask value: %s" % str(err))
    return 1

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  enabled_disk_templates = opts.enabled_disk_templates
  if enabled_disk_templates:
    enabled_disk_templates = enabled_disk_templates.split(",")
  else:
    enabled_disk_templates = list(constants.DEFAULT_ENABLED_DISK_TEMPLATES)

  bootstrap.InitCluster(cluster_name=args[0],
                        secondary_ip=opts.secondary_ip,
                        vg_name=vg_name,
                        mac_prefix=opts.mac_prefix,
                        master_netmask=master_netmask,
                        master_netdev=master_netdev,
                        file_storage_dir=opts.file_storage_dir,
                        shared_file_storage_dir=opts.shared_file_storage_dir,
                        enabled_hypervisors=hvlist,
                        hvparams=hvparams,
                        beparams=beparams,
                        nicparams=nicparams,
                        ndparams=ndparams,
                        diskparams=diskparams,
                        ipolicy=ipolicy,
                        candidate_pool_size=opts.candidate_pool_size,
                        modify_etc_hosts=opts.modify_etc_hosts,
                        modify_ssh_setup=opts.modify_ssh_setup,
                        maintain_node_health=opts.maintain_node_health,
                        drbd_helper=drbd_helper,
                        uid_pool=uid_pool,
                        default_iallocator=opts.default_iallocator,
                        primary_ip_version=primary_ip_version,
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
                        use_external_mip_script=external_ip_setup_script,
                        hv_state=hv_state,
                        disk_state=disk_state,
                        enabled_disk_templates=enabled_disk_templates,
                        )
  op = opcodes.OpClusterPostInit()
  SubmitOpCode(op, opts=opts)
  return 0


@UsesRPC
def DestroyCluster(opts, args):
  """Destroy the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if not opts.yes_do_it:
    ToStderr("Destroying a cluster is irreversible. If you really want to"
             " destroy this cluster, supply the --yes-do-it option.")
    return 1

  op = opcodes.OpClusterDestroy()
  master = SubmitOpCode(op, opts=opts)
  # if we reached this, the opcode didn't fail; we can proceed to
  # shutdown all the daemons
  bootstrap.FinalizeClusterDestroy(master)
  return 0


def RenameCluster(opts, args):
  """Rename the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new cluster name
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])

  new_name = args[0]
  if not opts.force:
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
                " connected over the network to the cluster name, the"
                " operation is very dangerous as the IP address will be"
                " removed from the node and the change may not go through."
                " Continue?") % (cluster_name, new_name)
    if not AskUser(usertext):
      return 1

  op = opcodes.OpClusterRename(name=new_name)
  result = SubmitOpCode(op, opts=opts, cl=cl)

  if result:
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)

  return 0


def ActivateMasterIp(opts, args):
  """Activates the master IP.

  """
  op = opcodes.OpClusterActivateMasterIp()
  SubmitOpCode(op)
  return 0


def DeactivateMasterIp(opts, args):
  """Deactivates the master IP.

  """
  if not opts.confirm:
    usertext = ("This will disable the master IP. All the open connections to"
                " the master IP will be closed. To reach the master you will"
                " need to use its node IP."
                " Continue?")
    if not AskUser(usertext):
      return 1

  op = opcodes.OpClusterDeactivateMasterIp()
  SubmitOpCode(op)
  return 0


def RedistributeConfig(opts, args):
  """Forces push of the cluster configuration.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: empty list
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRedistConf()
  SubmitOrSend(op, opts)
  return 0


def ShowClusterVersion(opts, args):
  """Write version of ganeti software to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient(query=True)
  result = cl.QueryClusterInfo()
  ToStdout("Software version: %s", result["software_version"])
  ToStdout("Internode protocol: %s", result["protocol_version"])
  ToStdout("Configuration format: %s", result["config_version"])
  ToStdout("OS api version: %s", result["os_api_version"])
  ToStdout("Export interface: %s", result["export_version"])
  return 0


def ShowClusterMaster(opts, args):
  """Write name of master node to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  master = bootstrap.GetMaster()
  ToStdout(master)
  return 0


def _FormatGroupedParams(paramsdict, roman=False):
  """Format Grouped parameters (be, nic, disk) by group.

  @type paramsdict: dict of dicts
  @param paramsdict: {group: {param: value, ...}, ...}
  @rtype: dict of dicts
  @return: copy of the input dictionaries with strings as values

  """
  ret = {}
  for (item, val) in paramsdict.items():
    if isinstance(val, dict):
      ret[item] = _FormatGroupedParams(val, roman=roman)
    elif roman and isinstance(val, int):
      ret[item] = compat.TryToRoman(val)
    else:
      ret[item] = str(val)
  return ret


def ShowClusterConfig(opts, args):
  """Shows cluster information.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient(query=True)
  result = cl.QueryClusterInfo()

  if result["tags"]:
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
  else:
    tags = "(none)"
  if result["reserved_lvs"]:
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
  else:
    reserved_lvs = "(none)"

  enabled_hv = result["enabled_hypervisors"]
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
                  if k in enabled_hv)

  info = [
    ("Cluster name", result["name"]),
    ("Cluster UUID", result["uuid"]),

    ("Creation time", utils.FormatTime(result["ctime"])),
    ("Modification time", utils.FormatTime(result["mtime"])),

    ("Master node", result["master"]),

    ("Architecture (this node)",
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),

    ("Tags", tags),

    ("Default hypervisor", result["default_hypervisor"]),
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),

    ("Hypervisor parameters", _FormatGroupedParams(hvparams)),

    ("OS-specific hypervisor parameters",
     _FormatGroupedParams(result["os_hvp"])),

    ("OS parameters", _FormatGroupedParams(result["osparams"])),

    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),

    ("Cluster parameters", [
      ("candidate pool size",
       compat.TryToRoman(result["candidate_pool_size"],
                         convert=opts.roman_integers)),
      ("master netdev", result["master_netdev"]),
      ("master netmask", result["master_netmask"]),
      ("use external master IP address setup script",
       result["use_external_mip_script"]),
      ("lvm volume group", result["volume_group_name"]),
      ("lvm reserved volumes", reserved_lvs),
      ("drbd usermode helper", result["drbd_usermode_helper"]),
      ("file storage path", result["file_storage_dir"]),
      ("shared file storage path", result["shared_file_storage_dir"]),
      ("maintenance of node health", result["maintain_node_health"]),
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
      ("default instance allocator", result["default_iallocator"]),
      ("primary ip version", result["primary_ip_version"]),
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
      ("ExtStorage Providers search path",
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
      ("enabled disk templates",
       utils.CommaJoin(result["enabled_disk_templates"])),
      ]),

    ("Default node parameters",
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),

    ("Default instance parameters",
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),

    ("Default nic parameters",
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),

    ("Default disk parameters",
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),

    ("Instance policy - limits for instances",
     FormatPolicyInfo(result["ipolicy"], None, True)),
    ]

  PrintGenericInfo(info)
  return 0


def ClusterCopyFile(opts, args):
  """Copy a file from master to some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the path of
      the file to be copied
  @rtype: int
  @return: the desired exit code

  """
  filename = args[0]
  if not os.path.exists(filename):
    raise errors.OpPrereqError("No such filename '%s'" % filename,
                               errors.ECODE_INVAL)

  cl = GetClient()

  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]

  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
                           secondary_ips=opts.use_replication_network,
                           nodegroup=opts.nodegroup)

  srun = ssh.SshRunner(cluster_name)
  for node in results:
    if not srun.CopyFileToNode(node, filename):
      ToStderr("Copy of file %s to node %s failed", filename, node)

  return 0


def RunClusterCommand(opts, args):
  """Run a command on some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the command to be run and its arguments
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  command = " ".join(args)

  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)

  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
                                                    "master_node"])

  srun = ssh.SshRunner(cluster_name=cluster_name)

  # Make sure master node is at list end
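  # (Presumably so that a command which stops daemons or powers off nodes
  # affects the master only after every other node has been handled.)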
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  for name in nodes:
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)

    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
      # Do not output anything for successful commands
      continue

    ToStdout("------------------------------------------------")
    if opts.show_machine_names:
      for line in result.output.splitlines():
        ToStdout("%s: %s", name, line)
    else:
      ToStdout("node: %s", name)
      ToStdout("%s", result.output)
    ToStdout("return code = %s", result.exit_code)

  return 0


def VerifyCluster(opts, args):
  """Verify integrity of cluster, performing various test on nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  skip_checks = []

  if opts.skip_nplusone_mem:
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)

  cl = GetClient()

  op = opcodes.OpClusterVerify(verbose=opts.verbose,
                               error_codes=opts.error_codes,
                               debug_simulate_errors=opts.simulate_errors,
                               skip_checks=skip_checks,
                               ignore_errors=opts.ignore_errors,
                               group_name=opts.nodegroup)
  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  results = jex.GetResults()

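  # Count the failures: 'bad_jobs' is the number of verification jobs that did
  # not run successfully, 'bad_results' the number of jobs whose single opcode
  # returned a false (i.e. failed) result.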
  (bad_jobs, bad_results) = \
    map(len,
        # Convert iterators to lists
        map(list,
            # Count errors
            map(compat.partial(itertools.ifilterfalse, bool),
                # Convert result to booleans in a tuple
                zip(*((job_success, len(op_results) == 1 and op_results[0])
                      for (job_success, op_results) in results)))))

  if bad_jobs == 0 and bad_results == 0:
    rcode = constants.EXIT_SUCCESS
  else:
    rcode = constants.EXIT_FAILURE
    if bad_jobs > 0:
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)

  return rcode


def VerifyDisks(opts, args):
  """Verify integrity of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  op = opcodes.OpClusterVerifyDisks()

  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  retcode = constants.EXIT_SUCCESS

  for (status, result) in jex.GetResults():
    if not status:
      ToStdout("Job failed: %s", result)
      continue

    ((bad_nodes, instances, missing), ) = result

    for node, text in bad_nodes.items():
      ToStdout("Error gathering data on node %s: %s",
               node, utils.SafeEncode(text[-400:]))
      retcode = constants.EXIT_FAILURE
      ToStdout("You need to fix these nodes first before fixing instances")

    for iname in instances:
      if iname in missing:
        continue
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
      try:
        ToStdout("Activating disks for instance '%s'", iname)
        SubmitOpCode(op, opts=opts, cl=cl)
      except errors.GenericError, err:
        nret, msg = FormatError(err)
        retcode |= nret
        ToStderr("Error activating disks for instance %s: %s", iname, msg)

    if missing:
      for iname, ival in missing.iteritems():
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
        if all_missing:
          ToStdout("Instance %s cannot be verified as it lives on"
                   " broken nodes", iname)
        else:
          ToStdout("Instance %s has missing logical volumes:", iname)
          ival.sort()
          for node, vol in ival:
            if node in bad_nodes:
              ToStdout("\tbroken node %s /dev/%s", node, vol)
            else:
              ToStdout("\t%s /dev/%s", node, vol)

      ToStdout("You need to replace or recreate disks for all the above"
               " instances if this message persists after fixing broken nodes.")
      retcode = constants.EXIT_FAILURE
    elif not instances:
      ToStdout("No disks need to be activated.")

  return retcode


def RepairDiskSizes(opts, args):
  """Verify sizes of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: optional list of instances to restrict check to
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
  SubmitOpCode(op, opts=opts)


@UsesRPC
def MasterFailover(opts, args):
  """Failover the master node.

  This command, when run on a non-master node, will cause the current
  master to cease being master, and the non-master to become new
  master.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if opts.no_voting and not opts.yes_do_it:
    usertext = ("This will perform the failover even if most other nodes"
                " are down, or if this node is outdated. This is dangerous"
                " as it can lead to a non-consistent cluster. Check the"
                " gnt-cluster(8) man page before proceeding. Continue?")
    if not AskUser(usertext):
      return 1

  return bootstrap.MasterFailover(no_voting=opts.no_voting)


def MasterPing(opts, args):
  """Checks if the master is alive.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  try:
    cl = GetClient()
    cl.QueryClusterInfo()
    return 0
  except Exception: # pylint: disable=W0703
    return 1


def SearchTags(opts, args):
  """Searches the tags on all the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the tag pattern
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpTagsSearch(pattern=args[0])
  result = SubmitOpCode(op, opts=opts)
  if not result:
    return 1
  result = list(result)
  result.sort()
  for path, tag in result:
    ToStdout("%s %s", path, tag)


def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
  """Reads and verifies an X509 certificate.

  @type cert_filename: string
  @param cert_filename: the path of the file containing the certificate to
                        verify encoded in PEM format
  @type verify_private_key: bool
  @param verify_private_key: whether to verify the private key in addition to
                             the public certificate
  @rtype: string
  @return: a string containing the PEM-encoded certificate.

  """
  try:
    pem = utils.ReadFile(cert_filename)
  except IOError, err:
    raise errors.X509CertError(cert_filename,
                               "Unable to read certificate: %s" % str(err))

  try:
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
  except Exception, err:
    raise errors.X509CertError(cert_filename,
                               "Unable to load certificate: %s" % str(err))

  if verify_private_key:
    try:
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
    except Exception, err:
      raise errors.X509CertError(cert_filename,
                                 "Unable to load private key: %s" % str(err))

  return pem


def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
                 cds_filename, force):
  """Renews cluster certificates, keys and secrets.

  @type new_cluster_cert: bool
  @param new_cluster_cert: Whether to generate a new cluster certificate
  @type new_rapi_cert: bool
  @param new_rapi_cert: Whether to generate a new RAPI certificate
  @type rapi_cert_filename: string
  @param rapi_cert_filename: Path to file containing new RAPI certificate
  @type new_spice_cert: bool
  @param new_spice_cert: Whether to generate a new SPICE certificate
  @type spice_cert_filename: string
  @param spice_cert_filename: Path to file containing new SPICE certificate
  @type spice_cacert_filename: string
  @param spice_cacert_filename: Path to file containing the certificate of the
                                CA that signed the SPICE certificate
  @type new_confd_hmac_key: bool
  @param new_confd_hmac_key: Whether to generate a new HMAC key
  @type new_cds: bool
  @param new_cds: Whether to generate a new cluster domain secret
  @type cds_filename: string
  @param cds_filename: Path to file containing new cluster domain secret
  @type force: bool
  @param force: Whether to ask user for confirmation

  """
  if new_rapi_cert and rapi_cert_filename:
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
             " options can be specified at the same time.")
    return 1

  if new_cds and cds_filename:
    ToStderr("Only one of the --new-cluster-domain-secret and"
             " --cluster-domain-secret options can be specified at"
             " the same time.")
    return 1

  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
             " and --spice-ca-certificate must not be used.")
    return 1

  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
             " specified.")
    return 1

  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
  try:
    if rapi_cert_filename:
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
    if spice_cert_filename:
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
  except errors.X509CertError, err:
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
    return 1

  if cds_filename:
    try:
      cds = utils.ReadFile(cds_filename)
    except Exception, err: # pylint: disable=W0703
      ToStderr("Can't load new cluster domain secret from %s: %s" %
               (cds_filename, str(err)))
      return 1
  else:
    cds = None

  if not force:
    usertext = ("This requires all daemons on all nodes to be restarted and"
                " may take some time. Continue?")
    if not AskUser(usertext):
      return 1

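  # The renewal itself runs while all Ganeti daemons are stopped (see the
  # RunWhileClusterStopped call below): new certificates/keys are generated
  # on the master and the resulting files are copied to every non-master
  # node over SSH.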
  def _RenewCryptoInner(ctx):
    ctx.feedback_fn("Updating certificates and keys")
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
                                    new_rapi_cert,
                                    new_spice_cert,
                                    new_confd_hmac_key,
                                    new_cds,
                                    rapi_cert_pem=rapi_cert_pem,
                                    spice_cert_pem=spice_cert_pem,
                                    spice_cacert_pem=spice_cacert_pem,
                                    cds=cds)

    files_to_copy = []

    if new_cluster_cert:
      files_to_copy.append(pathutils.NODED_CERT_FILE)

    if new_rapi_cert or rapi_cert_pem:
      files_to_copy.append(pathutils.RAPI_CERT_FILE)

    if new_spice_cert or spice_cert_pem:
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)

    if new_confd_hmac_key:
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)

    if new_cds or cds:
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)

    if files_to_copy:
      for node_name in ctx.nonmaster_nodes:
        ctx.feedback_fn("Copying %s to %s" %
                        (", ".join(files_to_copy), node_name))
        for file_name in files_to_copy:
          ctx.ssh.CopyFileToNode(node_name, file_name)

  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)

  ToStdout("All requested certificates and keys have been replaced."
           " Running \"gnt-cluster verify\" now is recommended.")

  return 0


def RenewCrypto(opts, args):
  """Renews cluster certificates, keys and secrets.

  """
  return _RenewCrypto(opts.new_cluster_cert,
                      opts.new_rapi_cert,
                      opts.rapi_cert,
                      opts.new_spice_cert,
                      opts.spice_cert,
                      opts.spice_cacert,
                      opts.new_confd_hmac_key,
                      opts.new_cluster_domain_secret,
                      opts.cluster_domain_secret,
                      opts.force)


def SetClusterParams(opts, args):
  """Modify the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if not (not opts.lvm_storage or opts.vg_name or
          not opts.drbd_storage or opts.drbd_helper or
          opts.enabled_hypervisors or opts.hvparams or
          opts.beparams or opts.nicparams or
          opts.ndparams or opts.diskparams or
          opts.candidate_pool_size is not None or
          opts.uid_pool is not None or
          opts.maintain_node_health is not None or
          opts.add_uids is not None or
          opts.remove_uids is not None or
          opts.default_iallocator is not None or
          opts.reserved_lvs is not None or
          opts.master_netdev is not None or
          opts.master_netmask is not None or
          opts.use_external_mip_script is not None or
          opts.prealloc_wipe_disks is not None or
          opts.hv_state or
          opts.enabled_disk_templates or
          opts.disk_state or
          opts.ipolicy_bounds_specs is not None or
          opts.ipolicy_std_specs is not None or
          opts.ipolicy_disk_templates is not None or
          opts.ipolicy_vcpu_ratio is not None or
          opts.ipolicy_spindle_ratio is not None):
    ToStderr("Please give at least one of the parameters.")
    return 1

  vg_name = opts.vg_name
  if not opts.lvm_storage and opts.vg_name:
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
    return 1

  if not opts.lvm_storage:
    vg_name = ""

  drbd_helper = opts.drbd_helper
  if not opts.drbd_storage and opts.drbd_helper:
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
    return 1

  if not opts.drbd_storage:
    drbd_helper = ""

  hvlist = opts.enabled_hypervisors
  if hvlist is not None:
    hvlist = hvlist.split(",")

  enabled_disk_templates = opts.enabled_disk_templates
  if enabled_disk_templates:
    enabled_disk_templates = enabled_disk_templates.split(",")

  # a list of (name, dict) we can pass directly to dict() (or [])
  hvparams = dict(opts.hvparams)
  for hv_params in hvparams.values():
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)

  diskparams = dict(opts.diskparams)

  for dt_params in diskparams.values():
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)

  beparams = opts.beparams
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  nicparams = opts.nicparams
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  ndparams = opts.ndparams
  if ndparams is not None:
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  ipolicy = CreateIPolicyFromOpts(
    minmax_ispecs=opts.ipolicy_bounds_specs,
    std_ispecs=opts.ipolicy_std_specs,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    )

  mnh = opts.maintain_node_health

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  add_uids = opts.add_uids
  if add_uids is not None:
    add_uids = uidpool.ParseUidPool(add_uids)

  remove_uids = opts.remove_uids
  if remove_uids is not None:
    remove_uids = uidpool.ParseUidPool(remove_uids)

  if opts.reserved_lvs is not None:
    if opts.reserved_lvs == "":
      opts.reserved_lvs = []
    else:
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")

  if opts.master_netmask is not None:
    try:
      opts.master_netmask = int(opts.master_netmask)
    except ValueError:
      ToStderr("The --master-netmask option expects an int parameter.")
      return 1

  ext_ip_script = opts.use_external_mip_script

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  op = opcodes.OpClusterSetParams(
    vg_name=vg_name,
    drbd_helper=drbd_helper,
    enabled_hypervisors=hvlist,
    hvparams=hvparams,
    os_hvp=None,
    beparams=beparams,
    nicparams=nicparams,
    ndparams=ndparams,
    diskparams=diskparams,
    ipolicy=ipolicy,
    candidate_pool_size=opts.candidate_pool_size,
    maintain_node_health=mnh,
    uid_pool=uid_pool,
    add_uids=add_uids,
    remove_uids=remove_uids,
    default_iallocator=opts.default_iallocator,
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
    master_netdev=opts.master_netdev,
    master_netmask=opts.master_netmask,
    reserved_lvs=opts.reserved_lvs,
    use_external_mip_script=ext_ip_script,
    hv_state=hv_state,
    disk_state=disk_state,
    enabled_disk_templates=enabled_disk_templates,
    )
  SubmitOrSend(op, opts)
  return 0


def QueueOps(opts, args):
  """Queue operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  command = args[0]
  client = GetClient()
  if command in ("drain", "undrain"):
    drain_flag = command == "drain"
    client.SetQueueDrainFlag(drain_flag)
  elif command == "info":
    result = client.QueryConfigValues(["drain_flag"])
    if result[0]:
      val = "set"
    else:
      val = "unset"
    ToStdout("The drain flag is %s" % val)
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0


def _ShowWatcherPause(until):
  if until is None or until < time.time():
    ToStdout("The watcher is not paused.")
  else:
    ToStdout("The watcher is paused until %s.", time.ctime(until))


def WatcherOps(opts, args):
  """Watcher operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  command = args[0]
  client = GetClient()

  if command == "continue":
    client.SetWatcherPause(None)
    ToStdout("The watcher is no longer paused.")

  elif command == "pause":
    if len(args) < 2:
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)

    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
    _ShowWatcherPause(result)

  elif command == "info":
    result = client.QueryConfigValues(["watcher_pause"])
    _ShowWatcherPause(result[0])

  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0


def _OobPower(opts, node_list, power):
  """Puts the node in the list to desired power state.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on
  @param power: True if they should be powered on, False otherwise
  @return: The success of the operation (none failed)

  """
  if power:
    command = constants.OOB_POWER_ON
  else:
    command = constants.OOB_POWER_OFF

  op = opcodes.OpOobCommand(node_names=node_list,
                            command=command,
                            ignore_status=True,
                            timeout=opts.oob_timeout,
                            power_delay=opts.power_delay)
  result = SubmitOpCode(op, opts=opts)
  errs = 0
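  # Each entry of 'result' is a pair of tuples; the second field of the first
  # tuple is the node name, the first field of the second tuple is the RPC
  # result status, and only those two values are used below.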
  for node_result in result:
    (node_tuple, data_tuple) = node_result
    (_, node_name) = node_tuple
    (data_status, _) = data_tuple
    if data_status != constants.RS_NORMAL:
      assert data_status != constants.RS_UNAVAIL
      errs += 1
      ToStderr("There was a problem changing power for %s, please investigate",
               node_name)

  if errs > 0:
    return False

  return True


def _InstanceStart(opts, inst_list, start, no_remember=False):
  """Puts the instances in the list to desired state.

  @param opts: The command line options selected by the user
  @param inst_list: The list of instances to operate on
  @param start: True if they should be started, False for shutdown
  @param no_remember: If the instance state should be remembered
  @return: The success of the operation (none failed)

  """
  if start:
    opcls = opcodes.OpInstanceStartup
    text_submit, text_success, text_failed = ("startup", "started", "starting")
  else:
    opcls = compat.partial(opcodes.OpInstanceShutdown,
                           timeout=opts.shutdown_timeout,
                           no_remember=no_remember)
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")

  jex = JobExecutor(opts=opts)

  for inst in inst_list:
    ToStdout("Submit %s of instance %s", text_submit, inst)
    op = opcls(instance_name=inst)
    jex.QueueJob(inst, op)

  results = jex.GetResults()
  bad_cnt = len([1 for (success, _) in results if not success])

  if bad_cnt == 0:
    ToStdout("All instances have been %s successfully", text_success)
  else:
    ToStderr("There were errors while %s instances:\n"
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
             len(results))
    return False

  return True


class _RunWhenNodesReachableHelper:
  """Helper class to make shared internal state sharing easier.

  @ivar success: Indicates if all action_cb calls were successful

  """
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
    """Init the object.

    @param node_list: The list of nodes to be reachable
    @param action_cb: Callback called when a new host is reachable
    @type node2ip: dict
    @param node2ip: Node to ip mapping
    @param port: The port to use for the TCP ping
    @param feedback_fn: The function used for feedback
    @param _ping_fn: Function to check reachability (for unittest use only)
    @param _sleep_fn: Function to sleep (for unittest use only)

    """
    self.down = set(node_list)
    self.up = set()
    self.node2ip = node2ip
    self.success = True
    self.action_cb = action_cb
    self.port = port
    self.feedback_fn = feedback_fn
    self._ping_fn = _ping_fn
    self._sleep_fn = _sleep_fn

  def __call__(self):
    """When called we run action_cb.

    @raises utils.RetryAgain: When there are still down nodes

    """
    if not self.action_cb(self.up):
      self.success = False

    if self.down:
      raise utils.RetryAgain()
    else:
      return self.success

  def Wait(self, secs):
    """Checks if a host is up or waits remaining seconds.

    @param secs: The secs remaining

    """
    start = time.time()
    for node in self.down:
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
                       live_port_needed=True):
        self.feedback_fn("Node %s became available" % node)
        self.up.add(node)
        self.down -= self.up
        # If we have a node available there is the possibility to run the
        # action callback successfully, therefore we don't wait and return
        return

    self._sleep_fn(max(0.0, start + secs - time.time()))


def _RunWhenNodesReachable(node_list, action_cb, interval):
  """Run action_cb when nodes become reachable.

  @param node_list: The list of nodes to be reachable
  @param action_cb: Callback called when a new host is reachable
  @param interval: The earliest time to retry

  """
  client = GetClient()
  cluster_info = client.QueryClusterInfo()
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
    family = netutils.IPAddress.family
  else:
    family = netutils.IP6Address.family

  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
                 for node in node_list)

  port = netutils.GetDaemonPort(constants.NODED)
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
                                        ToStdout)

  try:
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
                       wait_fn=helper.Wait)
  except utils.RetryTimeout:
    ToStderr("Time exceeded while waiting for nodes to become reachable"
             " again:\n  - %s", "  - ".join(helper.down))
    return False


def _MaybeInstanceStartup(opts, inst_map, nodes_online,
                          _instance_start_fn=_InstanceStart):
  """Start the instances conditional based on node_states.

  @param opts: The command line options selected by the user
  @param inst_map: A dict of inst -> nodes mapping
  @param nodes_online: A list of nodes online
  @param _instance_start_fn: Callback to start instances (unittest use only)
  @return: Success of the operation on all instances

  """
  start_inst_list = []
  for (inst, nodes) in inst_map.items():
    if not (nodes - nodes_online):
      # All nodes the instance lives on are back online
      start_inst_list.append(inst)

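  # Drop the instances we are about to start from the map; the same dict
  # object is reused on every invocation of the action callback, so this
  # prevents trying to start them again later.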
  for inst in start_inst_list:
    del inst_map[inst]

  if start_inst_list:
    return _instance_start_fn(opts, start_inst_list, True)

  return True


def _EpoOn(opts, full_node_list, node_list, inst_map):
  """Does the actual power on.

  @param opts: The command line options selected by the user
  @param full_node_list: All nodes to operate on (includes nodes not supporting
                         OOB)
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  if node_list and not _OobPower(opts, node_list, False):
    ToStderr("Not all nodes seem to get back up, investigate and start"
             " manually if needed")

  # Wait for the nodes to be back up
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))

  ToStdout("Waiting until all nodes are available again")
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
    ToStderr("Please investigate and start stopped instances manually")
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS


def _EpoOff(opts, node_list, inst_map):
  """Does the actual power off.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
    ToStderr("Please investigate and stop instances manually before continuing")
    return constants.EXIT_FAILURE

  if not node_list:
    return constants.EXIT_SUCCESS

  if _OobPower(opts, node_list, False):
    return constants.EXIT_SUCCESS
  else:
    return constants.EXIT_FAILURE


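# Overall EPO ("emergency power off") flow: without --on, all affected
# instances are shut down (without remembering the state change) and the
# nodes that support out-of-band management are handled via _EpoOff; with
# --on, the command waits until the nodes are reachable again and restarts
# the instances that were running on them (_EpoOn).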
def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
        _confirm_fn=ConfirmOperation,
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
  """EPO operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  if opts.groups and opts.show_all:
    _stderr_fn("Only one of --groups or --all are allowed")
    return constants.EXIT_FAILURE
  elif args and opts.show_all:
    _stderr_fn("Arguments in combination with --all are not allowed")
    return constants.EXIT_FAILURE

  if cl is None:
    cl = GetClient()

  if opts.groups:
    node_query_list = \
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
  else:
    node_query_list = args

  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
                                           "sinst_list", "powered", "offline"],
                         False)

  all_nodes = map(compat.fst, result)
  node_list = []
  inst_map = {}
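  # Build a map of instance -> set of the non-master nodes it lives on;
  # instances that (also) live on the master node still get an entry, but the
  # master itself is never added to their node set.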
  for (node, master, pinsts, sinsts, powered, offline) in result:
    if not offline:
      for inst in (pinsts + sinsts):
        if inst in inst_map:
          if not master:
            inst_map[inst].add(node)
        elif master:
          inst_map[inst] = set()
        else:
          inst_map[inst] = set([node])

    if master and opts.on:
      # We ignore the master for turning on the machines, in fact we are
      # already operating on the master at this point :)
      continue
    elif master and not opts.show_all:
      _stderr_fn("%s is the master node, please do a master-failover to another"
                 " node not affected by the EPO or use --all if you intend to"
                 " shutdown the whole cluster", node)
      return constants.EXIT_FAILURE
    elif powered is None:
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
                 " handled in a fully automated manner", node)
    elif powered == opts.on:
      _stdout_fn("Node %s is already in desired power state, skipping", node)
    elif not offline or (offline and powered):
      node_list.append(node)

  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
    return constants.EXIT_FAILURE

  if opts.on:
    return _on_fn(opts, all_nodes, node_list, inst_map)
  else:
    return _off_fn(opts, node_list, inst_map)


def _GetCreateCommand(info):
  buf = StringIO()
  buf.write("gnt-cluster init")
  PrintIPolicyCommand(buf, info["ipolicy"], False)
  buf.write(" ")
  buf.write(info["name"])
  return buf.getvalue()


def ShowCreateCommand(opts, args):
  """Shows the command that can be used to re-create the cluster.

  Currently it works only for ipolicy specs.

  """
  cl = GetClient(query=True)
  result = cl.QueryClusterInfo()
  ToStdout(_GetCreateCommand(result))


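# Mapping of subcommand name to a tuple describing it: (handler function,
# positional argument specification, option list, usage synopsis, short
# description). GenericMain (see Main below) uses this table for dispatching,
# e.g. "gnt-cluster verify" ends up calling VerifyCluster.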
commands = {
  "init": (
    InitCluster, [ArgHost(min=1, max=1)],
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
     IPOLICY_STD_SPECS_OPT] + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
  "destroy": (
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
    "", "Destroy cluster"),
  "rename": (
    RenameCluster, [ArgHost(min=1, max=1)],
    [FORCE_OPT, DRY_RUN_OPT],
    "<new_name>",
    "Renames the cluster"),
  "redist-conf": (
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
    "", "Forces a push of the configuration file and ssconf files"
    " to the nodes in the cluster"),
  "verify": (
    VerifyCluster, ARGS_NONE,
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
    "", "Does a check on the cluster configuration"),
  "verify-disks": (
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
    "", "Does a check on the cluster disk status"),
  "repair-disk-sizes": (
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
    "[instance...]", "Updates mismatches in recorded disk sizes"),
  "master-failover": (
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
    "", "Makes the current node the master"),
  "master-ping": (
    MasterPing, ARGS_NONE, [],
    "", "Checks if the master is alive"),
  "version": (
    ShowClusterVersion, ARGS_NONE, [],
    "", "Shows the cluster version"),
  "getmaster": (
    ShowClusterMaster, ARGS_NONE, [],
    "", "Shows the cluster master"),
  "copyfile": (
    ClusterCopyFile, [ArgFile(min=1, max=1)],
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
  "command": (
    RunClusterCommand, [ArgCommand(min=1)],
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
  "info": (
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
    "[--roman]", "Show cluster configuration"),
  "list-tags": (
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
  "add-tags": (
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
    "tag...", "Add tags to the cluster"),
  "remove-tags": (
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
    "tag...", "Remove tags from the cluster"),
  "search-tags": (
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
    "Searches the tags on all objects on"
    " the cluster for a given pattern (regex)"),
  "queue": (
    QueueOps,
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
    [], "drain|undrain|info", "Change queue properties"),
  "watcher": (
    WatcherOps,
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
    [],
    "{pause <timespec>|continue|info}", "Change watcher properties"),
  "modify": (
    SetClusterParams, ARGS_NONE,
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
     DISK_STATE_OPT, SUBMIT_OPT, ENABLED_DISK_TEMPLATES_OPT,
     IPOLICY_STD_SPECS_OPT] + INSTANCE_POLICY_OPTS,
    "[opts...]",
    "Alters the parameters of the cluster"),
  "renew-crypto": (
    RenewCrypto, ARGS_NONE,
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
    "[opts...]",
    "Renews cluster certificates, keys and secrets"),
  "epo": (
    Epo, [ArgUnknown()],
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
    "[opts...] [args]",
    "Performs an emergency power-off on given args"),
  "activate-master-ip": (
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
  "deactivate-master-ip": (
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
    "Deactivates the master IP"),
  "show-ispecs-cmd": (
    ShowCreateCommand, ARGS_NONE, [], "",
    "Show the command line to re-create the cluster"),
  }


#: dictionary with aliases for commands
aliases = {
  "masterfailover": "master-failover",
  "show": "info",
}


def Main():
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
                     aliases=aliases)