Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ da5f09ef

History | View | Annotate | Download (51.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
import os.path
30
import time
31
import OpenSSL
32
import itertools
33

    
34
from ganeti.cli import *
35
from ganeti import opcodes
36
from ganeti import constants
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import bootstrap
40
from ganeti import ssh
41
from ganeti import objects
42
from ganeti import uidpool
43
from ganeti import compat
44
from ganeti import netutils
45
from ganeti import pathutils
46

    
47

    
48
ON_OPT = cli_option("--on", default=False,
49
                    action="store_true", dest="on",
50
                    help="Recover from an EPO")
51

    
52
GROUPS_OPT = cli_option("--groups", default=False,
53
                        action="store_true", dest="groups",
54
                        help="Arguments are node groups instead of nodes")
55

    
56
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
57
                            help="Override interactive check for --no-voting",
58
                            default=False, action="store_true")
59

    
60
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
61
_EPO_PING_TIMEOUT = 1 # 1 second
62
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
63

    
64

    
65
@UsesRPC
66
def InitCluster(opts, args):
67
  """Initialize the cluster.
68

69
  @param opts: the command line options selected by the user
70
  @type args: list
71
  @param args: should contain only one element, the desired
72
      cluster name
73
  @rtype: int
74
  @return: the desired exit code
75

76
  """
77
  if not opts.lvm_storage and opts.vg_name:
78
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
79
    return 1
80

    
81
  vg_name = opts.vg_name
82
  if opts.lvm_storage and not opts.vg_name:
83
    vg_name = constants.DEFAULT_VG
84

    
85
  if not opts.drbd_storage and opts.drbd_helper:
86
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
87
    return 1
88

    
89
  drbd_helper = opts.drbd_helper
90
  if opts.drbd_storage and not opts.drbd_helper:
91
    drbd_helper = constants.DEFAULT_DRBD_HELPER
92

    
93
  master_netdev = opts.master_netdev
94
  if master_netdev is None:
95
    master_netdev = constants.DEFAULT_BRIDGE
96

    
97
  hvlist = opts.enabled_hypervisors
98
  if hvlist is None:
99
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
100
  hvlist = hvlist.split(",")
101

    
102
  hvparams = dict(opts.hvparams)
103
  beparams = opts.beparams
104
  nicparams = opts.nicparams
105

    
106
  diskparams = dict(opts.diskparams)
107

    
108
  # check the disk template types here, as we cannot rely on the type check done
109
  # by the opcode parameter types
110
  diskparams_keys = set(diskparams.keys())
111
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
112
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
113
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
114
    return 1
115

    
116
  # prepare beparams dict
117
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
118
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
119

    
120
  # prepare nicparams dict
121
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
122
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
123

    
124
  # prepare ndparams dict
125
  if opts.ndparams is None:
126
    ndparams = dict(constants.NDC_DEFAULTS)
127
  else:
128
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
129
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
130

    
131
  # prepare hvparams dict
132
  for hv in constants.HYPER_TYPES:
133
    if hv not in hvparams:
134
      hvparams[hv] = {}
135
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
136
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
137

    
138
  # prepare diskparams dict
139
  for templ in constants.DISK_TEMPLATES:
140
    if templ not in diskparams:
141
      diskparams[templ] = {}
142
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
143
                                         diskparams[templ])
144
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
145

    
146
  # prepare ipolicy dict
147
  ipolicy = CreateIPolicyFromOpts(
148
    ispecs_mem_size=opts.ispecs_mem_size,
149
    ispecs_cpu_count=opts.ispecs_cpu_count,
150
    ispecs_disk_count=opts.ispecs_disk_count,
151
    ispecs_disk_size=opts.ispecs_disk_size,
152
    ispecs_nic_count=opts.ispecs_nic_count,
153
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
154
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
155
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
156
    fill_all=True)
157

    
158
  if opts.candidate_pool_size is None:
159
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
160

    
161
  if opts.mac_prefix is None:
162
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
163

    
164
  uid_pool = opts.uid_pool
165
  if uid_pool is not None:
166
    uid_pool = uidpool.ParseUidPool(uid_pool)
167

    
168
  if opts.prealloc_wipe_disks is None:
169
    opts.prealloc_wipe_disks = False
170

    
171
  external_ip_setup_script = opts.use_external_mip_script
172
  if external_ip_setup_script is None:
173
    external_ip_setup_script = False
174

    
175
  try:
176
    primary_ip_version = int(opts.primary_ip_version)
177
  except (ValueError, TypeError), err:
178
    ToStderr("Invalid primary ip version value: %s" % str(err))
179
    return 1
180

    
181
  master_netmask = opts.master_netmask
182
  try:
183
    if master_netmask is not None:
184
      master_netmask = int(master_netmask)
185
  except (ValueError, TypeError), err:
186
    ToStderr("Invalid master netmask value: %s" % str(err))
187
    return 1
188

    
189
  if opts.disk_state:
190
    disk_state = utils.FlatToDict(opts.disk_state)
191
  else:
192
    disk_state = {}
193

    
194
  hv_state = dict(opts.hv_state)
195

    
196
  enabled_storage_types = opts.enabled_storage_types
197
  if enabled_storage_types is not None:
198
    enabled_storage_types = enabled_storage_types.split(",")
199
  else:
200
    enabled_storage_types = list(constants.DEFAULT_ENABLED_STORAGE_TYPES)
201

    
202
  bootstrap.InitCluster(cluster_name=args[0],
203
                        secondary_ip=opts.secondary_ip,
204
                        vg_name=vg_name,
205
                        mac_prefix=opts.mac_prefix,
206
                        master_netmask=master_netmask,
207
                        master_netdev=master_netdev,
208
                        file_storage_dir=opts.file_storage_dir,
209
                        shared_file_storage_dir=opts.shared_file_storage_dir,
210
                        enabled_hypervisors=hvlist,
211
                        hvparams=hvparams,
212
                        beparams=beparams,
213
                        nicparams=nicparams,
214
                        ndparams=ndparams,
215
                        diskparams=diskparams,
216
                        ipolicy=ipolicy,
217
                        candidate_pool_size=opts.candidate_pool_size,
218
                        modify_etc_hosts=opts.modify_etc_hosts,
219
                        modify_ssh_setup=opts.modify_ssh_setup,
220
                        maintain_node_health=opts.maintain_node_health,
221
                        drbd_helper=drbd_helper,
222
                        uid_pool=uid_pool,
223
                        default_iallocator=opts.default_iallocator,
224
                        primary_ip_version=primary_ip_version,
225
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
226
                        use_external_mip_script=external_ip_setup_script,
227
                        hv_state=hv_state,
228
                        disk_state=disk_state,
229
                        enabled_storage_types=enabled_storage_types,
230
                        )
231
  op = opcodes.OpClusterPostInit()
232
  SubmitOpCode(op, opts=opts)
233
  return 0
234

    
235

    
236
@UsesRPC
237
def DestroyCluster(opts, args):
238
  """Destroy the cluster.
239

240
  @param opts: the command line options selected by the user
241
  @type args: list
242
  @param args: should be an empty list
243
  @rtype: int
244
  @return: the desired exit code
245

246
  """
247
  if not opts.yes_do_it:
248
    ToStderr("Destroying a cluster is irreversible. If you really want"
249
             " destroy this cluster, supply the --yes-do-it option.")
250
    return 1
251

    
252
  op = opcodes.OpClusterDestroy()
253
  master = SubmitOpCode(op, opts=opts)
254
  # if we reached this, the opcode didn't fail; we can proceed to
255
  # shutdown all the daemons
256
  bootstrap.FinalizeClusterDestroy(master)
257
  return 0
258

    
259

    
260
def RenameCluster(opts, args):
261
  """Rename the cluster.
262

263
  @param opts: the command line options selected by the user
264
  @type args: list
265
  @param args: should contain only one element, the new cluster name
266
  @rtype: int
267
  @return: the desired exit code
268

269
  """
270
  cl = GetClient()
271

    
272
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
273

    
274
  new_name = args[0]
275
  if not opts.force:
276
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
277
                " connected over the network to the cluster name, the"
278
                " operation is very dangerous as the IP address will be"
279
                " removed from the node and the change may not go through."
280
                " Continue?") % (cluster_name, new_name)
281
    if not AskUser(usertext):
282
      return 1
283

    
284
  op = opcodes.OpClusterRename(name=new_name)
285
  result = SubmitOpCode(op, opts=opts, cl=cl)
286

    
287
  if result:
288
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
289

    
290
  return 0
291

    
292

    
293
def ActivateMasterIp(opts, args):
294
  """Activates the master IP.
295

296
  """
297
  op = opcodes.OpClusterActivateMasterIp()
298
  SubmitOpCode(op)
299
  return 0
300

    
301

    
302
def DeactivateMasterIp(opts, args):
303
  """Deactivates the master IP.
304

305
  """
306
  if not opts.confirm:
307
    usertext = ("This will disable the master IP. All the open connections to"
308
                " the master IP will be closed. To reach the master you will"
309
                " need to use its node IP."
310
                " Continue?")
311
    if not AskUser(usertext):
312
      return 1
313

    
314
  op = opcodes.OpClusterDeactivateMasterIp()
315
  SubmitOpCode(op)
316
  return 0
317

    
318

    
319
def RedistributeConfig(opts, args):
320
  """Forces push of the cluster configuration.
321

322
  @param opts: the command line options selected by the user
323
  @type args: list
324
  @param args: empty list
325
  @rtype: int
326
  @return: the desired exit code
327

328
  """
329
  op = opcodes.OpClusterRedistConf()
330
  SubmitOrSend(op, opts)
331
  return 0
332

    
333

    
334
def ShowClusterVersion(opts, args):
335
  """Write version of ganeti software to the standard output.
336

337
  @param opts: the command line options selected by the user
338
  @type args: list
339
  @param args: should be an empty list
340
  @rtype: int
341
  @return: the desired exit code
342

343
  """
344
  cl = GetClient(query=True)
345
  result = cl.QueryClusterInfo()
346
  ToStdout("Software version: %s", result["software_version"])
347
  ToStdout("Internode protocol: %s", result["protocol_version"])
348
  ToStdout("Configuration format: %s", result["config_version"])
349
  ToStdout("OS api version: %s", result["os_api_version"])
350
  ToStdout("Export interface: %s", result["export_version"])
351
  return 0
352

    
353

    
354
def ShowClusterMaster(opts, args):
355
  """Write name of master node to the standard output.
356

357
  @param opts: the command line options selected by the user
358
  @type args: list
359
  @param args: should be an empty list
360
  @rtype: int
361
  @return: the desired exit code
362

363
  """
364
  master = bootstrap.GetMaster()
365
  ToStdout(master)
366
  return 0
367

    
368

    
369
def _FormatGroupedParams(paramsdict, roman=False):
370
  """Format Grouped parameters (be, nic, disk) by group.
371

372
  @type paramsdict: dict of dicts
373
  @param paramsdict: {group: {param: value, ...}, ...}
374
  @rtype: dict of dicts
375
  @return: copy of the input dictionaries with strings as values
376

377
  """
378
  ret = {}
379
  for (item, val) in paramsdict.items():
380
    if isinstance(val, dict):
381
      ret[item] = _FormatGroupedParams(val, roman=roman)
382
    elif roman and isinstance(val, int):
383
      ret[item] = compat.TryToRoman(val)
384
    else:
385
      ret[item] = str(val)
386
  return ret
387

    
388

    
389
def ShowClusterConfig(opts, args):
390
  """Shows cluster information.
391

392
  @param opts: the command line options selected by the user
393
  @type args: list
394
  @param args: should be an empty list
395
  @rtype: int
396
  @return: the desired exit code
397

398
  """
399
  cl = GetClient(query=True)
400
  result = cl.QueryClusterInfo()
401

    
402
  if result["tags"]:
403
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
404
  else:
405
    tags = "(none)"
406
  if result["reserved_lvs"]:
407
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
408
  else:
409
    reserved_lvs = "(none)"
410

    
411
  info = [
412
    ("Cluster name", result["name"]),
413
    ("Cluster UUID", result["uuid"]),
414

    
415
    ("Creation time", utils.FormatTime(result["ctime"])),
416
    ("Modification time", utils.FormatTime(result["mtime"])),
417

    
418
    ("Master node", result["master"]),
419

    
420
    ("Architecture (this node)",
421
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),
422

    
423
    ("Tags", tags),
424

    
425
    ("Default hypervisor", result["default_hypervisor"]),
426
    ("Enabled hypervisors",
427
     utils.CommaJoin(result["enabled_hypervisors"])),
428

    
429
    ("Hypervisor parameters", _FormatGroupedParams(result["hvparams"])),
430

    
431
    ("OS-specific hypervisor parameters",
432
     _FormatGroupedParams(result["os_hvp"])),
433

    
434
    ("OS parameters", _FormatGroupedParams(result["osparams"])),
435

    
436
    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
437
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),
438

    
439
    ("Cluster parameters", [
440
      ("candidate pool size",
441
       compat.TryToRoman(result["candidate_pool_size"],
442
                         convert=opts.roman_integers)),
443
      ("master netdev", result["master_netdev"]),
444
      ("master netmask", result["master_netmask"]),
445
      ("use external master IP address setup script",
446
       result["use_external_mip_script"]),
447
      ("lvm volume group", result["volume_group_name"]),
448
      ("lvm reserved volumes", reserved_lvs),
449
      ("drbd usermode helper", result["drbd_usermode_helper"]),
450
      ("file storage path", result["file_storage_dir"]),
451
      ("shared file storage path", result["shared_file_storage_dir"]),
452
      ("maintenance of node health", result["maintain_node_health"]),
453
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
454
      ("default instance allocator", result["default_iallocator"]),
455
      ("primary ip version", result["primary_ip_version"]),
456
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
457
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
458
      ("ExtStorage Providers search path",
459
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
460
      ("enabled storage types",
461
       utils.CommaJoin(result["enabled_storage_types"])),
462
      ]),
463

    
464
    ("Default node parameters",
465
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),
466

    
467
    ("Default instance parameters",
468
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),
469

    
470
    ("Default nic parameters",
471
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),
472

    
473
    ("Default disk parameters",
474
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),
475

    
476
    ("Instance policy - limits for instances",
477
     [
478
       (key,
479
        _FormatGroupedParams(result["ipolicy"][constants.ISPECS_MINMAX][key],
480
                             roman=opts.roman_integers))
481
       for key in constants.ISPECS_MINMAX_KEYS
482
       ] +
483
     [
484
       (constants.ISPECS_STD,
485
        _FormatGroupedParams(result["ipolicy"][constants.ISPECS_STD],
486
                             roman=opts.roman_integers)),
487
       ("enabled disk templates",
488
        utils.CommaJoin(result["ipolicy"][constants.IPOLICY_DTS])),
489
       ] +
490
     [
491
       (key, result["ipolicy"][key])
492
       for key in constants.IPOLICY_PARAMETERS
493
       ]),
494
    ]
495

    
496
  PrintGenericInfo(info)
497
  return 0
498

    
499

    
500
def ClusterCopyFile(opts, args):
501
  """Copy a file from master to some nodes.
502

503
  @param opts: the command line options selected by the user
504
  @type args: list
505
  @param args: should contain only one element, the path of
506
      the file to be copied
507
  @rtype: int
508
  @return: the desired exit code
509

510
  """
511
  filename = args[0]
512
  if not os.path.exists(filename):
513
    raise errors.OpPrereqError("No such filename '%s'" % filename,
514
                               errors.ECODE_INVAL)
515

    
516
  cl = GetClient()
517

    
518
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
519

    
520
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
521
                           secondary_ips=opts.use_replication_network,
522
                           nodegroup=opts.nodegroup)
523

    
524
  srun = ssh.SshRunner(cluster_name)
525
  for node in results:
526
    if not srun.CopyFileToNode(node, filename):
527
      ToStderr("Copy of file %s to node %s failed", filename, node)
528

    
529
  return 0
530

    
531

    
532
def RunClusterCommand(opts, args):
533
  """Run a command on some nodes.
534

535
  @param opts: the command line options selected by the user
536
  @type args: list
537
  @param args: should contain the command to be run and its arguments
538
  @rtype: int
539
  @return: the desired exit code
540

541
  """
542
  cl = GetClient()
543

    
544
  command = " ".join(args)
545

    
546
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
547

    
548
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
549
                                                    "master_node"])
550

    
551
  srun = ssh.SshRunner(cluster_name=cluster_name)
552

    
553
  # Make sure master node is at list end
554
  if master_node in nodes:
555
    nodes.remove(master_node)
556
    nodes.append(master_node)
557

    
558
  for name in nodes:
559
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
560

    
561
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
562
      # Do not output anything for successful commands
563
      continue
564

    
565
    ToStdout("------------------------------------------------")
566
    if opts.show_machine_names:
567
      for line in result.output.splitlines():
568
        ToStdout("%s: %s", name, line)
569
    else:
570
      ToStdout("node: %s", name)
571
      ToStdout("%s", result.output)
572
    ToStdout("return code = %s", result.exit_code)
573

    
574
  return 0
575

    
576

    
577
def VerifyCluster(opts, args):
578
  """Verify integrity of cluster, performing various test on nodes.
579

580
  @param opts: the command line options selected by the user
581
  @type args: list
582
  @param args: should be an empty list
583
  @rtype: int
584
  @return: the desired exit code
585

586
  """
587
  skip_checks = []
588

    
589
  if opts.skip_nplusone_mem:
590
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
591

    
592
  cl = GetClient()
593

    
594
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
595
                               error_codes=opts.error_codes,
596
                               debug_simulate_errors=opts.simulate_errors,
597
                               skip_checks=skip_checks,
598
                               ignore_errors=opts.ignore_errors,
599
                               group_name=opts.nodegroup)
600
  result = SubmitOpCode(op, cl=cl, opts=opts)
601

    
602
  # Keep track of submitted jobs
603
  jex = JobExecutor(cl=cl, opts=opts)
604

    
605
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
606
    jex.AddJobId(None, status, job_id)
607

    
608
  results = jex.GetResults()
609

    
610
  (bad_jobs, bad_results) = \
611
    map(len,
612
        # Convert iterators to lists
613
        map(list,
614
            # Count errors
615
            map(compat.partial(itertools.ifilterfalse, bool),
616
                # Convert result to booleans in a tuple
617
                zip(*((job_success, len(op_results) == 1 and op_results[0])
618
                      for (job_success, op_results) in results)))))
619

    
620
  if bad_jobs == 0 and bad_results == 0:
621
    rcode = constants.EXIT_SUCCESS
622
  else:
623
    rcode = constants.EXIT_FAILURE
624
    if bad_jobs > 0:
625
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
626

    
627
  return rcode
628

    
629

    
630
def VerifyDisks(opts, args):
631
  """Verify integrity of cluster disks.
632

633
  @param opts: the command line options selected by the user
634
  @type args: list
635
  @param args: should be an empty list
636
  @rtype: int
637
  @return: the desired exit code
638

639
  """
640
  cl = GetClient()
641

    
642
  op = opcodes.OpClusterVerifyDisks()
643

    
644
  result = SubmitOpCode(op, cl=cl, opts=opts)
645

    
646
  # Keep track of submitted jobs
647
  jex = JobExecutor(cl=cl, opts=opts)
648

    
649
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
650
    jex.AddJobId(None, status, job_id)
651

    
652
  retcode = constants.EXIT_SUCCESS
653

    
654
  for (status, result) in jex.GetResults():
655
    if not status:
656
      ToStdout("Job failed: %s", result)
657
      continue
658

    
659
    ((bad_nodes, instances, missing), ) = result
660

    
661
    for node, text in bad_nodes.items():
662
      ToStdout("Error gathering data on node %s: %s",
663
               node, utils.SafeEncode(text[-400:]))
664
      retcode = constants.EXIT_FAILURE
665
      ToStdout("You need to fix these nodes first before fixing instances")
666

    
667
    for iname in instances:
668
      if iname in missing:
669
        continue
670
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
671
      try:
672
        ToStdout("Activating disks for instance '%s'", iname)
673
        SubmitOpCode(op, opts=opts, cl=cl)
674
      except errors.GenericError, err:
675
        nret, msg = FormatError(err)
676
        retcode |= nret
677
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
678

    
679
    if missing:
680
      for iname, ival in missing.iteritems():
681
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
682
        if all_missing:
683
          ToStdout("Instance %s cannot be verified as it lives on"
684
                   " broken nodes", iname)
685
        else:
686
          ToStdout("Instance %s has missing logical volumes:", iname)
687
          ival.sort()
688
          for node, vol in ival:
689
            if node in bad_nodes:
690
              ToStdout("\tbroken node %s /dev/%s", node, vol)
691
            else:
692
              ToStdout("\t%s /dev/%s", node, vol)
693

    
694
      ToStdout("You need to replace or recreate disks for all the above"
695
               " instances if this message persists after fixing broken nodes.")
696
      retcode = constants.EXIT_FAILURE
697
    elif not instances:
698
      ToStdout("No disks need to be activated.")
699

    
700
  return retcode
701

    
702

    
703
def RepairDiskSizes(opts, args):
704
  """Verify sizes of cluster disks.
705

706
  @param opts: the command line options selected by the user
707
  @type args: list
708
  @param args: optional list of instances to restrict check to
709
  @rtype: int
710
  @return: the desired exit code
711

712
  """
713
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
714
  SubmitOpCode(op, opts=opts)
715

    
716

    
717
@UsesRPC
718
def MasterFailover(opts, args):
719
  """Failover the master node.
720

721
  This command, when run on a non-master node, will cause the current
722
  master to cease being master, and the non-master to become new
723
  master.
724

725
  @param opts: the command line options selected by the user
726
  @type args: list
727
  @param args: should be an empty list
728
  @rtype: int
729
  @return: the desired exit code
730

731
  """
732
  if opts.no_voting and not opts.yes_do_it:
733
    usertext = ("This will perform the failover even if most other nodes"
734
                " are down, or if this node is outdated. This is dangerous"
735
                " as it can lead to a non-consistent cluster. Check the"
736
                " gnt-cluster(8) man page before proceeding. Continue?")
737
    if not AskUser(usertext):
738
      return 1
739

    
740
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
741

    
742

    
743
def MasterPing(opts, args):
744
  """Checks if the master is alive.
745

746
  @param opts: the command line options selected by the user
747
  @type args: list
748
  @param args: should be an empty list
749
  @rtype: int
750
  @return: the desired exit code
751

752
  """
753
  try:
754
    cl = GetClient()
755
    cl.QueryClusterInfo()
756
    return 0
757
  except Exception: # pylint: disable=W0703
758
    return 1
759

    
760

    
761
def SearchTags(opts, args):
762
  """Searches the tags on all the cluster.
763

764
  @param opts: the command line options selected by the user
765
  @type args: list
766
  @param args: should contain only one element, the tag pattern
767
  @rtype: int
768
  @return: the desired exit code
769

770
  """
771
  op = opcodes.OpTagsSearch(pattern=args[0])
772
  result = SubmitOpCode(op, opts=opts)
773
  if not result:
774
    return 1
775
  result = list(result)
776
  result.sort()
777
  for path, tag in result:
778
    ToStdout("%s %s", path, tag)
779

    
780

    
781
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
782
  """Reads and verifies an X509 certificate.
783

784
  @type cert_filename: string
785
  @param cert_filename: the path of the file containing the certificate to
786
                        verify encoded in PEM format
787
  @type verify_private_key: bool
788
  @param verify_private_key: whether to verify the private key in addition to
789
                             the public certificate
790
  @rtype: string
791
  @return: a string containing the PEM-encoded certificate.
792

793
  """
794
  try:
795
    pem = utils.ReadFile(cert_filename)
796
  except IOError, err:
797
    raise errors.X509CertError(cert_filename,
798
                               "Unable to read certificate: %s" % str(err))
799

    
800
  try:
801
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
802
  except Exception, err:
803
    raise errors.X509CertError(cert_filename,
804
                               "Unable to load certificate: %s" % str(err))
805

    
806
  if verify_private_key:
807
    try:
808
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
809
    except Exception, err:
810
      raise errors.X509CertError(cert_filename,
811
                                 "Unable to load private key: %s" % str(err))
812

    
813
  return pem
814

    
815

    
816
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
817
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
818
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
819
                 cds_filename, force):
820
  """Renews cluster certificates, keys and secrets.
821

822
  @type new_cluster_cert: bool
823
  @param new_cluster_cert: Whether to generate a new cluster certificate
824
  @type new_rapi_cert: bool
825
  @param new_rapi_cert: Whether to generate a new RAPI certificate
826
  @type rapi_cert_filename: string
827
  @param rapi_cert_filename: Path to file containing new RAPI certificate
828
  @type new_spice_cert: bool
829
  @param new_spice_cert: Whether to generate a new SPICE certificate
830
  @type spice_cert_filename: string
831
  @param spice_cert_filename: Path to file containing new SPICE certificate
832
  @type spice_cacert_filename: string
833
  @param spice_cacert_filename: Path to file containing the certificate of the
834
                                CA that signed the SPICE certificate
835
  @type new_confd_hmac_key: bool
836
  @param new_confd_hmac_key: Whether to generate a new HMAC key
837
  @type new_cds: bool
838
  @param new_cds: Whether to generate a new cluster domain secret
839
  @type cds_filename: string
840
  @param cds_filename: Path to file containing new cluster domain secret
841
  @type force: bool
842
  @param force: Whether to ask user for confirmation
843

844
  """
845
  if new_rapi_cert and rapi_cert_filename:
846
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
847
             " options can be specified at the same time.")
848
    return 1
849

    
850
  if new_cds and cds_filename:
851
    ToStderr("Only one of the --new-cluster-domain-secret and"
852
             " --cluster-domain-secret options can be specified at"
853
             " the same time.")
854
    return 1
855

    
856
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
857
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
858
             " and --spice-ca-certificate must not be used.")
859
    return 1
860

    
861
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
862
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
863
             " specified.")
864
    return 1
865

    
866
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
867
  try:
868
    if rapi_cert_filename:
869
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
870
    if spice_cert_filename:
871
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
872
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
873
  except errors.X509CertError, err:
874
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
875
    return 1
876

    
877
  if cds_filename:
878
    try:
879
      cds = utils.ReadFile(cds_filename)
880
    except Exception, err: # pylint: disable=W0703
881
      ToStderr("Can't load new cluster domain secret from %s: %s" %
882
               (cds_filename, str(err)))
883
      return 1
884
  else:
885
    cds = None
886

    
887
  if not force:
888
    usertext = ("This requires all daemons on all nodes to be restarted and"
889
                " may take some time. Continue?")
890
    if not AskUser(usertext):
891
      return 1
892

    
893
  def _RenewCryptoInner(ctx):
894
    ctx.feedback_fn("Updating certificates and keys")
895
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
896
                                    new_rapi_cert,
897
                                    new_spice_cert,
898
                                    new_confd_hmac_key,
899
                                    new_cds,
900
                                    rapi_cert_pem=rapi_cert_pem,
901
                                    spice_cert_pem=spice_cert_pem,
902
                                    spice_cacert_pem=spice_cacert_pem,
903
                                    cds=cds)
904

    
905
    files_to_copy = []
906

    
907
    if new_cluster_cert:
908
      files_to_copy.append(pathutils.NODED_CERT_FILE)
909

    
910
    if new_rapi_cert or rapi_cert_pem:
911
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
912

    
913
    if new_spice_cert or spice_cert_pem:
914
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
915
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
916

    
917
    if new_confd_hmac_key:
918
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
919

    
920
    if new_cds or cds:
921
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
922

    
923
    if files_to_copy:
924
      for node_name in ctx.nonmaster_nodes:
925
        ctx.feedback_fn("Copying %s to %s" %
926
                        (", ".join(files_to_copy), node_name))
927
        for file_name in files_to_copy:
928
          ctx.ssh.CopyFileToNode(node_name, file_name)
929

    
930
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
931

    
932
  ToStdout("All requested certificates and keys have been replaced."
933
           " Running \"gnt-cluster verify\" now is recommended.")
934

    
935
  return 0
936

    
937

    
938
def RenewCrypto(opts, args):
939
  """Renews cluster certificates, keys and secrets.
940

941
  """
942
  return _RenewCrypto(opts.new_cluster_cert,
943
                      opts.new_rapi_cert,
944
                      opts.rapi_cert,
945
                      opts.new_spice_cert,
946
                      opts.spice_cert,
947
                      opts.spice_cacert,
948
                      opts.new_confd_hmac_key,
949
                      opts.new_cluster_domain_secret,
950
                      opts.cluster_domain_secret,
951
                      opts.force)
952

    
953

    
954
def SetClusterParams(opts, args):
955
  """Modify the cluster.
956

957
  @param opts: the command line options selected by the user
958
  @type args: list
959
  @param args: should be an empty list
960
  @rtype: int
961
  @return: the desired exit code
962

963
  """
964
  if not (not opts.lvm_storage or opts.vg_name or
965
          not opts.drbd_storage or opts.drbd_helper or
966
          opts.enabled_hypervisors or opts.hvparams or
967
          opts.beparams or opts.nicparams or
968
          opts.ndparams or opts.diskparams or
969
          opts.candidate_pool_size is not None or
970
          opts.uid_pool is not None or
971
          opts.maintain_node_health is not None or
972
          opts.add_uids is not None or
973
          opts.remove_uids is not None or
974
          opts.default_iallocator is not None or
975
          opts.reserved_lvs is not None or
976
          opts.master_netdev is not None or
977
          opts.master_netmask is not None or
978
          opts.use_external_mip_script is not None or
979
          opts.prealloc_wipe_disks is not None or
980
          opts.hv_state or
981
          opts.enabled_storage_types or
982
          opts.disk_state or
983
          opts.ispecs_mem_size or
984
          opts.ispecs_cpu_count or
985
          opts.ispecs_disk_count or
986
          opts.ispecs_disk_size or
987
          opts.ispecs_nic_count or
988
          opts.ipolicy_disk_templates is not None or
989
          opts.ipolicy_vcpu_ratio is not None or
990
          opts.ipolicy_spindle_ratio is not None):
991
    ToStderr("Please give at least one of the parameters.")
992
    return 1
993

    
994
  vg_name = opts.vg_name
995
  if not opts.lvm_storage and opts.vg_name:
996
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
997
    return 1
998

    
999
  if not opts.lvm_storage:
1000
    vg_name = ""
1001

    
1002
  drbd_helper = opts.drbd_helper
1003
  if not opts.drbd_storage and opts.drbd_helper:
1004
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
1005
    return 1
1006

    
1007
  if not opts.drbd_storage:
1008
    drbd_helper = ""
1009

    
1010
  hvlist = opts.enabled_hypervisors
1011
  if hvlist is not None:
1012
    hvlist = hvlist.split(",")
1013

    
1014
  enabled_storage_types = opts.enabled_storage_types
1015
  if enabled_storage_types is not None:
1016
    enabled_storage_types = enabled_storage_types.split(",")
1017

    
1018
  # a list of (name, dict) we can pass directly to dict() (or [])
1019
  hvparams = dict(opts.hvparams)
1020
  for hv_params in hvparams.values():
1021
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1022

    
1023
  diskparams = dict(opts.diskparams)
1024

    
1025
  for dt_params in diskparams.values():
1026
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1027

    
1028
  beparams = opts.beparams
1029
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1030

    
1031
  nicparams = opts.nicparams
1032
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1033

    
1034
  ndparams = opts.ndparams
1035
  if ndparams is not None:
1036
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1037

    
1038
  ipolicy = CreateIPolicyFromOpts(
1039
    ispecs_mem_size=opts.ispecs_mem_size,
1040
    ispecs_cpu_count=opts.ispecs_cpu_count,
1041
    ispecs_disk_count=opts.ispecs_disk_count,
1042
    ispecs_disk_size=opts.ispecs_disk_size,
1043
    ispecs_nic_count=opts.ispecs_nic_count,
1044
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1045
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1046
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1047
    )
1048

    
1049
  mnh = opts.maintain_node_health
1050

    
1051
  uid_pool = opts.uid_pool
1052
  if uid_pool is not None:
1053
    uid_pool = uidpool.ParseUidPool(uid_pool)
1054

    
1055
  add_uids = opts.add_uids
1056
  if add_uids is not None:
1057
    add_uids = uidpool.ParseUidPool(add_uids)
1058

    
1059
  remove_uids = opts.remove_uids
1060
  if remove_uids is not None:
1061
    remove_uids = uidpool.ParseUidPool(remove_uids)
1062

    
1063
  if opts.reserved_lvs is not None:
1064
    if opts.reserved_lvs == "":
1065
      opts.reserved_lvs = []
1066
    else:
1067
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1068

    
1069
  if opts.master_netmask is not None:
1070
    try:
1071
      opts.master_netmask = int(opts.master_netmask)
1072
    except ValueError:
1073
      ToStderr("The --master-netmask option expects an int parameter.")
1074
      return 1
1075

    
1076
  ext_ip_script = opts.use_external_mip_script
1077

    
1078
  if opts.disk_state:
1079
    disk_state = utils.FlatToDict(opts.disk_state)
1080
  else:
1081
    disk_state = {}
1082

    
1083
  hv_state = dict(opts.hv_state)
1084

    
1085
  op = opcodes.OpClusterSetParams(
1086
    vg_name=vg_name,
1087
    drbd_helper=drbd_helper,
1088
    enabled_hypervisors=hvlist,
1089
    hvparams=hvparams,
1090
    os_hvp=None,
1091
    beparams=beparams,
1092
    nicparams=nicparams,
1093
    ndparams=ndparams,
1094
    diskparams=diskparams,
1095
    ipolicy=ipolicy,
1096
    candidate_pool_size=opts.candidate_pool_size,
1097
    maintain_node_health=mnh,
1098
    uid_pool=uid_pool,
1099
    add_uids=add_uids,
1100
    remove_uids=remove_uids,
1101
    default_iallocator=opts.default_iallocator,
1102
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1103
    master_netdev=opts.master_netdev,
1104
    master_netmask=opts.master_netmask,
1105
    reserved_lvs=opts.reserved_lvs,
1106
    use_external_mip_script=ext_ip_script,
1107
    hv_state=hv_state,
1108
    disk_state=disk_state,
1109
    enabled_storage_types=enabled_storage_types,
1110
    )
1111
  SubmitOrSend(op, opts)
1112
  return 0
1113

    
1114

    
1115
def QueueOps(opts, args):
1116
  """Queue operations.
1117

1118
  @param opts: the command line options selected by the user
1119
  @type args: list
1120
  @param args: should contain only one element, the subcommand
1121
  @rtype: int
1122
  @return: the desired exit code
1123

1124
  """
1125
  command = args[0]
1126
  client = GetClient()
1127
  if command in ("drain", "undrain"):
1128
    drain_flag = command == "drain"
1129
    client.SetQueueDrainFlag(drain_flag)
1130
  elif command == "info":
1131
    result = client.QueryConfigValues(["drain_flag"])
1132
    if result[0]:
1133
      val = "set"
1134
    else:
1135
      val = "unset"
1136
    ToStdout("The drain flag is %s" % val)
1137
  else:
1138
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1139
                               errors.ECODE_INVAL)
1140

    
1141
  return 0
1142

    
1143

    
1144
def _ShowWatcherPause(until):
1145
  if until is None or until < time.time():
1146
    ToStdout("The watcher is not paused.")
1147
  else:
1148
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1149

    
1150

    
1151
def WatcherOps(opts, args):
1152
  """Watcher operations.
1153

1154
  @param opts: the command line options selected by the user
1155
  @type args: list
1156
  @param args: should contain only one element, the subcommand
1157
  @rtype: int
1158
  @return: the desired exit code
1159

1160
  """
1161
  command = args[0]
1162
  client = GetClient()
1163

    
1164
  if command == "continue":
1165
    client.SetWatcherPause(None)
1166
    ToStdout("The watcher is no longer paused.")
1167

    
1168
  elif command == "pause":
1169
    if len(args) < 2:
1170
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1171

    
1172
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1173
    _ShowWatcherPause(result)
1174

    
1175
  elif command == "info":
1176
    result = client.QueryConfigValues(["watcher_pause"])
1177
    _ShowWatcherPause(result[0])
1178

    
1179
  else:
1180
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1181
                               errors.ECODE_INVAL)
1182

    
1183
  return 0
1184

    
1185

    
1186
def _OobPower(opts, node_list, power):
1187
  """Puts the node in the list to desired power state.
1188

1189
  @param opts: The command line options selected by the user
1190
  @param node_list: The list of nodes to operate on
1191
  @param power: True if they should be powered on, False otherwise
1192
  @return: The success of the operation (none failed)
1193

1194
  """
1195
  if power:
1196
    command = constants.OOB_POWER_ON
1197
  else:
1198
    command = constants.OOB_POWER_OFF
1199

    
1200
  op = opcodes.OpOobCommand(node_names=node_list,
1201
                            command=command,
1202
                            ignore_status=True,
1203
                            timeout=opts.oob_timeout,
1204
                            power_delay=opts.power_delay)
1205
  result = SubmitOpCode(op, opts=opts)
1206
  errs = 0
1207
  for node_result in result:
1208
    (node_tuple, data_tuple) = node_result
1209
    (_, node_name) = node_tuple
1210
    (data_status, _) = data_tuple
1211
    if data_status != constants.RS_NORMAL:
1212
      assert data_status != constants.RS_UNAVAIL
1213
      errs += 1
1214
      ToStderr("There was a problem changing power for %s, please investigate",
1215
               node_name)
1216

    
1217
  if errs > 0:
1218
    return False
1219

    
1220
  return True
1221

    
1222

    
1223
def _InstanceStart(opts, inst_list, start, no_remember=False):
1224
  """Puts the instances in the list to desired state.
1225

1226
  @param opts: The command line options selected by the user
1227
  @param inst_list: The list of instances to operate on
1228
  @param start: True if they should be started, False for shutdown
1229
  @param no_remember: If the instance state should be remembered
1230
  @return: The success of the operation (none failed)
1231

1232
  """
1233
  if start:
1234
    opcls = opcodes.OpInstanceStartup
1235
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1236
  else:
1237
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1238
                           timeout=opts.shutdown_timeout,
1239
                           no_remember=no_remember)
1240
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1241

    
1242
  jex = JobExecutor(opts=opts)
1243

    
1244
  for inst in inst_list:
1245
    ToStdout("Submit %s of instance %s", text_submit, inst)
1246
    op = opcls(instance_name=inst)
1247
    jex.QueueJob(inst, op)
1248

    
1249
  results = jex.GetResults()
1250
  bad_cnt = len([1 for (success, _) in results if not success])
1251

    
1252
  if bad_cnt == 0:
1253
    ToStdout("All instances have been %s successfully", text_success)
1254
  else:
1255
    ToStderr("There were errors while %s instances:\n"
1256
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1257
             len(results))
1258
    return False
1259

    
1260
  return True
1261

    
1262

    
1263
class _RunWhenNodesReachableHelper:
1264
  """Helper class to make shared internal state sharing easier.
1265

1266
  @ivar success: Indicates if all action_cb calls were successful
1267

1268
  """
1269
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1270
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1271
    """Init the object.
1272

1273
    @param node_list: The list of nodes to be reachable
1274
    @param action_cb: Callback called when a new host is reachable
1275
    @type node2ip: dict
1276
    @param node2ip: Node to ip mapping
1277
    @param port: The port to use for the TCP ping
1278
    @param feedback_fn: The function used for feedback
1279
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1280
    @param _sleep_fn: Function to sleep (for unittest use only)
1281

1282
    """
1283
    self.down = set(node_list)
1284
    self.up = set()
1285
    self.node2ip = node2ip
1286
    self.success = True
1287
    self.action_cb = action_cb
1288
    self.port = port
1289
    self.feedback_fn = feedback_fn
1290
    self._ping_fn = _ping_fn
1291
    self._sleep_fn = _sleep_fn
1292

    
1293
  def __call__(self):
1294
    """When called we run action_cb.
1295

1296
    @raises utils.RetryAgain: When there are still down nodes
1297

1298
    """
1299
    if not self.action_cb(self.up):
1300
      self.success = False
1301

    
1302
    if self.down:
1303
      raise utils.RetryAgain()
1304
    else:
1305
      return self.success
1306

    
1307
  def Wait(self, secs):
1308
    """Checks if a host is up or waits remaining seconds.
1309

1310
    @param secs: The secs remaining
1311

1312
    """
1313
    start = time.time()
1314
    for node in self.down:
1315
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1316
                       live_port_needed=True):
1317
        self.feedback_fn("Node %s became available" % node)
1318
        self.up.add(node)
1319
        self.down -= self.up
1320
        # If we have a node available there is the possibility to run the
1321
        # action callback successfully, therefore we don't wait and return
1322
        return
1323

    
1324
    self._sleep_fn(max(0.0, start + secs - time.time()))
1325

    
1326

    
1327
def _RunWhenNodesReachable(node_list, action_cb, interval):
1328
  """Run action_cb when nodes become reachable.
1329

1330
  @param node_list: The list of nodes to be reachable
1331
  @param action_cb: Callback called when a new host is reachable
1332
  @param interval: The earliest time to retry
1333

1334
  """
1335
  client = GetClient()
1336
  cluster_info = client.QueryClusterInfo()
1337
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1338
    family = netutils.IPAddress.family
1339
  else:
1340
    family = netutils.IP6Address.family
1341

    
1342
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1343
                 for node in node_list)
1344

    
1345
  port = netutils.GetDaemonPort(constants.NODED)
1346
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1347
                                        ToStdout)
1348

    
1349
  try:
1350
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1351
                       wait_fn=helper.Wait)
1352
  except utils.RetryTimeout:
1353
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1354
             " again:\n  - %s", "  - ".join(helper.down))
1355
    return False
1356

    
1357

    
1358
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1359
                          _instance_start_fn=_InstanceStart):
1360
  """Start the instances conditional based on node_states.
1361

1362
  @param opts: The command line options selected by the user
1363
  @param inst_map: A dict of inst -> nodes mapping
1364
  @param nodes_online: A list of nodes online
1365
  @param _instance_start_fn: Callback to start instances (unittest use only)
1366
  @return: Success of the operation on all instances
1367

1368
  """
1369
  start_inst_list = []
1370
  for (inst, nodes) in inst_map.items():
1371
    if not (nodes - nodes_online):
1372
      # All nodes the instance lives on are back online
1373
      start_inst_list.append(inst)
1374

    
1375
  for inst in start_inst_list:
1376
    del inst_map[inst]
1377

    
1378
  if start_inst_list:
1379
    return _instance_start_fn(opts, start_inst_list, True)
1380

    
1381
  return True
1382

    
1383

    
1384
def _EpoOn(opts, full_node_list, node_list, inst_map):
1385
  """Does the actual power on.
1386

1387
  @param opts: The command line options selected by the user
1388
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1389
                         OOB)
1390
  @param node_list: The list of nodes to operate on (all need to support OOB)
1391
  @param inst_map: A dict of inst -> nodes mapping
1392
  @return: The desired exit status
1393

1394
  """
1395
  if node_list and not _OobPower(opts, node_list, False):
1396
    ToStderr("Not all nodes seem to get back up, investigate and start"
1397
             " manually if needed")
1398

    
1399
  # Wait for the nodes to be back up
1400
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1401

    
1402
  ToStdout("Waiting until all nodes are available again")
1403
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1404
    ToStderr("Please investigate and start stopped instances manually")
1405
    return constants.EXIT_FAILURE
1406

    
1407
  return constants.EXIT_SUCCESS
1408

    
1409

    
1410
def _EpoOff(opts, node_list, inst_map):
1411
  """Does the actual power off.
1412

1413
  @param opts: The command line options selected by the user
1414
  @param node_list: The list of nodes to operate on (all need to support OOB)
1415
  @param inst_map: A dict of inst -> nodes mapping
1416
  @return: The desired exit status
1417

1418
  """
1419
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1420
    ToStderr("Please investigate and stop instances manually before continuing")
1421
    return constants.EXIT_FAILURE
1422

    
1423
  if not node_list:
1424
    return constants.EXIT_SUCCESS
1425

    
1426
  if _OobPower(opts, node_list, False):
1427
    return constants.EXIT_SUCCESS
1428
  else:
1429
    return constants.EXIT_FAILURE
1430

    
1431

    
1432
def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
1433
        _confirm_fn=ConfirmOperation,
1434
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
1435
  """EPO operations.
1436

1437
  @param opts: the command line options selected by the user
1438
  @type args: list
1439
  @param args: should contain only one element, the subcommand
1440
  @rtype: int
1441
  @return: the desired exit code
1442

1443
  """
1444
  if opts.groups and opts.show_all:
1445
    _stderr_fn("Only one of --groups or --all are allowed")
1446
    return constants.EXIT_FAILURE
1447
  elif args and opts.show_all:
1448
    _stderr_fn("Arguments in combination with --all are not allowed")
1449
    return constants.EXIT_FAILURE
1450

    
1451
  if cl is None:
1452
    cl = GetClient()
1453

    
1454
  if opts.groups:
1455
    node_query_list = \
1456
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
1457
  else:
1458
    node_query_list = args
1459

    
1460
  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
1461
                                           "sinst_list", "powered", "offline"],
1462
                         False)
1463

    
1464
  all_nodes = map(compat.fst, result)
1465
  node_list = []
1466
  inst_map = {}
1467
  for (node, master, pinsts, sinsts, powered, offline) in result:
1468
    if not offline:
1469
      for inst in (pinsts + sinsts):
1470
        if inst in inst_map:
1471
          if not master:
1472
            inst_map[inst].add(node)
1473
        elif master:
1474
          inst_map[inst] = set()
1475
        else:
1476
          inst_map[inst] = set([node])
1477

    
1478
    if master and opts.on:
1479
      # We ignore the master for turning on the machines, in fact we are
1480
      # already operating on the master at this point :)
1481
      continue
1482
    elif master and not opts.show_all:
1483
      _stderr_fn("%s is the master node, please do a master-failover to another"
1484
                 " node not affected by the EPO or use --all if you intend to"
1485
                 " shutdown the whole cluster", node)
1486
      return constants.EXIT_FAILURE
1487
    elif powered is None:
1488
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
1489
                 " handled in a fully automated manner", node)
1490
    elif powered == opts.on:
1491
      _stdout_fn("Node %s is already in desired power state, skipping", node)
1492
    elif not offline or (offline and powered):
1493
      node_list.append(node)
1494

    
1495
  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
1496
    return constants.EXIT_FAILURE
1497

    
1498
  if opts.on:
1499
    return _on_fn(opts, all_nodes, node_list, inst_map)
1500
  else:
1501
    return _off_fn(opts, node_list, inst_map)
1502

    
1503

    
1504
commands = {
1505
  "init": (
1506
    InitCluster, [ArgHost(min=1, max=1)],
1507
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
1508
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
1509
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
1510
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
1511
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
1512
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
1513
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
1514
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT, ENABLED_STORAGE_TYPES_OPT]
1515
     + INSTANCE_POLICY_OPTS,
1516
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
1517
  "destroy": (
1518
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
1519
    "", "Destroy cluster"),
1520
  "rename": (
1521
    RenameCluster, [ArgHost(min=1, max=1)],
1522
    [FORCE_OPT, DRY_RUN_OPT],
1523
    "<new_name>",
1524
    "Renames the cluster"),
1525
  "redist-conf": (
1526
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
1527
    "", "Forces a push of the configuration file and ssconf files"
1528
    " to the nodes in the cluster"),
1529
  "verify": (
1530
    VerifyCluster, ARGS_NONE,
1531
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
1532
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
1533
    "", "Does a check on the cluster configuration"),
1534
  "verify-disks": (
1535
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
1536
    "", "Does a check on the cluster disk status"),
1537
  "repair-disk-sizes": (
1538
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
1539
    "[instance...]", "Updates mismatches in recorded disk sizes"),
1540
  "master-failover": (
1541
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
1542
    "", "Makes the current node the master"),
1543
  "master-ping": (
1544
    MasterPing, ARGS_NONE, [],
1545
    "", "Checks if the master is alive"),
1546
  "version": (
1547
    ShowClusterVersion, ARGS_NONE, [],
1548
    "", "Shows the cluster version"),
1549
  "getmaster": (
1550
    ShowClusterMaster, ARGS_NONE, [],
1551
    "", "Shows the cluster master"),
1552
  "copyfile": (
1553
    ClusterCopyFile, [ArgFile(min=1, max=1)],
1554
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
1555
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
1556
  "command": (
1557
    RunClusterCommand, [ArgCommand(min=1)],
1558
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
1559
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
1560
  "info": (
1561
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
1562
    "[--roman]", "Show cluster configuration"),
1563
  "list-tags": (
1564
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
1565
  "add-tags": (
1566
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1567
    "tag...", "Add tags to the cluster"),
1568
  "remove-tags": (
1569
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1570
    "tag...", "Remove tags from the cluster"),
1571
  "search-tags": (
1572
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
1573
    "Searches the tags on all objects on"
1574
    " the cluster for a given pattern (regex)"),
1575
  "queue": (
1576
    QueueOps,
1577
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
1578
    [], "drain|undrain|info", "Change queue properties"),
1579
  "watcher": (
1580
    WatcherOps,
1581
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
1582
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
1583
    [],
1584
    "{pause <timespec>|continue|info}", "Change watcher properties"),
1585
  "modify": (
1586
    SetClusterParams, ARGS_NONE,
1587
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
1588
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
1589
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
1590
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
1591
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
1592
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
1593
     DISK_STATE_OPT, SUBMIT_OPT, ENABLED_STORAGE_TYPES_OPT] +
1594
    INSTANCE_POLICY_OPTS,
1595
    "[opts...]",
1596
    "Alters the parameters of the cluster"),
1597
  "renew-crypto": (
1598
    RenewCrypto, ARGS_NONE,
1599
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
1600
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
1601
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
1602
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
1603
    "[opts...]",
1604
    "Renews cluster certificates, keys and secrets"),
1605
  "epo": (
1606
    Epo, [ArgUnknown()],
1607
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
1608
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
1609
    "[opts...] [args]",
1610
    "Performs an emergency power-off on given args"),
1611
  "activate-master-ip": (
1612
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
1613
  "deactivate-master-ip": (
1614
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
1615
    "Deactivates the master IP"),
1616
  }
1617

    
1618

    
1619
#: dictionary with aliases for commands
1620
aliases = {
1621
  "masterfailover": "master-failover",
1622
  "show": "info",
1623
}
1624

    
1625

    
1626
def Main():
1627
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
1628
                     aliases=aliases)