Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ 0e79564a

History | View | Annotate | Download (51.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
import os.path
30
import time
31
import OpenSSL
32
import itertools
33

    
34
from ganeti.cli import *
35
from ganeti import opcodes
36
from ganeti import constants
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import bootstrap
40
from ganeti import ssh
41
from ganeti import objects
42
from ganeti import uidpool
43
from ganeti import compat
44
from ganeti import netutils
45
from ganeti import pathutils
46

    
47

    
48
ON_OPT = cli_option("--on", default=False,
49
                    action="store_true", dest="on",
50
                    help="Recover from an EPO")
51

    
52
GROUPS_OPT = cli_option("--groups", default=False,
53
                        action="store_true", dest="groups",
54
                        help="Arguments are node groups instead of nodes")
55

    
56
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
57
                            help="Override interactive check for --no-voting",
58
                            default=False, action="store_true")
59

    
60
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
61
_EPO_PING_TIMEOUT = 1 # 1 second
62
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
63

    
64

    
65
@UsesRPC
66
def InitCluster(opts, args):
67
  """Initialize the cluster.
68

69
  @param opts: the command line options selected by the user
70
  @type args: list
71
  @param args: should contain only one element, the desired
72
      cluster name
73
  @rtype: int
74
  @return: the desired exit code
75

76
  """
77
  if not opts.lvm_storage and opts.vg_name:
78
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
79
    return 1
80

    
81
  vg_name = opts.vg_name
82
  if opts.lvm_storage and not opts.vg_name:
83
    vg_name = constants.DEFAULT_VG
84

    
85
  if not opts.drbd_storage and opts.drbd_helper:
86
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
87
    return 1
88

    
89
  drbd_helper = opts.drbd_helper
90
  if opts.drbd_storage and not opts.drbd_helper:
91
    drbd_helper = constants.DEFAULT_DRBD_HELPER
92

    
93
  master_netdev = opts.master_netdev
94
  if master_netdev is None:
95
    master_netdev = constants.DEFAULT_BRIDGE
96

    
97
  hvlist = opts.enabled_hypervisors
98
  if hvlist is None:
99
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
100
  hvlist = hvlist.split(",")
101

    
102
  hvparams = dict(opts.hvparams)
103
  beparams = opts.beparams
104
  nicparams = opts.nicparams
105

    
106
  diskparams = dict(opts.diskparams)
107

    
108
  # check the disk template types here, as we cannot rely on the type check done
109
  # by the opcode parameter types
110
  diskparams_keys = set(diskparams.keys())
111
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
112
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
113
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
114
    return 1
115

    
116
  # prepare beparams dict
117
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
118
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
119

    
120
  # prepare nicparams dict
121
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
122
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
123

    
124
  # prepare ndparams dict
125
  if opts.ndparams is None:
126
    ndparams = dict(constants.NDC_DEFAULTS)
127
  else:
128
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
129
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
130

    
131
  # prepare hvparams dict
132
  for hv in constants.HYPER_TYPES:
133
    if hv not in hvparams:
134
      hvparams[hv] = {}
135
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
136
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
137

    
138
  # prepare diskparams dict
139
  for templ in constants.DISK_TEMPLATES:
140
    if templ not in diskparams:
141
      diskparams[templ] = {}
142
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
143
                                         diskparams[templ])
144
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
145

    
146
  # prepare ipolicy dict
147
  ipolicy_raw = CreateIPolicyFromOpts(
148
    ispecs_mem_size=opts.ispecs_mem_size,
149
    ispecs_cpu_count=opts.ispecs_cpu_count,
150
    ispecs_disk_count=opts.ispecs_disk_count,
151
    ispecs_disk_size=opts.ispecs_disk_size,
152
    ispecs_nic_count=opts.ispecs_nic_count,
153
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
154
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
155
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
156
    fill_all=True)
157
  ipolicy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy_raw)
158

    
159
  if opts.candidate_pool_size is None:
160
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
161

    
162
  if opts.mac_prefix is None:
163
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
164

    
165
  uid_pool = opts.uid_pool
166
  if uid_pool is not None:
167
    uid_pool = uidpool.ParseUidPool(uid_pool)
168

    
169
  if opts.prealloc_wipe_disks is None:
170
    opts.prealloc_wipe_disks = False
171

    
172
  external_ip_setup_script = opts.use_external_mip_script
173
  if external_ip_setup_script is None:
174
    external_ip_setup_script = False
175

    
176
  try:
177
    primary_ip_version = int(opts.primary_ip_version)
178
  except (ValueError, TypeError), err:
179
    ToStderr("Invalid primary ip version value: %s" % str(err))
180
    return 1
181

    
182
  master_netmask = opts.master_netmask
183
  try:
184
    if master_netmask is not None:
185
      master_netmask = int(master_netmask)
186
  except (ValueError, TypeError), err:
187
    ToStderr("Invalid master netmask value: %s" % str(err))
188
    return 1
189

    
190
  if opts.disk_state:
191
    disk_state = utils.FlatToDict(opts.disk_state)
192
  else:
193
    disk_state = {}
194

    
195
  hv_state = dict(opts.hv_state)
196

    
197
  enabled_storage_types = opts.enabled_storage_types
198
  if enabled_storage_types is not None:
199
    enabled_storage_types = enabled_storage_types.split(",")
200
  else:
201
    enabled_storage_types = list(constants.DEFAULT_ENABLED_STORAGE_TYPES)
202

    
203
  bootstrap.InitCluster(cluster_name=args[0],
204
                        secondary_ip=opts.secondary_ip,
205
                        vg_name=vg_name,
206
                        mac_prefix=opts.mac_prefix,
207
                        master_netmask=master_netmask,
208
                        master_netdev=master_netdev,
209
                        file_storage_dir=opts.file_storage_dir,
210
                        shared_file_storage_dir=opts.shared_file_storage_dir,
211
                        enabled_hypervisors=hvlist,
212
                        hvparams=hvparams,
213
                        beparams=beparams,
214
                        nicparams=nicparams,
215
                        ndparams=ndparams,
216
                        diskparams=diskparams,
217
                        ipolicy=ipolicy,
218
                        candidate_pool_size=opts.candidate_pool_size,
219
                        modify_etc_hosts=opts.modify_etc_hosts,
220
                        modify_ssh_setup=opts.modify_ssh_setup,
221
                        maintain_node_health=opts.maintain_node_health,
222
                        drbd_helper=drbd_helper,
223
                        uid_pool=uid_pool,
224
                        default_iallocator=opts.default_iallocator,
225
                        primary_ip_version=primary_ip_version,
226
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
227
                        use_external_mip_script=external_ip_setup_script,
228
                        hv_state=hv_state,
229
                        disk_state=disk_state,
230
                        enabled_storage_types=enabled_storage_types,
231
                        )
232
  op = opcodes.OpClusterPostInit()
233
  SubmitOpCode(op, opts=opts)
234
  return 0
235

    
236

    
237
@UsesRPC
238
def DestroyCluster(opts, args):
239
  """Destroy the cluster.
240

241
  @param opts: the command line options selected by the user
242
  @type args: list
243
  @param args: should be an empty list
244
  @rtype: int
245
  @return: the desired exit code
246

247
  """
248
  if not opts.yes_do_it:
249
    ToStderr("Destroying a cluster is irreversible. If you really want"
250
             " destroy this cluster, supply the --yes-do-it option.")
251
    return 1
252

    
253
  op = opcodes.OpClusterDestroy()
254
  master = SubmitOpCode(op, opts=opts)
255
  # if we reached this, the opcode didn't fail; we can proceed to
256
  # shutdown all the daemons
257
  bootstrap.FinalizeClusterDestroy(master)
258
  return 0
259

    
260

    
261
def RenameCluster(opts, args):
262
  """Rename the cluster.
263

264
  @param opts: the command line options selected by the user
265
  @type args: list
266
  @param args: should contain only one element, the new cluster name
267
  @rtype: int
268
  @return: the desired exit code
269

270
  """
271
  cl = GetClient()
272

    
273
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
274

    
275
  new_name = args[0]
276
  if not opts.force:
277
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
278
                " connected over the network to the cluster name, the"
279
                " operation is very dangerous as the IP address will be"
280
                " removed from the node and the change may not go through."
281
                " Continue?") % (cluster_name, new_name)
282
    if not AskUser(usertext):
283
      return 1
284

    
285
  op = opcodes.OpClusterRename(name=new_name)
286
  result = SubmitOpCode(op, opts=opts, cl=cl)
287

    
288
  if result:
289
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
290

    
291
  return 0
292

    
293

    
294
def ActivateMasterIp(opts, args):
295
  """Activates the master IP.
296

297
  """
298
  op = opcodes.OpClusterActivateMasterIp()
299
  SubmitOpCode(op)
300
  return 0
301

    
302

    
303
def DeactivateMasterIp(opts, args):
304
  """Deactivates the master IP.
305

306
  """
307
  if not opts.confirm:
308
    usertext = ("This will disable the master IP. All the open connections to"
309
                " the master IP will be closed. To reach the master you will"
310
                " need to use its node IP."
311
                " Continue?")
312
    if not AskUser(usertext):
313
      return 1
314

    
315
  op = opcodes.OpClusterDeactivateMasterIp()
316
  SubmitOpCode(op)
317
  return 0
318

    
319

    
320
def RedistributeConfig(opts, args):
321
  """Forces push of the cluster configuration.
322

323
  @param opts: the command line options selected by the user
324
  @type args: list
325
  @param args: empty list
326
  @rtype: int
327
  @return: the desired exit code
328

329
  """
330
  op = opcodes.OpClusterRedistConf()
331
  SubmitOrSend(op, opts)
332
  return 0
333

    
334

    
335
def ShowClusterVersion(opts, args):
336
  """Write version of ganeti software to the standard output.
337

338
  @param opts: the command line options selected by the user
339
  @type args: list
340
  @param args: should be an empty list
341
  @rtype: int
342
  @return: the desired exit code
343

344
  """
345
  cl = GetClient(query=True)
346
  result = cl.QueryClusterInfo()
347
  ToStdout("Software version: %s", result["software_version"])
348
  ToStdout("Internode protocol: %s", result["protocol_version"])
349
  ToStdout("Configuration format: %s", result["config_version"])
350
  ToStdout("OS api version: %s", result["os_api_version"])
351
  ToStdout("Export interface: %s", result["export_version"])
352
  return 0
353

    
354

    
355
def ShowClusterMaster(opts, args):
356
  """Write name of master node to the standard output.
357

358
  @param opts: the command line options selected by the user
359
  @type args: list
360
  @param args: should be an empty list
361
  @rtype: int
362
  @return: the desired exit code
363

364
  """
365
  master = bootstrap.GetMaster()
366
  ToStdout(master)
367
  return 0
368

    
369

    
370
def _FormatGroupedParams(paramsdict, roman=False):
371
  """Format Grouped parameters (be, nic, disk) by group.
372

373
  @type paramsdict: dict of dicts
374
  @param paramsdict: {group: {param: value, ...}, ...}
375
  @rtype: dict of dicts
376
  @return: copy of the input dictionaries with strings as values
377

378
  """
379
  ret = {}
380
  for (item, val) in paramsdict.items():
381
    if isinstance(val, dict):
382
      ret[item] = _FormatGroupedParams(val, roman=roman)
383
    elif roman and isinstance(val, int):
384
      ret[item] = compat.TryToRoman(val)
385
    else:
386
      ret[item] = str(val)
387
  return ret
388

    
389

    
390
def ShowClusterConfig(opts, args):
391
  """Shows cluster information.
392

393
  @param opts: the command line options selected by the user
394
  @type args: list
395
  @param args: should be an empty list
396
  @rtype: int
397
  @return: the desired exit code
398

399
  """
400
  cl = GetClient(query=True)
401
  result = cl.QueryClusterInfo()
402

    
403
  if result["tags"]:
404
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
405
  else:
406
    tags = "(none)"
407
  if result["reserved_lvs"]:
408
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
409
  else:
410
    reserved_lvs = "(none)"
411

    
412
  info = [
413
    ("Cluster name", result["name"]),
414
    ("Cluster UUID", result["uuid"]),
415

    
416
    ("Creation time", utils.FormatTime(result["ctime"])),
417
    ("Modification time", utils.FormatTime(result["mtime"])),
418

    
419
    ("Master node", result["master"]),
420

    
421
    ("Architecture (this node)",
422
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),
423

    
424
    ("Tags", tags),
425

    
426
    ("Default hypervisor", result["default_hypervisor"]),
427
    ("Enabled hypervisors",
428
     utils.CommaJoin(result["enabled_hypervisors"])),
429

    
430
    ("Hypervisor parameters", _FormatGroupedParams(result["hvparams"])),
431

    
432
    ("OS-specific hypervisor parameters",
433
     _FormatGroupedParams(result["os_hvp"])),
434

    
435
    ("OS parameters", _FormatGroupedParams(result["osparams"])),
436

    
437
    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
438
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),
439

    
440
    ("Cluster parameters", [
441
      ("candidate pool size",
442
       compat.TryToRoman(result["candidate_pool_size"],
443
                         convert=opts.roman_integers)),
444
      ("master netdev", result["master_netdev"]),
445
      ("master netmask", result["master_netmask"]),
446
      ("use external master IP address setup script",
447
       result["use_external_mip_script"]),
448
      ("lvm volume group", result["volume_group_name"]),
449
      ("lvm reserved volumes", reserved_lvs),
450
      ("drbd usermode helper", result["drbd_usermode_helper"]),
451
      ("file storage path", result["file_storage_dir"]),
452
      ("shared file storage path", result["shared_file_storage_dir"]),
453
      ("maintenance of node health", result["maintain_node_health"]),
454
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
455
      ("default instance allocator", result["default_iallocator"]),
456
      ("primary ip version", result["primary_ip_version"]),
457
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
458
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
459
      ("ExtStorage Providers search path",
460
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
461
      ("enabled storage types",
462
       utils.CommaJoin(result["enabled_storage_types"])),
463
      ]),
464

    
465
    ("Default node parameters",
466
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),
467

    
468
    ("Default instance parameters",
469
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),
470

    
471
    ("Default nic parameters",
472
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),
473

    
474
    ("Default disk parameters",
475
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),
476

    
477
    ("Instance policy - limits for instances",
478
     [
479
       (key,
480
        _FormatGroupedParams(result["ipolicy"][key], roman=opts.roman_integers))
481
       for key in constants.IPOLICY_ISPECS
482
       ] +
483
     [
484
       ("enabled disk templates",
485
        utils.CommaJoin(result["ipolicy"][constants.IPOLICY_DTS])),
486
       ] +
487
     [
488
       (key, result["ipolicy"][key])
489
       for key in constants.IPOLICY_PARAMETERS
490
       ]),
491
    ]
492

    
493
  PrintGenericInfo(info)
494
  return 0
495

    
496

    
497
def ClusterCopyFile(opts, args):
498
  """Copy a file from master to some nodes.
499

500
  @param opts: the command line options selected by the user
501
  @type args: list
502
  @param args: should contain only one element, the path of
503
      the file to be copied
504
  @rtype: int
505
  @return: the desired exit code
506

507
  """
508
  filename = args[0]
509
  if not os.path.exists(filename):
510
    raise errors.OpPrereqError("No such filename '%s'" % filename,
511
                               errors.ECODE_INVAL)
512

    
513
  cl = GetClient()
514

    
515
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
516

    
517
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
518
                           secondary_ips=opts.use_replication_network,
519
                           nodegroup=opts.nodegroup)
520

    
521
  srun = ssh.SshRunner(cluster_name)
522
  for node in results:
523
    if not srun.CopyFileToNode(node, filename):
524
      ToStderr("Copy of file %s to node %s failed", filename, node)
525

    
526
  return 0
527

    
528

    
529
def RunClusterCommand(opts, args):
530
  """Run a command on some nodes.
531

532
  @param opts: the command line options selected by the user
533
  @type args: list
534
  @param args: should contain the command to be run and its arguments
535
  @rtype: int
536
  @return: the desired exit code
537

538
  """
539
  cl = GetClient()
540

    
541
  command = " ".join(args)
542

    
543
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
544

    
545
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
546
                                                    "master_node"])
547

    
548
  srun = ssh.SshRunner(cluster_name=cluster_name)
549

    
550
  # Make sure master node is at list end
551
  if master_node in nodes:
552
    nodes.remove(master_node)
553
    nodes.append(master_node)
554

    
555
  for name in nodes:
556
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
557

    
558
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
559
      # Do not output anything for successful commands
560
      continue
561

    
562
    ToStdout("------------------------------------------------")
563
    if opts.show_machine_names:
564
      for line in result.output.splitlines():
565
        ToStdout("%s: %s", name, line)
566
    else:
567
      ToStdout("node: %s", name)
568
      ToStdout("%s", result.output)
569
    ToStdout("return code = %s", result.exit_code)
570

    
571
  return 0
572

    
573

    
574
def VerifyCluster(opts, args):
575
  """Verify integrity of cluster, performing various test on nodes.
576

577
  @param opts: the command line options selected by the user
578
  @type args: list
579
  @param args: should be an empty list
580
  @rtype: int
581
  @return: the desired exit code
582

583
  """
584
  skip_checks = []
585

    
586
  if opts.skip_nplusone_mem:
587
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
588

    
589
  cl = GetClient()
590

    
591
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
592
                               error_codes=opts.error_codes,
593
                               debug_simulate_errors=opts.simulate_errors,
594
                               skip_checks=skip_checks,
595
                               ignore_errors=opts.ignore_errors,
596
                               group_name=opts.nodegroup)
597
  result = SubmitOpCode(op, cl=cl, opts=opts)
598

    
599
  # Keep track of submitted jobs
600
  jex = JobExecutor(cl=cl, opts=opts)
601

    
602
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
603
    jex.AddJobId(None, status, job_id)
604

    
605
  results = jex.GetResults()
606

    
607
  (bad_jobs, bad_results) = \
608
    map(len,
609
        # Convert iterators to lists
610
        map(list,
611
            # Count errors
612
            map(compat.partial(itertools.ifilterfalse, bool),
613
                # Convert result to booleans in a tuple
614
                zip(*((job_success, len(op_results) == 1 and op_results[0])
615
                      for (job_success, op_results) in results)))))
616

    
617
  if bad_jobs == 0 and bad_results == 0:
618
    rcode = constants.EXIT_SUCCESS
619
  else:
620
    rcode = constants.EXIT_FAILURE
621
    if bad_jobs > 0:
622
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
623

    
624
  return rcode
625

    
626

    
627
def VerifyDisks(opts, args):
628
  """Verify integrity of cluster disks.
629

630
  @param opts: the command line options selected by the user
631
  @type args: list
632
  @param args: should be an empty list
633
  @rtype: int
634
  @return: the desired exit code
635

636
  """
637
  cl = GetClient()
638

    
639
  op = opcodes.OpClusterVerifyDisks()
640

    
641
  result = SubmitOpCode(op, cl=cl, opts=opts)
642

    
643
  # Keep track of submitted jobs
644
  jex = JobExecutor(cl=cl, opts=opts)
645

    
646
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
647
    jex.AddJobId(None, status, job_id)
648

    
649
  retcode = constants.EXIT_SUCCESS
650

    
651
  for (status, result) in jex.GetResults():
652
    if not status:
653
      ToStdout("Job failed: %s", result)
654
      continue
655

    
656
    ((bad_nodes, instances, missing), ) = result
657

    
658
    for node, text in bad_nodes.items():
659
      ToStdout("Error gathering data on node %s: %s",
660
               node, utils.SafeEncode(text[-400:]))
661
      retcode = constants.EXIT_FAILURE
662
      ToStdout("You need to fix these nodes first before fixing instances")
663

    
664
    for iname in instances:
665
      if iname in missing:
666
        continue
667
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
668
      try:
669
        ToStdout("Activating disks for instance '%s'", iname)
670
        SubmitOpCode(op, opts=opts, cl=cl)
671
      except errors.GenericError, err:
672
        nret, msg = FormatError(err)
673
        retcode |= nret
674
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
675

    
676
    if missing:
677
      for iname, ival in missing.iteritems():
678
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
679
        if all_missing:
680
          ToStdout("Instance %s cannot be verified as it lives on"
681
                   " broken nodes", iname)
682
        else:
683
          ToStdout("Instance %s has missing logical volumes:", iname)
684
          ival.sort()
685
          for node, vol in ival:
686
            if node in bad_nodes:
687
              ToStdout("\tbroken node %s /dev/%s", node, vol)
688
            else:
689
              ToStdout("\t%s /dev/%s", node, vol)
690

    
691
      ToStdout("You need to replace or recreate disks for all the above"
692
               " instances if this message persists after fixing broken nodes.")
693
      retcode = constants.EXIT_FAILURE
694
    elif not instances:
695
      ToStdout("No disks need to be activated.")
696

    
697
  return retcode
698

    
699

    
700
def RepairDiskSizes(opts, args):
701
  """Verify sizes of cluster disks.
702

703
  @param opts: the command line options selected by the user
704
  @type args: list
705
  @param args: optional list of instances to restrict check to
706
  @rtype: int
707
  @return: the desired exit code
708

709
  """
710
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
711
  SubmitOpCode(op, opts=opts)
712

    
713

    
714
@UsesRPC
715
def MasterFailover(opts, args):
716
  """Failover the master node.
717

718
  This command, when run on a non-master node, will cause the current
719
  master to cease being master, and the non-master to become new
720
  master.
721

722
  @param opts: the command line options selected by the user
723
  @type args: list
724
  @param args: should be an empty list
725
  @rtype: int
726
  @return: the desired exit code
727

728
  """
729
  if opts.no_voting and not opts.yes_do_it:
730
    usertext = ("This will perform the failover even if most other nodes"
731
                " are down, or if this node is outdated. This is dangerous"
732
                " as it can lead to a non-consistent cluster. Check the"
733
                " gnt-cluster(8) man page before proceeding. Continue?")
734
    if not AskUser(usertext):
735
      return 1
736

    
737
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
738

    
739

    
740
def MasterPing(opts, args):
741
  """Checks if the master is alive.
742

743
  @param opts: the command line options selected by the user
744
  @type args: list
745
  @param args: should be an empty list
746
  @rtype: int
747
  @return: the desired exit code
748

749
  """
750
  try:
751
    cl = GetClient()
752
    cl.QueryClusterInfo()
753
    return 0
754
  except Exception: # pylint: disable=W0703
755
    return 1
756

    
757

    
758
def SearchTags(opts, args):
759
  """Searches the tags on all the cluster.
760

761
  @param opts: the command line options selected by the user
762
  @type args: list
763
  @param args: should contain only one element, the tag pattern
764
  @rtype: int
765
  @return: the desired exit code
766

767
  """
768
  op = opcodes.OpTagsSearch(pattern=args[0])
769
  result = SubmitOpCode(op, opts=opts)
770
  if not result:
771
    return 1
772
  result = list(result)
773
  result.sort()
774
  for path, tag in result:
775
    ToStdout("%s %s", path, tag)
776

    
777

    
778
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
779
  """Reads and verifies an X509 certificate.
780

781
  @type cert_filename: string
782
  @param cert_filename: the path of the file containing the certificate to
783
                        verify encoded in PEM format
784
  @type verify_private_key: bool
785
  @param verify_private_key: whether to verify the private key in addition to
786
                             the public certificate
787
  @rtype: string
788
  @return: a string containing the PEM-encoded certificate.
789

790
  """
791
  try:
792
    pem = utils.ReadFile(cert_filename)
793
  except IOError, err:
794
    raise errors.X509CertError(cert_filename,
795
                               "Unable to read certificate: %s" % str(err))
796

    
797
  try:
798
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
799
  except Exception, err:
800
    raise errors.X509CertError(cert_filename,
801
                               "Unable to load certificate: %s" % str(err))
802

    
803
  if verify_private_key:
804
    try:
805
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
806
    except Exception, err:
807
      raise errors.X509CertError(cert_filename,
808
                                 "Unable to load private key: %s" % str(err))
809

    
810
  return pem
811

    
812

    
813
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
814
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
815
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
816
                 cds_filename, force):
817
  """Renews cluster certificates, keys and secrets.
818

819
  @type new_cluster_cert: bool
820
  @param new_cluster_cert: Whether to generate a new cluster certificate
821
  @type new_rapi_cert: bool
822
  @param new_rapi_cert: Whether to generate a new RAPI certificate
823
  @type rapi_cert_filename: string
824
  @param rapi_cert_filename: Path to file containing new RAPI certificate
825
  @type new_spice_cert: bool
826
  @param new_spice_cert: Whether to generate a new SPICE certificate
827
  @type spice_cert_filename: string
828
  @param spice_cert_filename: Path to file containing new SPICE certificate
829
  @type spice_cacert_filename: string
830
  @param spice_cacert_filename: Path to file containing the certificate of the
831
                                CA that signed the SPICE certificate
832
  @type new_confd_hmac_key: bool
833
  @param new_confd_hmac_key: Whether to generate a new HMAC key
834
  @type new_cds: bool
835
  @param new_cds: Whether to generate a new cluster domain secret
836
  @type cds_filename: string
837
  @param cds_filename: Path to file containing new cluster domain secret
838
  @type force: bool
839
  @param force: Whether to ask user for confirmation
840

841
  """
842
  if new_rapi_cert and rapi_cert_filename:
843
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
844
             " options can be specified at the same time.")
845
    return 1
846

    
847
  if new_cds and cds_filename:
848
    ToStderr("Only one of the --new-cluster-domain-secret and"
849
             " --cluster-domain-secret options can be specified at"
850
             " the same time.")
851
    return 1
852

    
853
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
854
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
855
             " and --spice-ca-certificate must not be used.")
856
    return 1
857

    
858
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
859
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
860
             " specified.")
861
    return 1
862

    
863
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
864
  try:
865
    if rapi_cert_filename:
866
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
867
    if spice_cert_filename:
868
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
869
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
870
  except errors.X509CertError, err:
871
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
872
    return 1
873

    
874
  if cds_filename:
875
    try:
876
      cds = utils.ReadFile(cds_filename)
877
    except Exception, err: # pylint: disable=W0703
878
      ToStderr("Can't load new cluster domain secret from %s: %s" %
879
               (cds_filename, str(err)))
880
      return 1
881
  else:
882
    cds = None
883

    
884
  if not force:
885
    usertext = ("This requires all daemons on all nodes to be restarted and"
886
                " may take some time. Continue?")
887
    if not AskUser(usertext):
888
      return 1
889

    
890
  def _RenewCryptoInner(ctx):
891
    ctx.feedback_fn("Updating certificates and keys")
892
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
893
                                    new_rapi_cert,
894
                                    new_spice_cert,
895
                                    new_confd_hmac_key,
896
                                    new_cds,
897
                                    rapi_cert_pem=rapi_cert_pem,
898
                                    spice_cert_pem=spice_cert_pem,
899
                                    spice_cacert_pem=spice_cacert_pem,
900
                                    cds=cds)
901

    
902
    files_to_copy = []
903

    
904
    if new_cluster_cert:
905
      files_to_copy.append(pathutils.NODED_CERT_FILE)
906

    
907
    if new_rapi_cert or rapi_cert_pem:
908
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
909

    
910
    if new_spice_cert or spice_cert_pem:
911
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
912
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
913

    
914
    if new_confd_hmac_key:
915
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
916

    
917
    if new_cds or cds:
918
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
919

    
920
    if files_to_copy:
921
      for node_name in ctx.nonmaster_nodes:
922
        ctx.feedback_fn("Copying %s to %s" %
923
                        (", ".join(files_to_copy), node_name))
924
        for file_name in files_to_copy:
925
          ctx.ssh.CopyFileToNode(node_name, file_name)
926

    
927
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
928

    
929
  ToStdout("All requested certificates and keys have been replaced."
930
           " Running \"gnt-cluster verify\" now is recommended.")
931

    
932
  return 0
933

    
934

    
935
def RenewCrypto(opts, args):
936
  """Renews cluster certificates, keys and secrets.
937

938
  """
939
  return _RenewCrypto(opts.new_cluster_cert,
940
                      opts.new_rapi_cert,
941
                      opts.rapi_cert,
942
                      opts.new_spice_cert,
943
                      opts.spice_cert,
944
                      opts.spice_cacert,
945
                      opts.new_confd_hmac_key,
946
                      opts.new_cluster_domain_secret,
947
                      opts.cluster_domain_secret,
948
                      opts.force)
949

    
950

    
951
def SetClusterParams(opts, args):
952
  """Modify the cluster.
953

954
  @param opts: the command line options selected by the user
955
  @type args: list
956
  @param args: should be an empty list
957
  @rtype: int
958
  @return: the desired exit code
959

960
  """
961
  if not (not opts.lvm_storage or opts.vg_name or
962
          not opts.drbd_storage or opts.drbd_helper or
963
          opts.enabled_hypervisors or opts.hvparams or
964
          opts.beparams or opts.nicparams or
965
          opts.ndparams or opts.diskparams or
966
          opts.candidate_pool_size is not None or
967
          opts.uid_pool is not None or
968
          opts.maintain_node_health is not None or
969
          opts.add_uids is not None or
970
          opts.remove_uids is not None or
971
          opts.default_iallocator is not None or
972
          opts.reserved_lvs is not None or
973
          opts.master_netdev is not None or
974
          opts.master_netmask is not None or
975
          opts.use_external_mip_script is not None or
976
          opts.prealloc_wipe_disks is not None or
977
          opts.hv_state or
978
          opts.enabled_storage_types or
979
          opts.disk_state or
980
          opts.ispecs_mem_size or
981
          opts.ispecs_cpu_count or
982
          opts.ispecs_disk_count or
983
          opts.ispecs_disk_size or
984
          opts.ispecs_nic_count or
985
          opts.ipolicy_disk_templates is not None or
986
          opts.ipolicy_vcpu_ratio is not None or
987
          opts.ipolicy_spindle_ratio is not None):
988
    ToStderr("Please give at least one of the parameters.")
989
    return 1
990

    
991
  vg_name = opts.vg_name
992
  if not opts.lvm_storage and opts.vg_name:
993
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
994
    return 1
995

    
996
  if not opts.lvm_storage:
997
    vg_name = ""
998

    
999
  drbd_helper = opts.drbd_helper
1000
  if not opts.drbd_storage and opts.drbd_helper:
1001
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
1002
    return 1
1003

    
1004
  if not opts.drbd_storage:
1005
    drbd_helper = ""
1006

    
1007
  hvlist = opts.enabled_hypervisors
1008
  if hvlist is not None:
1009
    hvlist = hvlist.split(",")
1010

    
1011
  enabled_storage_types = opts.enabled_storage_types
1012
  if enabled_storage_types is not None:
1013
    enabled_storage_types = enabled_storage_types.split(",")
1014

    
1015
  # a list of (name, dict) we can pass directly to dict() (or [])
1016
  hvparams = dict(opts.hvparams)
1017
  for hv_params in hvparams.values():
1018
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1019

    
1020
  diskparams = dict(opts.diskparams)
1021

    
1022
  for dt_params in diskparams.values():
1023
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1024

    
1025
  beparams = opts.beparams
1026
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1027

    
1028
  nicparams = opts.nicparams
1029
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1030

    
1031
  ndparams = opts.ndparams
1032
  if ndparams is not None:
1033
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1034

    
1035
  ipolicy = CreateIPolicyFromOpts(
1036
    ispecs_mem_size=opts.ispecs_mem_size,
1037
    ispecs_cpu_count=opts.ispecs_cpu_count,
1038
    ispecs_disk_count=opts.ispecs_disk_count,
1039
    ispecs_disk_size=opts.ispecs_disk_size,
1040
    ispecs_nic_count=opts.ispecs_nic_count,
1041
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1042
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1043
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1044
    )
1045

    
1046
  mnh = opts.maintain_node_health
1047

    
1048
  uid_pool = opts.uid_pool
1049
  if uid_pool is not None:
1050
    uid_pool = uidpool.ParseUidPool(uid_pool)
1051

    
1052
  add_uids = opts.add_uids
1053
  if add_uids is not None:
1054
    add_uids = uidpool.ParseUidPool(add_uids)
1055

    
1056
  remove_uids = opts.remove_uids
1057
  if remove_uids is not None:
1058
    remove_uids = uidpool.ParseUidPool(remove_uids)
1059

    
1060
  if opts.reserved_lvs is not None:
1061
    if opts.reserved_lvs == "":
1062
      opts.reserved_lvs = []
1063
    else:
1064
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1065

    
1066
  if opts.master_netmask is not None:
1067
    try:
1068
      opts.master_netmask = int(opts.master_netmask)
1069
    except ValueError:
1070
      ToStderr("The --master-netmask option expects an int parameter.")
1071
      return 1
1072

    
1073
  ext_ip_script = opts.use_external_mip_script
1074

    
1075
  if opts.disk_state:
1076
    disk_state = utils.FlatToDict(opts.disk_state)
1077
  else:
1078
    disk_state = {}
1079

    
1080
  hv_state = dict(opts.hv_state)
1081

    
1082
  op = opcodes.OpClusterSetParams(
1083
    vg_name=vg_name,
1084
    drbd_helper=drbd_helper,
1085
    enabled_hypervisors=hvlist,
1086
    hvparams=hvparams,
1087
    os_hvp=None,
1088
    beparams=beparams,
1089
    nicparams=nicparams,
1090
    ndparams=ndparams,
1091
    diskparams=diskparams,
1092
    ipolicy=ipolicy,
1093
    candidate_pool_size=opts.candidate_pool_size,
1094
    maintain_node_health=mnh,
1095
    uid_pool=uid_pool,
1096
    add_uids=add_uids,
1097
    remove_uids=remove_uids,
1098
    default_iallocator=opts.default_iallocator,
1099
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1100
    master_netdev=opts.master_netdev,
1101
    master_netmask=opts.master_netmask,
1102
    reserved_lvs=opts.reserved_lvs,
1103
    use_external_mip_script=ext_ip_script,
1104
    hv_state=hv_state,
1105
    disk_state=disk_state,
1106
    enabled_storage_types=enabled_storage_types,
1107
    )
1108
  SubmitOrSend(op, opts)
1109
  return 0
1110

    
1111

    
1112
def QueueOps(opts, args):
1113
  """Queue operations.
1114

1115
  @param opts: the command line options selected by the user
1116
  @type args: list
1117
  @param args: should contain only one element, the subcommand
1118
  @rtype: int
1119
  @return: the desired exit code
1120

1121
  """
1122
  command = args[0]
1123
  client = GetClient()
1124
  if command in ("drain", "undrain"):
1125
    drain_flag = command == "drain"
1126
    client.SetQueueDrainFlag(drain_flag)
1127
  elif command == "info":
1128
    result = client.QueryConfigValues(["drain_flag"])
1129
    if result[0]:
1130
      val = "set"
1131
    else:
1132
      val = "unset"
1133
    ToStdout("The drain flag is %s" % val)
1134
  else:
1135
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1136
                               errors.ECODE_INVAL)
1137

    
1138
  return 0
1139

    
1140

    
1141
def _ShowWatcherPause(until):
1142
  if until is None or until < time.time():
1143
    ToStdout("The watcher is not paused.")
1144
  else:
1145
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1146

    
1147

    
1148
def WatcherOps(opts, args):
1149
  """Watcher operations.
1150

1151
  @param opts: the command line options selected by the user
1152
  @type args: list
1153
  @param args: should contain only one element, the subcommand
1154
  @rtype: int
1155
  @return: the desired exit code
1156

1157
  """
1158
  command = args[0]
1159
  client = GetClient()
1160

    
1161
  if command == "continue":
1162
    client.SetWatcherPause(None)
1163
    ToStdout("The watcher is no longer paused.")
1164

    
1165
  elif command == "pause":
1166
    if len(args) < 2:
1167
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1168

    
1169
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1170
    _ShowWatcherPause(result)
1171

    
1172
  elif command == "info":
1173
    result = client.QueryConfigValues(["watcher_pause"])
1174
    _ShowWatcherPause(result[0])
1175

    
1176
  else:
1177
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1178
                               errors.ECODE_INVAL)
1179

    
1180
  return 0
1181

    
1182

    
1183
def _OobPower(opts, node_list, power):
1184
  """Puts the node in the list to desired power state.
1185

1186
  @param opts: The command line options selected by the user
1187
  @param node_list: The list of nodes to operate on
1188
  @param power: True if they should be powered on, False otherwise
1189
  @return: The success of the operation (none failed)
1190

1191
  """
1192
  if power:
1193
    command = constants.OOB_POWER_ON
1194
  else:
1195
    command = constants.OOB_POWER_OFF
1196

    
1197
  op = opcodes.OpOobCommand(node_names=node_list,
1198
                            command=command,
1199
                            ignore_status=True,
1200
                            timeout=opts.oob_timeout,
1201
                            power_delay=opts.power_delay)
1202
  result = SubmitOpCode(op, opts=opts)
1203
  errs = 0
1204
  for node_result in result:
1205
    (node_tuple, data_tuple) = node_result
1206
    (_, node_name) = node_tuple
1207
    (data_status, _) = data_tuple
1208
    if data_status != constants.RS_NORMAL:
1209
      assert data_status != constants.RS_UNAVAIL
1210
      errs += 1
1211
      ToStderr("There was a problem changing power for %s, please investigate",
1212
               node_name)
1213

    
1214
  if errs > 0:
1215
    return False
1216

    
1217
  return True
1218

    
1219

    
1220
def _InstanceStart(opts, inst_list, start, no_remember=False):
1221
  """Puts the instances in the list to desired state.
1222

1223
  @param opts: The command line options selected by the user
1224
  @param inst_list: The list of instances to operate on
1225
  @param start: True if they should be started, False for shutdown
1226
  @param no_remember: If the instance state should be remembered
1227
  @return: The success of the operation (none failed)
1228

1229
  """
1230
  if start:
1231
    opcls = opcodes.OpInstanceStartup
1232
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1233
  else:
1234
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1235
                           timeout=opts.shutdown_timeout,
1236
                           no_remember=no_remember)
1237
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1238

    
1239
  jex = JobExecutor(opts=opts)
1240

    
1241
  for inst in inst_list:
1242
    ToStdout("Submit %s of instance %s", text_submit, inst)
1243
    op = opcls(instance_name=inst)
1244
    jex.QueueJob(inst, op)
1245

    
1246
  results = jex.GetResults()
1247
  bad_cnt = len([1 for (success, _) in results if not success])
1248

    
1249
  if bad_cnt == 0:
1250
    ToStdout("All instances have been %s successfully", text_success)
1251
  else:
1252
    ToStderr("There were errors while %s instances:\n"
1253
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1254
             len(results))
1255
    return False
1256

    
1257
  return True
1258

    
1259

    
1260
class _RunWhenNodesReachableHelper:
1261
  """Helper class to make shared internal state sharing easier.
1262

1263
  @ivar success: Indicates if all action_cb calls were successful
1264

1265
  """
1266
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1267
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1268
    """Init the object.
1269

1270
    @param node_list: The list of nodes to be reachable
1271
    @param action_cb: Callback called when a new host is reachable
1272
    @type node2ip: dict
1273
    @param node2ip: Node to ip mapping
1274
    @param port: The port to use for the TCP ping
1275
    @param feedback_fn: The function used for feedback
1276
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1277
    @param _sleep_fn: Function to sleep (for unittest use only)
1278

1279
    """
1280
    self.down = set(node_list)
1281
    self.up = set()
1282
    self.node2ip = node2ip
1283
    self.success = True
1284
    self.action_cb = action_cb
1285
    self.port = port
1286
    self.feedback_fn = feedback_fn
1287
    self._ping_fn = _ping_fn
1288
    self._sleep_fn = _sleep_fn
1289

    
1290
  def __call__(self):
1291
    """When called we run action_cb.
1292

1293
    @raises utils.RetryAgain: When there are still down nodes
1294

1295
    """
1296
    if not self.action_cb(self.up):
1297
      self.success = False
1298

    
1299
    if self.down:
1300
      raise utils.RetryAgain()
1301
    else:
1302
      return self.success
1303

    
1304
  def Wait(self, secs):
1305
    """Checks if a host is up or waits remaining seconds.
1306

1307
    @param secs: The secs remaining
1308

1309
    """
1310
    start = time.time()
1311
    for node in self.down:
1312
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1313
                       live_port_needed=True):
1314
        self.feedback_fn("Node %s became available" % node)
1315
        self.up.add(node)
1316
        self.down -= self.up
1317
        # If we have a node available there is the possibility to run the
1318
        # action callback successfully, therefore we don't wait and return
1319
        return
1320

    
1321
    self._sleep_fn(max(0.0, start + secs - time.time()))
1322

    
1323

    
1324
def _RunWhenNodesReachable(node_list, action_cb, interval):
1325
  """Run action_cb when nodes become reachable.
1326

1327
  @param node_list: The list of nodes to be reachable
1328
  @param action_cb: Callback called when a new host is reachable
1329
  @param interval: The earliest time to retry
1330

1331
  """
1332
  client = GetClient()
1333
  cluster_info = client.QueryClusterInfo()
1334
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1335
    family = netutils.IPAddress.family
1336
  else:
1337
    family = netutils.IP6Address.family
1338

    
1339
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1340
                 for node in node_list)
1341

    
1342
  port = netutils.GetDaemonPort(constants.NODED)
1343
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1344
                                        ToStdout)
1345

    
1346
  try:
1347
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1348
                       wait_fn=helper.Wait)
1349
  except utils.RetryTimeout:
1350
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1351
             " again:\n  - %s", "  - ".join(helper.down))
1352
    return False
1353

    
1354

    
1355
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1356
                          _instance_start_fn=_InstanceStart):
1357
  """Start the instances conditional based on node_states.
1358

1359
  @param opts: The command line options selected by the user
1360
  @param inst_map: A dict of inst -> nodes mapping
1361
  @param nodes_online: A list of nodes online
1362
  @param _instance_start_fn: Callback to start instances (unittest use only)
1363
  @return: Success of the operation on all instances
1364

1365
  """
1366
  start_inst_list = []
1367
  for (inst, nodes) in inst_map.items():
1368
    if not (nodes - nodes_online):
1369
      # All nodes the instance lives on are back online
1370
      start_inst_list.append(inst)
1371

    
1372
  for inst in start_inst_list:
1373
    del inst_map[inst]
1374

    
1375
  if start_inst_list:
1376
    return _instance_start_fn(opts, start_inst_list, True)
1377

    
1378
  return True
1379

    
1380

    
1381
def _EpoOn(opts, full_node_list, node_list, inst_map):
1382
  """Does the actual power on.
1383

1384
  @param opts: The command line options selected by the user
1385
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1386
                         OOB)
1387
  @param node_list: The list of nodes to operate on (all need to support OOB)
1388
  @param inst_map: A dict of inst -> nodes mapping
1389
  @return: The desired exit status
1390

1391
  """
1392
  if node_list and not _OobPower(opts, node_list, False):
1393
    ToStderr("Not all nodes seem to get back up, investigate and start"
1394
             " manually if needed")
1395

    
1396
  # Wait for the nodes to be back up
1397
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1398

    
1399
  ToStdout("Waiting until all nodes are available again")
1400
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1401
    ToStderr("Please investigate and start stopped instances manually")
1402
    return constants.EXIT_FAILURE
1403

    
1404
  return constants.EXIT_SUCCESS
1405

    
1406

    
1407
def _EpoOff(opts, node_list, inst_map):
1408
  """Does the actual power off.
1409

1410
  @param opts: The command line options selected by the user
1411
  @param node_list: The list of nodes to operate on (all need to support OOB)
1412
  @param inst_map: A dict of inst -> nodes mapping
1413
  @return: The desired exit status
1414

1415
  """
1416
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1417
    ToStderr("Please investigate and stop instances manually before continuing")
1418
    return constants.EXIT_FAILURE
1419

    
1420
  if not node_list:
1421
    return constants.EXIT_SUCCESS
1422

    
1423
  if _OobPower(opts, node_list, False):
1424
    return constants.EXIT_SUCCESS
1425
  else:
1426
    return constants.EXIT_FAILURE
1427

    
1428

    
1429
def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
1430
        _confirm_fn=ConfirmOperation,
1431
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
1432
  """EPO operations.
1433

1434
  @param opts: the command line options selected by the user
1435
  @type args: list
1436
  @param args: should contain only one element, the subcommand
1437
  @rtype: int
1438
  @return: the desired exit code
1439

1440
  """
1441
  if opts.groups and opts.show_all:
1442
    _stderr_fn("Only one of --groups or --all are allowed")
1443
    return constants.EXIT_FAILURE
1444
  elif args and opts.show_all:
1445
    _stderr_fn("Arguments in combination with --all are not allowed")
1446
    return constants.EXIT_FAILURE
1447

    
1448
  if cl is None:
1449
    cl = GetClient()
1450

    
1451
  if opts.groups:
1452
    node_query_list = \
1453
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
1454
  else:
1455
    node_query_list = args
1456

    
1457
  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
1458
                                           "sinst_list", "powered", "offline"],
1459
                         False)
1460

    
1461
  all_nodes = map(compat.fst, result)
1462
  node_list = []
1463
  inst_map = {}
1464
  for (node, master, pinsts, sinsts, powered, offline) in result:
1465
    if not offline:
1466
      for inst in (pinsts + sinsts):
1467
        if inst in inst_map:
1468
          if not master:
1469
            inst_map[inst].add(node)
1470
        elif master:
1471
          inst_map[inst] = set()
1472
        else:
1473
          inst_map[inst] = set([node])
1474

    
1475
    if master and opts.on:
1476
      # We ignore the master for turning on the machines, in fact we are
1477
      # already operating on the master at this point :)
1478
      continue
1479
    elif master and not opts.show_all:
1480
      _stderr_fn("%s is the master node, please do a master-failover to another"
1481
                 " node not affected by the EPO or use --all if you intend to"
1482
                 " shutdown the whole cluster", node)
1483
      return constants.EXIT_FAILURE
1484
    elif powered is None:
1485
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
1486
                 " handled in a fully automated manner", node)
1487
    elif powered == opts.on:
1488
      _stdout_fn("Node %s is already in desired power state, skipping", node)
1489
    elif not offline or (offline and powered):
1490
      node_list.append(node)
1491

    
1492
  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
1493
    return constants.EXIT_FAILURE
1494

    
1495
  if opts.on:
1496
    return _on_fn(opts, all_nodes, node_list, inst_map)
1497
  else:
1498
    return _off_fn(opts, node_list, inst_map)
1499

    
1500

    
1501
commands = {
1502
  "init": (
1503
    InitCluster, [ArgHost(min=1, max=1)],
1504
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
1505
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
1506
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
1507
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
1508
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
1509
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
1510
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
1511
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT, ENABLED_STORAGE_TYPES_OPT]
1512
     + INSTANCE_POLICY_OPTS,
1513
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
1514
  "destroy": (
1515
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
1516
    "", "Destroy cluster"),
1517
  "rename": (
1518
    RenameCluster, [ArgHost(min=1, max=1)],
1519
    [FORCE_OPT, DRY_RUN_OPT],
1520
    "<new_name>",
1521
    "Renames the cluster"),
1522
  "redist-conf": (
1523
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
1524
    "", "Forces a push of the configuration file and ssconf files"
1525
    " to the nodes in the cluster"),
1526
  "verify": (
1527
    VerifyCluster, ARGS_NONE,
1528
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
1529
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
1530
    "", "Does a check on the cluster configuration"),
1531
  "verify-disks": (
1532
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
1533
    "", "Does a check on the cluster disk status"),
1534
  "repair-disk-sizes": (
1535
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
1536
    "[instance...]", "Updates mismatches in recorded disk sizes"),
1537
  "master-failover": (
1538
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
1539
    "", "Makes the current node the master"),
1540
  "master-ping": (
1541
    MasterPing, ARGS_NONE, [],
1542
    "", "Checks if the master is alive"),
1543
  "version": (
1544
    ShowClusterVersion, ARGS_NONE, [],
1545
    "", "Shows the cluster version"),
1546
  "getmaster": (
1547
    ShowClusterMaster, ARGS_NONE, [],
1548
    "", "Shows the cluster master"),
1549
  "copyfile": (
1550
    ClusterCopyFile, [ArgFile(min=1, max=1)],
1551
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
1552
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
1553
  "command": (
1554
    RunClusterCommand, [ArgCommand(min=1)],
1555
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
1556
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
1557
  "info": (
1558
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
1559
    "[--roman]", "Show cluster configuration"),
1560
  "list-tags": (
1561
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
1562
  "add-tags": (
1563
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1564
    "tag...", "Add tags to the cluster"),
1565
  "remove-tags": (
1566
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1567
    "tag...", "Remove tags from the cluster"),
1568
  "search-tags": (
1569
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
1570
    "Searches the tags on all objects on"
1571
    " the cluster for a given pattern (regex)"),
1572
  "queue": (
1573
    QueueOps,
1574
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
1575
    [], "drain|undrain|info", "Change queue properties"),
1576
  "watcher": (
1577
    WatcherOps,
1578
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
1579
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
1580
    [],
1581
    "{pause <timespec>|continue|info}", "Change watcher properties"),
1582
  "modify": (
1583
    SetClusterParams, ARGS_NONE,
1584
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
1585
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
1586
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
1587
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
1588
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
1589
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
1590
     DISK_STATE_OPT, SUBMIT_OPT, ENABLED_STORAGE_TYPES_OPT] +
1591
    INSTANCE_POLICY_OPTS,
1592
    "[opts...]",
1593
    "Alters the parameters of the cluster"),
1594
  "renew-crypto": (
1595
    RenewCrypto, ARGS_NONE,
1596
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
1597
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
1598
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
1599
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
1600
    "[opts...]",
1601
    "Renews cluster certificates, keys and secrets"),
1602
  "epo": (
1603
    Epo, [ArgUnknown()],
1604
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
1605
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
1606
    "[opts...] [args]",
1607
    "Performs an emergency power-off on given args"),
1608
  "activate-master-ip": (
1609
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
1610
  "deactivate-master-ip": (
1611
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
1612
    "Deactivates the master IP"),
1613
  }
1614

    
1615

    
1616
#: dictionary with aliases for commands
1617
aliases = {
1618
  "masterfailover": "master-failover",
1619
  "show": "info",
1620
}
1621

    
1622

    
1623
def Main():
1624
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
1625
                     aliases=aliases)