#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Cluster related commands"""

# pylint: disable=W0401,W0613,W0614,C0103
# W0401: Wildcard import ganeti.cli
# W0613: Unused argument, since all functions follow the same API
# W0614: Unused import %s from wildcard import (since we need cli)
# C0103: Invalid name gnt-cluster

import os.path
import time
import OpenSSL
import itertools

from ganeti.cli import *
from ganeti import opcodes
from ganeti import constants
from ganeti import errors
from ganeti import utils
from ganeti import bootstrap
from ganeti import ssh
from ganeti import objects
from ganeti import uidpool
from ganeti import compat
from ganeti import netutils
from ganeti import pathutils


ON_OPT = cli_option("--on", default=False,
                    action="store_true", dest="on",
                    help="Recover from an EPO")

GROUPS_OPT = cli_option("--groups", default=False,
                        action="store_true", dest="groups",
                        help="Arguments are node groups instead of nodes")

FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
                            help="Override interactive check for --no-voting",
                            default=False, action="store_true")

_EPO_PING_INTERVAL = 30 # 30 seconds between pings
_EPO_PING_TIMEOUT = 1 # 1 second
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes


@UsesRPC
def InitCluster(opts, args):
  """Initialize the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the desired
      cluster name
  @rtype: int
  @return: the desired exit code

  """
  if not opts.lvm_storage and opts.vg_name:
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
    return 1

  vg_name = opts.vg_name
  if opts.lvm_storage and not opts.vg_name:
    vg_name = constants.DEFAULT_VG

  if not opts.drbd_storage and opts.drbd_helper:
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
    return 1

  drbd_helper = opts.drbd_helper
  if opts.drbd_storage and not opts.drbd_helper:
    drbd_helper = constants.DEFAULT_DRBD_HELPER

  master_netdev = opts.master_netdev
  if master_netdev is None:
    master_netdev = constants.DEFAULT_BRIDGE

  hvlist = opts.enabled_hypervisors
  if hvlist is None:
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
  hvlist = hvlist.split(",")

  hvparams = dict(opts.hvparams)
  beparams = opts.beparams
  nicparams = opts.nicparams

  diskparams = dict(opts.diskparams)

  # check the disk template types here, as we cannot rely on the type check done
  # by the opcode parameter types
  diskparams_keys = set(diskparams.keys())
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
    return 1

  # prepare beparams dict
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  # prepare nicparams dict
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  # prepare ndparams dict
  if opts.ndparams is None:
    ndparams = dict(constants.NDC_DEFAULTS)
  else:
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  # prepare hvparams dict
  for hv in constants.HYPER_TYPES:
    if hv not in hvparams:
      hvparams[hv] = {}
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)

  # prepare diskparams dict
  for templ in constants.DISK_TEMPLATES:
    if templ not in diskparams:
      diskparams[templ] = {}
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
                                         diskparams[templ])
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)

  # prepare ipolicy dict
  ipolicy_raw = CreateIPolicyFromOpts(
    ispecs_mem_size=opts.ispecs_mem_size,
    ispecs_cpu_count=opts.ispecs_cpu_count,
    ispecs_disk_count=opts.ispecs_disk_count,
    ispecs_disk_size=opts.ispecs_disk_size,
    ispecs_nic_count=opts.ispecs_nic_count,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    fill_all=True)
  ipolicy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy_raw)

  if opts.candidate_pool_size is None:
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT

  if opts.mac_prefix is None:
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  if opts.prealloc_wipe_disks is None:
    opts.prealloc_wipe_disks = False

  external_ip_setup_script = opts.use_external_mip_script
  if external_ip_setup_script is None:
    external_ip_setup_script = False

  try:
    primary_ip_version = int(opts.primary_ip_version)
  except (ValueError, TypeError), err:
    ToStderr("Invalid primary ip version value: %s" % str(err))
    return 1

  master_netmask = opts.master_netmask
  try:
    if master_netmask is not None:
      master_netmask = int(master_netmask)
  except (ValueError, TypeError), err:
    ToStderr("Invalid master netmask value: %s" % str(err))
    return 1

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  bootstrap.InitCluster(cluster_name=args[0],
                        secondary_ip=opts.secondary_ip,
                        vg_name=vg_name,
                        mac_prefix=opts.mac_prefix,
                        master_netmask=master_netmask,
                        master_netdev=master_netdev,
                        file_storage_dir=opts.file_storage_dir,
                        shared_file_storage_dir=opts.shared_file_storage_dir,
                        enabled_hypervisors=hvlist,
                        hvparams=hvparams,
                        beparams=beparams,
                        nicparams=nicparams,
                        ndparams=ndparams,
                        diskparams=diskparams,
                        ipolicy=ipolicy,
                        candidate_pool_size=opts.candidate_pool_size,
                        modify_etc_hosts=opts.modify_etc_hosts,
                        modify_ssh_setup=opts.modify_ssh_setup,
                        maintain_node_health=opts.maintain_node_health,
                        drbd_helper=drbd_helper,
                        uid_pool=uid_pool,
                        default_iallocator=opts.default_iallocator,
                        primary_ip_version=primary_ip_version,
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
                        use_external_mip_script=external_ip_setup_script,
                        hv_state=hv_state,
                        disk_state=disk_state,
                        )
  op = opcodes.OpClusterPostInit()
  SubmitOpCode(op, opts=opts)
  return 0


@UsesRPC
def DestroyCluster(opts, args):
  """Destroy the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if not opts.yes_do_it:
    ToStderr("Destroying a cluster is irreversible. If you really want to"
             " destroy this cluster, supply the --yes-do-it option.")
    return 1

  op = opcodes.OpClusterDestroy()
  master = SubmitOpCode(op, opts=opts)
  # if we reached this, the opcode didn't fail; we can proceed to
  # shutdown all the daemons
  bootstrap.FinalizeClusterDestroy(master)
  return 0


def RenameCluster(opts, args):
  """Rename the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new cluster name
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])

  new_name = args[0]
  if not opts.force:
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
                " connected over the network to the cluster name, the"
                " operation is very dangerous as the IP address will be"
                " removed from the node and the change may not go through."
                " Continue?") % (cluster_name, new_name)
    if not AskUser(usertext):
      return 1

  op = opcodes.OpClusterRename(name=new_name)
  result = SubmitOpCode(op, opts=opts, cl=cl)

  if result:
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)

  return 0


def ActivateMasterIp(opts, args):
  """Activates the master IP.

  """
  op = opcodes.OpClusterActivateMasterIp()
  SubmitOpCode(op)
  return 0


def DeactivateMasterIp(opts, args):
  """Deactivates the master IP.

  """
  if not opts.confirm:
    usertext = ("This will disable the master IP. All the open connections to"
                " the master IP will be closed. To reach the master you will"
                " need to use its node IP."
                " Continue?")
    if not AskUser(usertext):
      return 1

  op = opcodes.OpClusterDeactivateMasterIp()
  SubmitOpCode(op)
  return 0


def RedistributeConfig(opts, args):
  """Forces a push of the cluster configuration.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: empty list
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRedistConf()
  SubmitOrSend(op, opts)
  return 0


def ShowClusterVersion(opts, args):
  """Write version of ganeti software to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient(query=True)
  result = cl.QueryClusterInfo()
  ToStdout("Software version: %s", result["software_version"])
  ToStdout("Internode protocol: %s", result["protocol_version"])
  ToStdout("Configuration format: %s", result["config_version"])
  ToStdout("OS api version: %s", result["os_api_version"])
  ToStdout("Export interface: %s", result["export_version"])
  return 0


def ShowClusterMaster(opts, args):
  """Write name of master node to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  master = bootstrap.GetMaster()
  ToStdout(master)
  return 0


def _PrintGroupedParams(paramsdict, level=1, roman=False):
  """Print grouped parameters (be, nic, disk) by group.

  @type paramsdict: dict of dicts
  @param paramsdict: {group: {param: value, ...}, ...}
  @type level: int
  @param level: Level of indentation

  """
  indent = "  " * level
  for item, val in sorted(paramsdict.items()):
    if isinstance(val, dict):
      ToStdout("%s- %s:", indent, item)
      _PrintGroupedParams(val, level=level + 1, roman=roman)
    elif roman and isinstance(val, int):
      ToStdout("%s  %s: %s", indent, item, compat.TryToRoman(val))
    else:
      ToStdout("%s  %s: %s", indent, item, val)


def ShowClusterConfig(opts, args):
  """Shows cluster information.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient(query=True)
  result = cl.QueryClusterInfo()

  ToStdout("Cluster name: %s", result["name"])
  ToStdout("Cluster UUID: %s", result["uuid"])

  ToStdout("Creation time: %s", utils.FormatTime(result["ctime"]))
  ToStdout("Modification time: %s", utils.FormatTime(result["mtime"]))

  ToStdout("Master node: %s", result["master"])

  ToStdout("Architecture (this node): %s (%s)",
           result["architecture"][0], result["architecture"][1])

  if result["tags"]:
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
  else:
    tags = "(none)"

  ToStdout("Tags: %s", tags)

  ToStdout("Default hypervisor: %s", result["default_hypervisor"])
  ToStdout("Enabled hypervisors: %s",
           utils.CommaJoin(result["enabled_hypervisors"]))

  ToStdout("Hypervisor parameters:")
  _PrintGroupedParams(result["hvparams"])

  ToStdout("OS-specific hypervisor parameters:")
  _PrintGroupedParams(result["os_hvp"])

  ToStdout("OS parameters:")
  _PrintGroupedParams(result["osparams"])

  ToStdout("Hidden OSes: %s", utils.CommaJoin(result["hidden_os"]))
  ToStdout("Blacklisted OSes: %s", utils.CommaJoin(result["blacklisted_os"]))

  ToStdout("Cluster parameters:")
  ToStdout("  - candidate pool size: %s",
            compat.TryToRoman(result["candidate_pool_size"],
                              convert=opts.roman_integers))
  ToStdout("  - master netdev: %s", result["master_netdev"])
  ToStdout("  - master netmask: %s", result["master_netmask"])
  ToStdout("  - use external master IP address setup script: %s",
           result["use_external_mip_script"])
  ToStdout("  - lvm volume group: %s", result["volume_group_name"])
  if result["reserved_lvs"]:
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
  else:
    reserved_lvs = "(none)"
  ToStdout("  - lvm reserved volumes: %s", reserved_lvs)
  ToStdout("  - drbd usermode helper: %s", result["drbd_usermode_helper"])
  ToStdout("  - file storage path: %s", result["file_storage_dir"])
  ToStdout("  - shared file storage path: %s",
           result["shared_file_storage_dir"])
  ToStdout("  - maintenance of node health: %s",
           result["maintain_node_health"])
  ToStdout("  - uid pool: %s", uidpool.FormatUidPool(result["uid_pool"]))
  ToStdout("  - default instance allocator: %s", result["default_iallocator"])
  ToStdout("  - primary ip version: %d", result["primary_ip_version"])
  ToStdout("  - preallocation wipe disks: %s", result["prealloc_wipe_disks"])
  ToStdout("  - OS search path: %s", utils.CommaJoin(pathutils.OS_SEARCH_PATH))

  ToStdout("Default node parameters:")
  _PrintGroupedParams(result["ndparams"], roman=opts.roman_integers)

  ToStdout("Default instance parameters:")
  _PrintGroupedParams(result["beparams"], roman=opts.roman_integers)

  ToStdout("Default nic parameters:")
  _PrintGroupedParams(result["nicparams"], roman=opts.roman_integers)

  ToStdout("Default disk parameters:")
  _PrintGroupedParams(result["diskparams"], roman=opts.roman_integers)

  ToStdout("Instance policy - limits for instances:")
  for key in constants.IPOLICY_ISPECS:
    ToStdout("  - %s", key)
    _PrintGroupedParams(result["ipolicy"][key], roman=opts.roman_integers)
  ToStdout("  - enabled disk templates: %s",
           utils.CommaJoin(result["ipolicy"][constants.IPOLICY_DTS]))
  for key in constants.IPOLICY_PARAMETERS:
    ToStdout("  - %s: %s", key, result["ipolicy"][key])

  return 0


def ClusterCopyFile(opts, args):
  """Copy a file from master to some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the path of
      the file to be copied
  @rtype: int
  @return: the desired exit code

  """
  filename = args[0]
  if not os.path.exists(filename):
    raise errors.OpPrereqError("No such filename '%s'" % filename,
                               errors.ECODE_INVAL)

  cl = GetClient()

  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]

  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
                           secondary_ips=opts.use_replication_network,
                           nodegroup=opts.nodegroup)

  srun = ssh.SshRunner(cluster_name)
  for node in results:
    if not srun.CopyFileToNode(node, filename):
      ToStderr("Copy of file %s to node %s failed", filename, node)

  return 0


def RunClusterCommand(opts, args):
  """Run a command on some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the command to be run and its arguments
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  command = " ".join(args)

  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)

  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
                                                    "master_node"])

  srun = ssh.SshRunner(cluster_name=cluster_name)

  # Make sure master node is at list end
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  for name in nodes:
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)

    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
      # Do not output anything for successful commands
      continue

    ToStdout("------------------------------------------------")
    if opts.show_machine_names:
      for line in result.output.splitlines():
        ToStdout("%s: %s", name, line)
    else:
      ToStdout("node: %s", name)
      ToStdout("%s", result.output)
    ToStdout("return code = %s", result.exit_code)

  return 0


def VerifyCluster(opts, args):
  """Verify integrity of cluster, performing various tests on nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  skip_checks = []

  if opts.skip_nplusone_mem:
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)

  cl = GetClient()

  op = opcodes.OpClusterVerify(verbose=opts.verbose,
                               error_codes=opts.error_codes,
                               debug_simulate_errors=opts.simulate_errors,
                               skip_checks=skip_checks,
                               ignore_errors=opts.ignore_errors,
                               group_name=opts.nodegroup)
  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  results = jex.GetResults()

  (bad_jobs, bad_results) = \
    map(len,
        # Convert iterators to lists
        map(list,
            # Count errors
            map(compat.partial(itertools.ifilterfalse, bool),
                # Convert result to booleans in a tuple
                zip(*((job_success, len(op_results) == 1 and op_results[0])
                      for (job_success, op_results) in results)))))
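  # bad_jobs counts jobs that failed to run at all; bad_results counts jobs
  # whose single opcode result evaluated to false (verification had errors)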

  if bad_jobs == 0 and bad_results == 0:
    rcode = constants.EXIT_SUCCESS
  else:
    rcode = constants.EXIT_FAILURE
    if bad_jobs > 0:
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)

  return rcode


def VerifyDisks(opts, args):
  """Verify integrity of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  op = opcodes.OpClusterVerifyDisks()

  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  retcode = constants.EXIT_SUCCESS

  for (status, result) in jex.GetResults():
    if not status:
      ToStdout("Job failed: %s", result)
      continue

    ((bad_nodes, instances, missing), ) = result

    for node, text in bad_nodes.items():
      ToStdout("Error gathering data on node %s: %s",
               node, utils.SafeEncode(text[-400:]))
      retcode = constants.EXIT_FAILURE
      ToStdout("You need to fix these nodes first before fixing instances")

    for iname in instances:
      if iname in missing:
        continue
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
      try:
        ToStdout("Activating disks for instance '%s'", iname)
        SubmitOpCode(op, opts=opts, cl=cl)
      except errors.GenericError, err:
        nret, msg = FormatError(err)
        retcode |= nret
        ToStderr("Error activating disks for instance %s: %s", iname, msg)

    if missing:
      for iname, ival in missing.iteritems():
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
        if all_missing:
          ToStdout("Instance %s cannot be verified as it lives on"
                   " broken nodes", iname)
        else:
          ToStdout("Instance %s has missing logical volumes:", iname)
          ival.sort()
          for node, vol in ival:
            if node in bad_nodes:
              ToStdout("\tbroken node %s /dev/%s", node, vol)
            else:
              ToStdout("\t%s /dev/%s", node, vol)

      ToStdout("You need to replace or recreate disks for all the above"
               " instances if this message persists after fixing broken nodes.")
      retcode = constants.EXIT_FAILURE
    elif not instances:
      ToStdout("No disks need to be activated.")

  return retcode


def RepairDiskSizes(opts, args):
  """Verify sizes of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: optional list of instances to restrict check to
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
  SubmitOpCode(op, opts=opts)


@UsesRPC
def MasterFailover(opts, args):
  """Failover the master node.

  This command, when run on a non-master node, will cause the current
  master to cease being master, and the non-master to become new
  master.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if opts.no_voting and not opts.yes_do_it:
    usertext = ("This will perform the failover even if most other nodes"
                " are down, or if this node is outdated. This is dangerous"
                " as it can lead to an inconsistent cluster. Check the"
                " gnt-cluster(8) man page before proceeding. Continue?")
    if not AskUser(usertext):
      return 1

  return bootstrap.MasterFailover(no_voting=opts.no_voting)


def MasterPing(opts, args):
  """Checks if the master is alive.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  try:
    cl = GetClient()
    cl.QueryClusterInfo()
    return 0
  except Exception: # pylint: disable=W0703
    return 1


def SearchTags(opts, args):
  """Searches the tags on the whole cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the tag pattern
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpTagsSearch(pattern=args[0])
  result = SubmitOpCode(op, opts=opts)
  if not result:
    return 1
  result = list(result)
  result.sort()
  for path, tag in result:
    ToStdout("%s %s", path, tag)


def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
  """Reads and verifies an X509 certificate.

  @type cert_filename: string
  @param cert_filename: the path of the file containing the certificate to
                        verify encoded in PEM format
  @type verify_private_key: bool
  @param verify_private_key: whether to verify the private key in addition to
                             the public certificate
  @rtype: string
  @return: a string containing the PEM-encoded certificate.

  """
  try:
    pem = utils.ReadFile(cert_filename)
  except IOError, err:
    raise errors.X509CertError(cert_filename,
                               "Unable to read certificate: %s" % str(err))

  try:
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
  except Exception, err:
    raise errors.X509CertError(cert_filename,
                               "Unable to load certificate: %s" % str(err))

  if verify_private_key:
    try:
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
    except Exception, err:
      raise errors.X509CertError(cert_filename,
                                 "Unable to load private key: %s" % str(err))

  return pem


def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
                 cds_filename, force):
  """Renews cluster certificates, keys and secrets.

  @type new_cluster_cert: bool
  @param new_cluster_cert: Whether to generate a new cluster certificate
  @type new_rapi_cert: bool
  @param new_rapi_cert: Whether to generate a new RAPI certificate
  @type rapi_cert_filename: string
  @param rapi_cert_filename: Path to file containing new RAPI certificate
  @type new_spice_cert: bool
  @param new_spice_cert: Whether to generate a new SPICE certificate
  @type spice_cert_filename: string
  @param spice_cert_filename: Path to file containing new SPICE certificate
  @type spice_cacert_filename: string
  @param spice_cacert_filename: Path to file containing the certificate of the
                                CA that signed the SPICE certificate
  @type new_confd_hmac_key: bool
  @param new_confd_hmac_key: Whether to generate a new HMAC key
  @type new_cds: bool
  @param new_cds: Whether to generate a new cluster domain secret
  @type cds_filename: string
  @param cds_filename: Path to file containing new cluster domain secret
  @type force: bool
  @param force: Whether to skip asking the user for confirmation

  """
  if new_rapi_cert and rapi_cert_filename:
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
             " options can be specified at the same time.")
    return 1

  if new_cds and cds_filename:
    ToStderr("Only one of the --new-cluster-domain-secret and"
             " --cluster-domain-secret options can be specified at"
             " the same time.")
    return 1

  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
             " and --spice-ca-certificate must not be used.")
    return 1

  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
             " specified.")
    return 1

  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
  try:
    if rapi_cert_filename:
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
    if spice_cert_filename:
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
  except errors.X509CertError, err:
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
    return 1

  if cds_filename:
    try:
      cds = utils.ReadFile(cds_filename)
    except Exception, err: # pylint: disable=W0703
      ToStderr("Can't load new cluster domain secret from %s: %s" %
               (cds_filename, str(err)))
      return 1
  else:
    cds = None

  if not force:
    usertext = ("This requires all daemons on all nodes to be restarted and"
                " may take some time. Continue?")
    if not AskUser(usertext):
      return 1

  def _RenewCryptoInner(ctx):
    ctx.feedback_fn("Updating certificates and keys")
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
                                    new_rapi_cert,
                                    new_spice_cert,
                                    new_confd_hmac_key,
                                    new_cds,
                                    rapi_cert_pem=rapi_cert_pem,
                                    spice_cert_pem=spice_cert_pem,
                                    spice_cacert_pem=spice_cacert_pem,
                                    cds=cds)

    files_to_copy = []

    if new_cluster_cert:
      files_to_copy.append(pathutils.NODED_CERT_FILE)

    if new_rapi_cert or rapi_cert_pem:
      files_to_copy.append(pathutils.RAPI_CERT_FILE)

    if new_spice_cert or spice_cert_pem:
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)

    if new_confd_hmac_key:
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)

    if new_cds or cds:
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)

    if files_to_copy:
      for node_name in ctx.nonmaster_nodes:
        ctx.feedback_fn("Copying %s to %s" %
                        (", ".join(files_to_copy), node_name))
        for file_name in files_to_copy:
          ctx.ssh.CopyFileToNode(node_name, file_name)

  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
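  # At this point RunWhileClusterStopped has stopped the Ganeti daemons
  # cluster-wide, run _RenewCryptoInner to regenerate and distribute the
  # requested crypto material, and started the daemons again.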

  ToStdout("All requested certificates and keys have been replaced."
           " Running \"gnt-cluster verify\" now is recommended.")

  return 0


def RenewCrypto(opts, args):
  """Renews cluster certificates, keys and secrets.

  """
  return _RenewCrypto(opts.new_cluster_cert,
                      opts.new_rapi_cert,
                      opts.rapi_cert,
                      opts.new_spice_cert,
                      opts.spice_cert,
                      opts.spice_cacert,
                      opts.new_confd_hmac_key,
                      opts.new_cluster_domain_secret,
                      opts.cluster_domain_secret,
                      opts.force)


def SetClusterParams(opts, args):
  """Modify the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if not (not opts.lvm_storage or opts.vg_name or
          not opts.drbd_storage or opts.drbd_helper or
          opts.enabled_hypervisors or opts.hvparams or
          opts.beparams or opts.nicparams or
          opts.ndparams or opts.diskparams or
          opts.candidate_pool_size is not None or
          opts.uid_pool is not None or
          opts.maintain_node_health is not None or
          opts.add_uids is not None or
          opts.remove_uids is not None or
          opts.default_iallocator is not None or
          opts.reserved_lvs is not None or
          opts.master_netdev is not None or
          opts.master_netmask is not None or
          opts.use_external_mip_script is not None or
          opts.prealloc_wipe_disks is not None or
          opts.hv_state or
          opts.disk_state or
          opts.ispecs_mem_size or
          opts.ispecs_cpu_count or
          opts.ispecs_disk_count or
          opts.ispecs_disk_size or
          opts.ispecs_nic_count or
          opts.ipolicy_disk_templates is not None or
          opts.ipolicy_vcpu_ratio is not None or
          opts.ipolicy_spindle_ratio is not None):
    ToStderr("Please give at least one of the parameters.")
    return 1

  vg_name = opts.vg_name
  if not opts.lvm_storage and opts.vg_name:
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
    return 1

  if not opts.lvm_storage:
    vg_name = ""

  drbd_helper = opts.drbd_helper
  if not opts.drbd_storage and opts.drbd_helper:
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
    return 1

  if not opts.drbd_storage:
    drbd_helper = ""

  hvlist = opts.enabled_hypervisors
  if hvlist is not None:
    hvlist = hvlist.split(",")

  # a list of (name, dict) we can pass directly to dict() (or [])
  hvparams = dict(opts.hvparams)
  for hv_params in hvparams.values():
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)

  diskparams = dict(opts.diskparams)

  for dt_params in diskparams.values():
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)

  beparams = opts.beparams
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  nicparams = opts.nicparams
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  ndparams = opts.ndparams
  if ndparams is not None:
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  ipolicy = CreateIPolicyFromOpts(
    ispecs_mem_size=opts.ispecs_mem_size,
    ispecs_cpu_count=opts.ispecs_cpu_count,
    ispecs_disk_count=opts.ispecs_disk_count,
    ispecs_disk_size=opts.ispecs_disk_size,
    ispecs_nic_count=opts.ispecs_nic_count,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    )

  mnh = opts.maintain_node_health

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  add_uids = opts.add_uids
  if add_uids is not None:
    add_uids = uidpool.ParseUidPool(add_uids)

  remove_uids = opts.remove_uids
  if remove_uids is not None:
    remove_uids = uidpool.ParseUidPool(remove_uids)

  if opts.reserved_lvs is not None:
    if opts.reserved_lvs == "":
      opts.reserved_lvs = []
    else:
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")

  if opts.master_netmask is not None:
    try:
      opts.master_netmask = int(opts.master_netmask)
    except ValueError:
      ToStderr("The --master-netmask option expects an int parameter.")
      return 1

  ext_ip_script = opts.use_external_mip_script

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  op = opcodes.OpClusterSetParams(vg_name=vg_name,
                                  drbd_helper=drbd_helper,
                                  enabled_hypervisors=hvlist,
                                  hvparams=hvparams,
                                  os_hvp=None,
                                  beparams=beparams,
                                  nicparams=nicparams,
                                  ndparams=ndparams,
                                  diskparams=diskparams,
                                  ipolicy=ipolicy,
                                  candidate_pool_size=opts.candidate_pool_size,
                                  maintain_node_health=mnh,
                                  uid_pool=uid_pool,
                                  add_uids=add_uids,
                                  remove_uids=remove_uids,
                                  default_iallocator=opts.default_iallocator,
                                  prealloc_wipe_disks=opts.prealloc_wipe_disks,
                                  master_netdev=opts.master_netdev,
                                  master_netmask=opts.master_netmask,
                                  reserved_lvs=opts.reserved_lvs,
                                  use_external_mip_script=ext_ip_script,
                                  hv_state=hv_state,
                                  disk_state=disk_state,
                                  )
  SubmitOrSend(op, opts)
  return 0


def QueueOps(opts, args):
  """Queue operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  command = args[0]
  client = GetClient()
  if command in ("drain", "undrain"):
    drain_flag = command == "drain"
    client.SetQueueDrainFlag(drain_flag)
  elif command == "info":
    result = client.QueryConfigValues(["drain_flag"])
    if result[0]:
      val = "set"
    else:
      val = "unset"
    ToStdout("The drain flag is %s" % val)
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0


def _ShowWatcherPause(until):
  if until is None or until < time.time():
    ToStdout("The watcher is not paused.")
  else:
    ToStdout("The watcher is paused until %s.", time.ctime(until))


def WatcherOps(opts, args):
  """Watcher operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  command = args[0]
  client = GetClient()

  if command == "continue":
    client.SetWatcherPause(None)
    ToStdout("The watcher is no longer paused.")

  elif command == "pause":
    if len(args) < 2:
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)

    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
    _ShowWatcherPause(result)

  elif command == "info":
    result = client.QueryConfigValues(["watcher_pause"])
    _ShowWatcherPause(result[0])

  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0


def _OobPower(opts, node_list, power):
  """Puts the nodes in the list into the desired power state.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on
  @param power: True if they should be powered on, False otherwise
  @return: The success of the operation (none failed)

  """
  if power:
    command = constants.OOB_POWER_ON
  else:
    command = constants.OOB_POWER_OFF

  op = opcodes.OpOobCommand(node_names=node_list,
                            command=command,
                            ignore_status=True,
                            timeout=opts.oob_timeout,
                            power_delay=opts.power_delay)
  result = SubmitOpCode(op, opts=opts)
  errs = 0
  for node_result in result:
    (node_tuple, data_tuple) = node_result
    (_, node_name) = node_tuple
    (data_status, _) = data_tuple
    if data_status != constants.RS_NORMAL:
      assert data_status != constants.RS_UNAVAIL
      errs += 1
      ToStderr("There was a problem changing power for %s, please investigate",
               node_name)

  if errs > 0:
    return False

  return True


def _InstanceStart(opts, inst_list, start, no_remember=False):
  """Puts the instances in the list into the desired state.

  @param opts: The command line options selected by the user
  @param inst_list: The list of instances to operate on
  @param start: True if they should be started, False for shutdown
  @param no_remember: If True, the new state is not saved as the desired state
  @return: The success of the operation (none failed)

  """
  if start:
    opcls = opcodes.OpInstanceStartup
    text_submit, text_success, text_failed = ("startup", "started", "starting")
  else:
    opcls = compat.partial(opcodes.OpInstanceShutdown,
                           timeout=opts.shutdown_timeout,
                           no_remember=no_remember)
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")

  jex = JobExecutor(opts=opts)

  for inst in inst_list:
    ToStdout("Submit %s of instance %s", text_submit, inst)
    op = opcls(instance_name=inst)
    jex.QueueJob(inst, op)

  results = jex.GetResults()
  bad_cnt = len([1 for (success, _) in results if not success])

  if bad_cnt == 0:
    ToStdout("All instances have been %s successfully", text_success)
  else:
    ToStderr("There were errors while %s instances:\n"
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
             len(results))
    return False

  return True
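
# The startups/shutdowns above are submitted as individual jobs through
# JobExecutor, so they run in parallel and failures are counted per instance.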


class _RunWhenNodesReachableHelper:
  """Helper class to make sharing internal state easier.

  @ivar success: Indicates if all action_cb calls were successful

  """
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
    """Init the object.

    @param node_list: The list of nodes to be reachable
    @param action_cb: Callback called when a new host is reachable
    @type node2ip: dict
    @param node2ip: Node to ip mapping
    @param port: The port to use for the TCP ping
    @param feedback_fn: The function used for feedback
    @param _ping_fn: Function to check reachability (for unittest use only)
    @param _sleep_fn: Function to sleep (for unittest use only)

    """
    self.down = set(node_list)
    self.up = set()
    self.node2ip = node2ip
    self.success = True
    self.action_cb = action_cb
    self.port = port
    self.feedback_fn = feedback_fn
    self._ping_fn = _ping_fn
    self._sleep_fn = _sleep_fn

  def __call__(self):
    """When called we run action_cb.

    @raises utils.RetryAgain: When there are still down nodes

    """
    if not self.action_cb(self.up):
      self.success = False

    if self.down:
      raise utils.RetryAgain()
    else:
      return self.success

  def Wait(self, secs):
    """Checks if a host is up or waits remaining seconds.

    @param secs: The secs remaining

    """
    start = time.time()
    for node in self.down:
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
                       live_port_needed=True):
        self.feedback_fn("Node %s became available" % node)
        self.up.add(node)
        self.down -= self.up
        # If we have a node available there is the possibility to run the
        # action callback successfully, therefore we don't wait and return
        return

    self._sleep_fn(max(0.0, start + secs - time.time()))


def _RunWhenNodesReachable(node_list, action_cb, interval):
  """Run action_cb when nodes become reachable.

  @param node_list: The list of nodes to be reachable
  @param action_cb: Callback called when a new host is reachable
  @param interval: The earliest time to retry

  """
  client = GetClient()
  cluster_info = client.QueryClusterInfo()
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
    family = netutils.IPAddress.family
  else:
    family = netutils.IP6Address.family

  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
                 for node in node_list)

  port = netutils.GetDaemonPort(constants.NODED)
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
                                        ToStdout)

  try:
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
                       wait_fn=helper.Wait)
  except utils.RetryTimeout:
    ToStderr("Time exceeded while waiting for nodes to become reachable"
             " again:\n  - %s", "  - ".join(helper.down))
    return False
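
# Retry flow (for reference): utils.Retry calls the helper repeatedly; each
# call runs action_cb on the nodes that are up so far and raises
# utils.RetryAgain while any node is still down; between attempts helper.Wait
# pings the remaining nodes on the noded port, until _EPO_REACHABLE_TIMEOUT
# is exceeded.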


def _MaybeInstanceStartup(opts, inst_map, nodes_online,
                          _instance_start_fn=_InstanceStart):
  """Start the instances conditionally, based on which nodes are online.

  @param opts: The command line options selected by the user
  @param inst_map: A dict of inst -> nodes mapping
  @param nodes_online: A list of nodes online
  @param _instance_start_fn: Callback to start instances (unittest use only)
  @return: Success of the operation on all instances

  """
  start_inst_list = []
  for (inst, nodes) in inst_map.items():
    if not (nodes - nodes_online):
      # All nodes the instance lives on are back online
      start_inst_list.append(inst)

  for inst in start_inst_list:
    del inst_map[inst]

  if start_inst_list:
    return _instance_start_fn(opts, start_inst_list, True)

  return True
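
# Example (illustrative): with inst_map = {"inst1": set(["nodeA", "nodeB"])}
# and nodes_online = set(["nodeA"]), nothing is started yet; once "nodeB" is
# online as well, "inst1" is started and removed from inst_map.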


def _EpoOn(opts, full_node_list, node_list, inst_map):
  """Does the actual power on.

  @param opts: The command line options selected by the user
  @param full_node_list: All nodes to operate on (includes nodes not supporting
                         OOB)
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  if node_list and not _OobPower(opts, node_list, True):
    ToStderr("Not all nodes seem to get back up, investigate and start"
             " manually if needed")

  # Wait for the nodes to be back up
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))

  ToStdout("Waiting until all nodes are available again")
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
    ToStderr("Please investigate and start stopped instances manually")
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS


def _EpoOff(opts, node_list, inst_map):
  """Does the actual power off.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
    ToStderr("Please investigate and stop instances manually before continuing")
    return constants.EXIT_FAILURE

  if not node_list:
    return constants.EXIT_SUCCESS

  if _OobPower(opts, node_list, False):
    return constants.EXIT_SUCCESS
  else:
    return constants.EXIT_FAILURE


def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
        _confirm_fn=ConfirmOperation,
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
  """EPO operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: the names of the nodes or groups to operate on (may be empty)
  @rtype: int
  @return: the desired exit code

  """
  if opts.groups and opts.show_all:
    _stderr_fn("Only one of --groups or --all is allowed")
    return constants.EXIT_FAILURE
  elif args and opts.show_all:
    _stderr_fn("Arguments in combination with --all are not allowed")
    return constants.EXIT_FAILURE

  if cl is None:
    cl = GetClient()

  if opts.groups:
    node_query_list = \
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
  else:
    node_query_list = args

  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
                                           "sinst_list", "powered", "offline"],
                         False)

  all_nodes = map(compat.fst, result)
  node_list = []
  inst_map = {}
  for (node, master, pinsts, sinsts, powered, offline) in result:
    if not offline:
      for inst in (pinsts + sinsts):
        if inst in inst_map:
          if not master:
            inst_map[inst].add(node)
        elif master:
          inst_map[inst] = set()
        else:
          inst_map[inst] = set([node])

    if master and opts.on:
      # We ignore the master for turning on the machines, in fact we are
      # already operating on the master at this point :)
      continue
    elif master and not opts.show_all:
      _stderr_fn("%s is the master node, please do a master-failover to another"
                 " node not affected by the EPO or use --all if you intend to"
                 " shutdown the whole cluster", node)
      return constants.EXIT_FAILURE
    elif powered is None:
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
                 " handled in a fully automated manner", node)
    elif powered == opts.on:
      _stdout_fn("Node %s is already in desired power state, skipping", node)
    elif not offline or (offline and powered):
      node_list.append(node)

  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
    return constants.EXIT_FAILURE

  if opts.on:
    return _on_fn(opts, all_nodes, node_list, inst_map)
  else:
    return _off_fn(opts, node_list, inst_map)


commands = {
  "init": (
    InitCluster, [ArgHost(min=1, max=1)],
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + INSTANCE_POLICY_OPTS,
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
  "destroy": (
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
    "", "Destroy cluster"),
  "rename": (
    RenameCluster, [ArgHost(min=1, max=1)],
    [FORCE_OPT, DRY_RUN_OPT],
    "<new_name>",
    "Renames the cluster"),
  "redist-conf": (
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
    "", "Forces a push of the configuration file and ssconf files"
    " to the nodes in the cluster"),
  "verify": (
    VerifyCluster, ARGS_NONE,
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
    "", "Does a check on the cluster configuration"),
  "verify-disks": (
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
    "", "Does a check on the cluster disk status"),
  "repair-disk-sizes": (
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
    "[instance...]", "Updates mismatches in recorded disk sizes"),
  "master-failover": (
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
    "", "Makes the current node the master"),
  "master-ping": (
    MasterPing, ARGS_NONE, [],
    "", "Checks if the master is alive"),
  "version": (
    ShowClusterVersion, ARGS_NONE, [],
    "", "Shows the cluster version"),
  "getmaster": (
    ShowClusterMaster, ARGS_NONE, [],
    "", "Shows the cluster master"),
  "copyfile": (
    ClusterCopyFile, [ArgFile(min=1, max=1)],
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
  "command": (
    RunClusterCommand, [ArgCommand(min=1)],
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
  "info": (
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
    "[--roman]", "Show cluster configuration"),
  "list-tags": (
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
  "add-tags": (
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
    "tag...", "Add tags to the cluster"),
  "remove-tags": (
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
    "tag...", "Remove tags from the cluster"),
  "search-tags": (
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
    "Searches the tags on all objects on"
    " the cluster for a given pattern (regex)"),
  "queue": (
    QueueOps,
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
    [], "drain|undrain|info", "Change queue properties"),
  "watcher": (
    WatcherOps,
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
    [],
    "{pause <timespec>|continue|info}", "Change watcher properties"),
  "modify": (
    SetClusterParams, ARGS_NONE,
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
     DISK_STATE_OPT, SUBMIT_OPT] +
    INSTANCE_POLICY_OPTS,
    "[opts...]",
    "Alters the parameters of the cluster"),
  "renew-crypto": (
    RenewCrypto, ARGS_NONE,
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
    "[opts...]",
    "Renews cluster certificates, keys and secrets"),
  "epo": (
    Epo, [ArgUnknown()],
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
    "[opts...] [args]",
    "Performs an emergency power-off on given args"),
  "activate-master-ip": (
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
  "deactivate-master-ip": (
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
    "Deactivates the master IP"),
  }


#: dictionary with aliases for commands
aliases = {
  "masterfailover": "master-failover",
  "show": "info",
}


def Main():
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
                     aliases=aliases)
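

# Typical invocations dispatched through the table above (illustrative):
#   gnt-cluster init --enabled-hypervisors=kvm cluster.example.com
#   gnt-cluster verify
#   gnt-cluster command -n node1.example.com uptime
#   gnt-cluster epo --all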