Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ 57dc299a

History | View | Annotate | Download (51.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
import os.path
30
import time
31
import OpenSSL
32
import itertools
33

    
34
from ganeti.cli import *
35
from ganeti import opcodes
36
from ganeti import constants
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import bootstrap
40
from ganeti import ssh
41
from ganeti import objects
42
from ganeti import uidpool
43
from ganeti import compat
44
from ganeti import netutils
45

    
46

    
47
ON_OPT = cli_option("--on", default=False,
48
                    action="store_true", dest="on",
49
                    help="Recover from an EPO")
50

    
51
GROUPS_OPT = cli_option("--groups", default=False,
52
                    action="store_true", dest="groups",
53
                    help="Arguments are node groups instead of nodes")
54

    
55
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
56
_EPO_PING_TIMEOUT = 1 # 1 second
57
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
58

    
59

    
60
@UsesRPC
61
def InitCluster(opts, args):
62
  """Initialize the cluster.
63

64
  @param opts: the command line options selected by the user
65
  @type args: list
66
  @param args: should contain only one element, the desired
67
      cluster name
68
  @rtype: int
69
  @return: the desired exit code
70

71
  """
72
  if not opts.lvm_storage and opts.vg_name:
73
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
74
    return 1
75

    
76
  vg_name = opts.vg_name
77
  if opts.lvm_storage and not opts.vg_name:
78
    vg_name = constants.DEFAULT_VG
79

    
80
  if not opts.drbd_storage and opts.drbd_helper:
81
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
82
    return 1
83

    
84
  drbd_helper = opts.drbd_helper
85
  if opts.drbd_storage and not opts.drbd_helper:
86
    drbd_helper = constants.DEFAULT_DRBD_HELPER
87

    
88
  master_netdev = opts.master_netdev
89
  if master_netdev is None:
90
    master_netdev = constants.DEFAULT_BRIDGE
91

    
92
  hvlist = opts.enabled_hypervisors
93
  if hvlist is None:
94
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
95
  hvlist = hvlist.split(",")
96

    
97
  hvparams = dict(opts.hvparams)
98
  beparams = opts.beparams
99
  nicparams = opts.nicparams
100

    
101
  diskparams = dict(opts.diskparams)
102

    
103
  # check the disk template types here, as we cannot rely on the type check done
104
  # by the opcode parameter types
105
  diskparams_keys = set(diskparams.keys())
106
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
107
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
108
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
109
    return 1
110

    
111
  # prepare beparams dict
112
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
113
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
114

    
115
  # prepare nicparams dict
116
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
117
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
118

    
119
  # prepare ndparams dict
120
  if opts.ndparams is None:
121
    ndparams = dict(constants.NDC_DEFAULTS)
122
  else:
123
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
124
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
125

    
126
  # prepare hvparams dict
127
  for hv in constants.HYPER_TYPES:
128
    if hv not in hvparams:
129
      hvparams[hv] = {}
130
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
131
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
132

    
133
  # prepare diskparams dict
134
  for templ in constants.DISK_TEMPLATES:
135
    if templ not in diskparams:
136
      diskparams[templ] = {}
137
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
138
                                         diskparams[templ])
139
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
140

    
141
  # prepare ipolicy dict
142
  ispecs_dts = opts.ispecs_disk_templates # hate long var names
143
  ipolicy_raw = \
144
    objects.CreateIPolicyFromOpts(ispecs_mem_size=opts.ispecs_mem_size,
145
                                  ispecs_cpu_count=opts.ispecs_cpu_count,
146
                                  ispecs_disk_count=opts.ispecs_disk_count,
147
                                  ispecs_disk_size=opts.ispecs_disk_size,
148
                                  ispecs_nic_count=opts.ispecs_nic_count,
149
                                  ispecs_disk_templates=ispecs_dts,
150
                                  fill_all=True)
151
  ipolicy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy_raw)
152

    
153
  if opts.candidate_pool_size is None:
154
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
155

    
156
  if opts.mac_prefix is None:
157
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
158

    
159
  uid_pool = opts.uid_pool
160
  if uid_pool is not None:
161
    uid_pool = uidpool.ParseUidPool(uid_pool)
162

    
163
  if opts.prealloc_wipe_disks is None:
164
    opts.prealloc_wipe_disks = False
165

    
166
  external_ip_setup_script = opts.use_external_mip_script
167
  if external_ip_setup_script is None:
168
    external_ip_setup_script = False
169

    
170
  try:
171
    primary_ip_version = int(opts.primary_ip_version)
172
  except (ValueError, TypeError), err:
173
    ToStderr("Invalid primary ip version value: %s" % str(err))
174
    return 1
175

    
176
  master_netmask = opts.master_netmask
177
  try:
178
    if master_netmask is not None:
179
      master_netmask = int(master_netmask)
180
  except (ValueError, TypeError), err:
181
    ToStderr("Invalid master netmask value: %s" % str(err))
182
    return 1
183

    
184
  if opts.disk_state:
185
    disk_state = utils.FlatToDict(opts.disk_state)
186
  else:
187
    disk_state = {}
188

    
189
  hv_state = dict(opts.hv_state)
190

    
191
  bootstrap.InitCluster(cluster_name=args[0],
192
                        secondary_ip=opts.secondary_ip,
193
                        vg_name=vg_name,
194
                        mac_prefix=opts.mac_prefix,
195
                        master_netmask=master_netmask,
196
                        master_netdev=master_netdev,
197
                        file_storage_dir=opts.file_storage_dir,
198
                        shared_file_storage_dir=opts.shared_file_storage_dir,
199
                        enabled_hypervisors=hvlist,
200
                        hvparams=hvparams,
201
                        beparams=beparams,
202
                        nicparams=nicparams,
203
                        ndparams=ndparams,
204
                        diskparams=diskparams,
205
                        ipolicy=ipolicy,
206
                        candidate_pool_size=opts.candidate_pool_size,
207
                        modify_etc_hosts=opts.modify_etc_hosts,
208
                        modify_ssh_setup=opts.modify_ssh_setup,
209
                        maintain_node_health=opts.maintain_node_health,
210
                        drbd_helper=drbd_helper,
211
                        uid_pool=uid_pool,
212
                        default_iallocator=opts.default_iallocator,
213
                        primary_ip_version=primary_ip_version,
214
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
215
                        use_external_mip_script=external_ip_setup_script,
216
                        hv_state=hv_state,
217
                        disk_state=disk_state,
218
                        )
219
  op = opcodes.OpClusterPostInit()
220
  SubmitOpCode(op, opts=opts)
221
  return 0
222

    
223

    
224
@UsesRPC
225
def DestroyCluster(opts, args):
226
  """Destroy the cluster.
227

228
  @param opts: the command line options selected by the user
229
  @type args: list
230
  @param args: should be an empty list
231
  @rtype: int
232
  @return: the desired exit code
233

234
  """
235
  if not opts.yes_do_it:
236
    ToStderr("Destroying a cluster is irreversible. If you really want"
237
             " destroy this cluster, supply the --yes-do-it option.")
238
    return 1
239

    
240
  op = opcodes.OpClusterDestroy()
241
  master = SubmitOpCode(op, opts=opts)
242
  # if we reached this, the opcode didn't fail; we can proceed to
243
  # shutdown all the daemons
244
  bootstrap.FinalizeClusterDestroy(master)
245
  return 0
246

    
247

    
248
def RenameCluster(opts, args):
249
  """Rename the cluster.
250

251
  @param opts: the command line options selected by the user
252
  @type args: list
253
  @param args: should contain only one element, the new cluster name
254
  @rtype: int
255
  @return: the desired exit code
256

257
  """
258
  cl = GetClient()
259

    
260
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
261

    
262
  new_name = args[0]
263
  if not opts.force:
264
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
265
                " connected over the network to the cluster name, the"
266
                " operation is very dangerous as the IP address will be"
267
                " removed from the node and the change may not go through."
268
                " Continue?") % (cluster_name, new_name)
269
    if not AskUser(usertext):
270
      return 1
271

    
272
  op = opcodes.OpClusterRename(name=new_name)
273
  result = SubmitOpCode(op, opts=opts, cl=cl)
274

    
275
  if result:
276
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
277

    
278
  return 0
279

    
280

    
281
def ActivateMasterIp(opts, args):
282
  """Activates the master IP.
283

284
  """
285
  op = opcodes.OpClusterActivateMasterIp()
286
  SubmitOpCode(op)
287
  return 0
288

    
289

    
290
def DeactivateMasterIp(opts, args):
291
  """Deactivates the master IP.
292

293
  """
294
  if not opts.confirm:
295
    usertext = ("This will disable the master IP. All the open connections to"
296
                " the master IP will be closed. To reach the master you will"
297
                " need to use its node IP."
298
                " Continue?")
299
    if not AskUser(usertext):
300
      return 1
301

    
302
  op = opcodes.OpClusterDeactivateMasterIp()
303
  SubmitOpCode(op)
304
  return 0
305

    
306

    
307
def RedistributeConfig(opts, args):
308
  """Forces push of the cluster configuration.
309

310
  @param opts: the command line options selected by the user
311
  @type args: list
312
  @param args: empty list
313
  @rtype: int
314
  @return: the desired exit code
315

316
  """
317
  op = opcodes.OpClusterRedistConf()
318
  SubmitOrSend(op, opts)
319
  return 0
320

    
321

    
322
def ShowClusterVersion(opts, args):
323
  """Write version of ganeti software to the standard output.
324

325
  @param opts: the command line options selected by the user
326
  @type args: list
327
  @param args: should be an empty list
328
  @rtype: int
329
  @return: the desired exit code
330

331
  """
332
  cl = GetClient()
333
  result = cl.QueryClusterInfo()
334
  ToStdout("Software version: %s", result["software_version"])
335
  ToStdout("Internode protocol: %s", result["protocol_version"])
336
  ToStdout("Configuration format: %s", result["config_version"])
337
  ToStdout("OS api version: %s", result["os_api_version"])
338
  ToStdout("Export interface: %s", result["export_version"])
339
  return 0
340

    
341

    
342
def ShowClusterMaster(opts, args):
343
  """Write name of master node to the standard output.
344

345
  @param opts: the command line options selected by the user
346
  @type args: list
347
  @param args: should be an empty list
348
  @rtype: int
349
  @return: the desired exit code
350

351
  """
352
  master = bootstrap.GetMaster()
353
  ToStdout(master)
354
  return 0
355

    
356

    
357
def _PrintGroupedParams(paramsdict, level=1, roman=False):
358
  """Print Grouped parameters (be, nic, disk) by group.
359

360
  @type paramsdict: dict of dicts
361
  @param paramsdict: {group: {param: value, ...}, ...}
362
  @type level: int
363
  @param level: Level of indention
364

365
  """
366
  indent = "  " * level
367
  for item, val in sorted(paramsdict.items()):
368
    if isinstance(val, dict):
369
      ToStdout("%s- %s:", indent, item)
370
      _PrintGroupedParams(val, level=level + 1, roman=roman)
371
    elif roman and isinstance(val, int):
372
      ToStdout("%s  %s: %s", indent, item, compat.TryToRoman(val))
373
    else:
374
      ToStdout("%s  %s: %s", indent, item, val)
375

    
376

    
377
def ShowClusterConfig(opts, args):
378
  """Shows cluster information.
379

380
  @param opts: the command line options selected by the user
381
  @type args: list
382
  @param args: should be an empty list
383
  @rtype: int
384
  @return: the desired exit code
385

386
  """
387
  cl = GetClient()
388
  result = cl.QueryClusterInfo()
389

    
390
  ToStdout("Cluster name: %s", result["name"])
391
  ToStdout("Cluster UUID: %s", result["uuid"])
392

    
393
  ToStdout("Creation time: %s", utils.FormatTime(result["ctime"]))
394
  ToStdout("Modification time: %s", utils.FormatTime(result["mtime"]))
395

    
396
  ToStdout("Master node: %s", result["master"])
397

    
398
  ToStdout("Architecture (this node): %s (%s)",
399
           result["architecture"][0], result["architecture"][1])
400

    
401
  if result["tags"]:
402
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
403
  else:
404
    tags = "(none)"
405

    
406
  ToStdout("Tags: %s", tags)
407

    
408
  ToStdout("Default hypervisor: %s", result["default_hypervisor"])
409
  ToStdout("Enabled hypervisors: %s",
410
           utils.CommaJoin(result["enabled_hypervisors"]))
411

    
412
  ToStdout("Hypervisor parameters:")
413
  _PrintGroupedParams(result["hvparams"])
414

    
415
  ToStdout("OS-specific hypervisor parameters:")
416
  _PrintGroupedParams(result["os_hvp"])
417

    
418
  ToStdout("OS parameters:")
419
  _PrintGroupedParams(result["osparams"])
420

    
421
  ToStdout("Hidden OSes: %s", utils.CommaJoin(result["hidden_os"]))
422
  ToStdout("Blacklisted OSes: %s", utils.CommaJoin(result["blacklisted_os"]))
423

    
424
  ToStdout("Cluster parameters:")
425
  ToStdout("  - candidate pool size: %s",
426
            compat.TryToRoman(result["candidate_pool_size"],
427
                              convert=opts.roman_integers))
428
  ToStdout("  - master netdev: %s", result["master_netdev"])
429
  ToStdout("  - master netmask: %s", result["master_netmask"])
430
  ToStdout("  - use external master IP address setup script: %s",
431
           result["use_external_mip_script"])
432
  ToStdout("  - lvm volume group: %s", result["volume_group_name"])
433
  if result["reserved_lvs"]:
434
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
435
  else:
436
    reserved_lvs = "(none)"
437
  ToStdout("  - lvm reserved volumes: %s", reserved_lvs)
438
  ToStdout("  - drbd usermode helper: %s", result["drbd_usermode_helper"])
439
  ToStdout("  - file storage path: %s", result["file_storage_dir"])
440
  ToStdout("  - shared file storage path: %s",
441
           result["shared_file_storage_dir"])
442
  ToStdout("  - maintenance of node health: %s",
443
           result["maintain_node_health"])
444
  ToStdout("  - uid pool: %s",
445
            uidpool.FormatUidPool(result["uid_pool"],
446
                                  roman=opts.roman_integers))
447
  ToStdout("  - default instance allocator: %s", result["default_iallocator"])
448
  ToStdout("  - primary ip version: %d", result["primary_ip_version"])
449
  ToStdout("  - preallocation wipe disks: %s", result["prealloc_wipe_disks"])
450
  ToStdout("  - OS search path: %s", utils.CommaJoin(constants.OS_SEARCH_PATH))
451

    
452
  ToStdout("Default node parameters:")
453
  _PrintGroupedParams(result["ndparams"], roman=opts.roman_integers)
454

    
455
  ToStdout("Default instance parameters:")
456
  _PrintGroupedParams(result["beparams"], roman=opts.roman_integers)
457

    
458
  ToStdout("Default nic parameters:")
459
  _PrintGroupedParams(result["nicparams"], roman=opts.roman_integers)
460

    
461
  ToStdout("Instance policy - limits for instances:")
462
  for key in constants.IPOLICY_PARAMETERS:
463
    ToStdout("  - %s", key)
464
    _PrintGroupedParams(result["ipolicy"][key], roman=opts.roman_integers)
465
  ToStdout("  - enabled disk templates: %s",
466
           utils.CommaJoin(result["ipolicy"][constants.ISPECS_DTS]))
467

    
468
  return 0
469

    
470

    
471
def ClusterCopyFile(opts, args):
472
  """Copy a file from master to some nodes.
473

474
  @param opts: the command line options selected by the user
475
  @type args: list
476
  @param args: should contain only one element, the path of
477
      the file to be copied
478
  @rtype: int
479
  @return: the desired exit code
480

481
  """
482
  filename = args[0]
483
  if not os.path.exists(filename):
484
    raise errors.OpPrereqError("No such filename '%s'" % filename,
485
                               errors.ECODE_INVAL)
486

    
487
  cl = GetClient()
488

    
489
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
490

    
491
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
492
                           secondary_ips=opts.use_replication_network,
493
                           nodegroup=opts.nodegroup)
494

    
495
  srun = ssh.SshRunner(cluster_name=cluster_name)
496
  for node in results:
497
    if not srun.CopyFileToNode(node, filename):
498
      ToStderr("Copy of file %s to node %s failed", filename, node)
499

    
500
  return 0
501

    
502

    
503
def RunClusterCommand(opts, args):
504
  """Run a command on some nodes.
505

506
  @param opts: the command line options selected by the user
507
  @type args: list
508
  @param args: should contain the command to be run and its arguments
509
  @rtype: int
510
  @return: the desired exit code
511

512
  """
513
  cl = GetClient()
514

    
515
  command = " ".join(args)
516

    
517
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
518

    
519
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
520
                                                    "master_node"])
521

    
522
  srun = ssh.SshRunner(cluster_name=cluster_name)
523

    
524
  # Make sure master node is at list end
525
  if master_node in nodes:
526
    nodes.remove(master_node)
527
    nodes.append(master_node)
528

    
529
  for name in nodes:
530
    result = srun.Run(name, "root", command)
531
    ToStdout("------------------------------------------------")
532
    ToStdout("node: %s", name)
533
    ToStdout("%s", result.output)
534
    ToStdout("return code = %s", result.exit_code)
535

    
536
  return 0
537

    
538

    
539
def VerifyCluster(opts, args):
540
  """Verify integrity of cluster, performing various test on nodes.
541

542
  @param opts: the command line options selected by the user
543
  @type args: list
544
  @param args: should be an empty list
545
  @rtype: int
546
  @return: the desired exit code
547

548
  """
549
  skip_checks = []
550

    
551
  if opts.skip_nplusone_mem:
552
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
553

    
554
  cl = GetClient()
555

    
556
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
557
                               error_codes=opts.error_codes,
558
                               debug_simulate_errors=opts.simulate_errors,
559
                               skip_checks=skip_checks,
560
                               ignore_errors=opts.ignore_errors,
561
                               group_name=opts.nodegroup)
562
  result = SubmitOpCode(op, cl=cl, opts=opts)
563

    
564
  # Keep track of submitted jobs
565
  jex = JobExecutor(cl=cl, opts=opts)
566

    
567
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
568
    jex.AddJobId(None, status, job_id)
569

    
570
  results = jex.GetResults()
571

    
572
  (bad_jobs, bad_results) = \
573
    map(len,
574
        # Convert iterators to lists
575
        map(list,
576
            # Count errors
577
            map(compat.partial(itertools.ifilterfalse, bool),
578
                # Convert result to booleans in a tuple
579
                zip(*((job_success, len(op_results) == 1 and op_results[0])
580
                      for (job_success, op_results) in results)))))
581

    
582
  if bad_jobs == 0 and bad_results == 0:
583
    rcode = constants.EXIT_SUCCESS
584
  else:
585
    rcode = constants.EXIT_FAILURE
586
    if bad_jobs > 0:
587
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
588

    
589
  return rcode
590

    
591

    
592
def VerifyDisks(opts, args):
593
  """Verify integrity of cluster disks.
594

595
  @param opts: the command line options selected by the user
596
  @type args: list
597
  @param args: should be an empty list
598
  @rtype: int
599
  @return: the desired exit code
600

601
  """
602
  cl = GetClient()
603

    
604
  op = opcodes.OpClusterVerifyDisks()
605

    
606
  result = SubmitOpCode(op, cl=cl, opts=opts)
607

    
608
  # Keep track of submitted jobs
609
  jex = JobExecutor(cl=cl, opts=opts)
610

    
611
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
612
    jex.AddJobId(None, status, job_id)
613

    
614
  retcode = constants.EXIT_SUCCESS
615

    
616
  for (status, result) in jex.GetResults():
617
    if not status:
618
      ToStdout("Job failed: %s", result)
619
      continue
620

    
621
    ((bad_nodes, instances, missing), ) = result
622

    
623
    for node, text in bad_nodes.items():
624
      ToStdout("Error gathering data on node %s: %s",
625
               node, utils.SafeEncode(text[-400:]))
626
      retcode = constants.EXIT_FAILURE
627
      ToStdout("You need to fix these nodes first before fixing instances")
628

    
629
    for iname in instances:
630
      if iname in missing:
631
        continue
632
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
633
      try:
634
        ToStdout("Activating disks for instance '%s'", iname)
635
        SubmitOpCode(op, opts=opts, cl=cl)
636
      except errors.GenericError, err:
637
        nret, msg = FormatError(err)
638
        retcode |= nret
639
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
640

    
641
    if missing:
642
      for iname, ival in missing.iteritems():
643
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
644
        if all_missing:
645
          ToStdout("Instance %s cannot be verified as it lives on"
646
                   " broken nodes", iname)
647
        else:
648
          ToStdout("Instance %s has missing logical volumes:", iname)
649
          ival.sort()
650
          for node, vol in ival:
651
            if node in bad_nodes:
652
              ToStdout("\tbroken node %s /dev/%s", node, vol)
653
            else:
654
              ToStdout("\t%s /dev/%s", node, vol)
655

    
656
      ToStdout("You need to replace or recreate disks for all the above"
657
               " instances if this message persists after fixing broken nodes.")
658
      retcode = constants.EXIT_FAILURE
659

    
660
  return retcode
661

    
662

    
663
def RepairDiskSizes(opts, args):
664
  """Verify sizes of cluster disks.
665

666
  @param opts: the command line options selected by the user
667
  @type args: list
668
  @param args: optional list of instances to restrict check to
669
  @rtype: int
670
  @return: the desired exit code
671

672
  """
673
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
674
  SubmitOpCode(op, opts=opts)
675

    
676

    
677
@UsesRPC
678
def MasterFailover(opts, args):
679
  """Failover the master node.
680

681
  This command, when run on a non-master node, will cause the current
682
  master to cease being master, and the non-master to become new
683
  master.
684

685
  @param opts: the command line options selected by the user
686
  @type args: list
687
  @param args: should be an empty list
688
  @rtype: int
689
  @return: the desired exit code
690

691
  """
692
  if opts.no_voting:
693
    usertext = ("This will perform the failover even if most other nodes"
694
                " are down, or if this node is outdated. This is dangerous"
695
                " as it can lead to a non-consistent cluster. Check the"
696
                " gnt-cluster(8) man page before proceeding. Continue?")
697
    if not AskUser(usertext):
698
      return 1
699

    
700
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
701

    
702

    
703
def MasterPing(opts, args):
704
  """Checks if the master is alive.
705

706
  @param opts: the command line options selected by the user
707
  @type args: list
708
  @param args: should be an empty list
709
  @rtype: int
710
  @return: the desired exit code
711

712
  """
713
  try:
714
    cl = GetClient()
715
    cl.QueryClusterInfo()
716
    return 0
717
  except Exception: # pylint: disable=W0703
718
    return 1
719

    
720

    
721
def SearchTags(opts, args):
722
  """Searches the tags on all the cluster.
723

724
  @param opts: the command line options selected by the user
725
  @type args: list
726
  @param args: should contain only one element, the tag pattern
727
  @rtype: int
728
  @return: the desired exit code
729

730
  """
731
  op = opcodes.OpTagsSearch(pattern=args[0])
732
  result = SubmitOpCode(op, opts=opts)
733
  if not result:
734
    return 1
735
  result = list(result)
736
  result.sort()
737
  for path, tag in result:
738
    ToStdout("%s %s", path, tag)
739

    
740

    
741
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
742
  """Reads and verifies an X509 certificate.
743

744
  @type cert_filename: string
745
  @param cert_filename: the path of the file containing the certificate to
746
                        verify encoded in PEM format
747
  @type verify_private_key: bool
748
  @param verify_private_key: whether to verify the private key in addition to
749
                             the public certificate
750
  @rtype: string
751
  @return: a string containing the PEM-encoded certificate.
752

753
  """
754
  try:
755
    pem = utils.ReadFile(cert_filename)
756
  except IOError, err:
757
    raise errors.X509CertError(cert_filename,
758
                               "Unable to read certificate: %s" % str(err))
759

    
760
  try:
761
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
762
  except Exception, err:
763
    raise errors.X509CertError(cert_filename,
764
                               "Unable to load certificate: %s" % str(err))
765

    
766
  if verify_private_key:
767
    try:
768
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
769
    except Exception, err:
770
      raise errors.X509CertError(cert_filename,
771
                                 "Unable to load private key: %s" % str(err))
772

    
773
  return pem
774

    
775

    
776
def _RenewCrypto(new_cluster_cert, new_rapi_cert, #pylint: disable=R0911
777
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
778
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
779
                 cds_filename, force):
780
  """Renews cluster certificates, keys and secrets.
781

782
  @type new_cluster_cert: bool
783
  @param new_cluster_cert: Whether to generate a new cluster certificate
784
  @type new_rapi_cert: bool
785
  @param new_rapi_cert: Whether to generate a new RAPI certificate
786
  @type rapi_cert_filename: string
787
  @param rapi_cert_filename: Path to file containing new RAPI certificate
788
  @type new_spice_cert: bool
789
  @param new_spice_cert: Whether to generate a new SPICE certificate
790
  @type spice_cert_filename: string
791
  @param spice_cert_filename: Path to file containing new SPICE certificate
792
  @type spice_cacert_filename: string
793
  @param spice_cacert_filename: Path to file containing the certificate of the
794
                                CA that signed the SPICE certificate
795
  @type new_confd_hmac_key: bool
796
  @param new_confd_hmac_key: Whether to generate a new HMAC key
797
  @type new_cds: bool
798
  @param new_cds: Whether to generate a new cluster domain secret
799
  @type cds_filename: string
800
  @param cds_filename: Path to file containing new cluster domain secret
801
  @type force: bool
802
  @param force: Whether to ask user for confirmation
803

804
  """
805
  if new_rapi_cert and rapi_cert_filename:
806
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
807
             " options can be specified at the same time.")
808
    return 1
809

    
810
  if new_cds and cds_filename:
811
    ToStderr("Only one of the --new-cluster-domain-secret and"
812
             " --cluster-domain-secret options can be specified at"
813
             " the same time.")
814
    return 1
815

    
816
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
817
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
818
             " and --spice-ca-certificate must not be used.")
819
    return 1
820

    
821
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
822
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
823
             " specified.")
824
    return 1
825

    
826
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
827
  try:
828
    if rapi_cert_filename:
829
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
830
    if spice_cert_filename:
831
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
832
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
833
  except errors.X509CertError, err:
834
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
835
    return 1
836

    
837
  if cds_filename:
838
    try:
839
      cds = utils.ReadFile(cds_filename)
840
    except Exception, err: # pylint: disable=W0703
841
      ToStderr("Can't load new cluster domain secret from %s: %s" %
842
               (cds_filename, str(err)))
843
      return 1
844
  else:
845
    cds = None
846

    
847
  if not force:
848
    usertext = ("This requires all daemons on all nodes to be restarted and"
849
                " may take some time. Continue?")
850
    if not AskUser(usertext):
851
      return 1
852

    
853
  def _RenewCryptoInner(ctx):
854
    ctx.feedback_fn("Updating certificates and keys")
855
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
856
                                    new_rapi_cert,
857
                                    new_spice_cert,
858
                                    new_confd_hmac_key,
859
                                    new_cds,
860
                                    rapi_cert_pem=rapi_cert_pem,
861
                                    spice_cert_pem=spice_cert_pem,
862
                                    spice_cacert_pem=spice_cacert_pem,
863
                                    cds=cds)
864

    
865
    files_to_copy = []
866

    
867
    if new_cluster_cert:
868
      files_to_copy.append(constants.NODED_CERT_FILE)
869

    
870
    if new_rapi_cert or rapi_cert_pem:
871
      files_to_copy.append(constants.RAPI_CERT_FILE)
872

    
873
    if new_spice_cert or spice_cert_pem:
874
      files_to_copy.append(constants.SPICE_CERT_FILE)
875
      files_to_copy.append(constants.SPICE_CACERT_FILE)
876

    
877
    if new_confd_hmac_key:
878
      files_to_copy.append(constants.CONFD_HMAC_KEY)
879

    
880
    if new_cds or cds:
881
      files_to_copy.append(constants.CLUSTER_DOMAIN_SECRET_FILE)
882

    
883
    if files_to_copy:
884
      for node_name in ctx.nonmaster_nodes:
885
        ctx.feedback_fn("Copying %s to %s" %
886
                        (", ".join(files_to_copy), node_name))
887
        for file_name in files_to_copy:
888
          ctx.ssh.CopyFileToNode(node_name, file_name)
889

    
890
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
891

    
892
  ToStdout("All requested certificates and keys have been replaced."
893
           " Running \"gnt-cluster verify\" now is recommended.")
894

    
895
  return 0
896

    
897

    
898
def RenewCrypto(opts, args):
899
  """Renews cluster certificates, keys and secrets.
900

901
  """
902
  return _RenewCrypto(opts.new_cluster_cert,
903
                      opts.new_rapi_cert,
904
                      opts.rapi_cert,
905
                      opts.new_spice_cert,
906
                      opts.spice_cert,
907
                      opts.spice_cacert,
908
                      opts.new_confd_hmac_key,
909
                      opts.new_cluster_domain_secret,
910
                      opts.cluster_domain_secret,
911
                      opts.force)
912

    
913

    
914
def SetClusterParams(opts, args):
915
  """Modify the cluster.
916

917
  @param opts: the command line options selected by the user
918
  @type args: list
919
  @param args: should be an empty list
920
  @rtype: int
921
  @return: the desired exit code
922

923
  """
924
  if not (not opts.lvm_storage or opts.vg_name or
925
          not opts.drbd_storage or opts.drbd_helper or
926
          opts.enabled_hypervisors or opts.hvparams or
927
          opts.beparams or opts.nicparams or
928
          opts.ndparams or opts.diskparams or
929
          opts.candidate_pool_size is not None or
930
          opts.uid_pool is not None or
931
          opts.maintain_node_health is not None or
932
          opts.add_uids is not None or
933
          opts.remove_uids is not None or
934
          opts.default_iallocator is not None or
935
          opts.reserved_lvs is not None or
936
          opts.master_netdev is not None or
937
          opts.master_netmask is not None or
938
          opts.use_external_mip_script is not None or
939
          opts.prealloc_wipe_disks is not None or
940
          opts.hv_state or
941
          opts.disk_state or
942
          opts.ispecs_mem_size is not None or
943
          opts.ispecs_cpu_count is not None or
944
          opts.ispecs_disk_count is not None or
945
          opts.ispecs_disk_size is not None or
946
          opts.ispecs_nic_count is not None):
947
    ToStderr("Please give at least one of the parameters.")
948
    return 1
949

    
950
  vg_name = opts.vg_name
951
  if not opts.lvm_storage and opts.vg_name:
952
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
953
    return 1
954

    
955
  if not opts.lvm_storage:
956
    vg_name = ""
957

    
958
  drbd_helper = opts.drbd_helper
959
  if not opts.drbd_storage and opts.drbd_helper:
960
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
961
    return 1
962

    
963
  if not opts.drbd_storage:
964
    drbd_helper = ""
965

    
966
  hvlist = opts.enabled_hypervisors
967
  if hvlist is not None:
968
    hvlist = hvlist.split(",")
969

    
970
  # a list of (name, dict) we can pass directly to dict() (or [])
971
  hvparams = dict(opts.hvparams)
972
  for hv_params in hvparams.values():
973
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
974

    
975
  diskparams = dict(opts.diskparams)
976

    
977
  for dt_params in diskparams.values():
978
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
979

    
980
  beparams = opts.beparams
981
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
982

    
983
  nicparams = opts.nicparams
984
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
985

    
986
  ndparams = opts.ndparams
987
  if ndparams is not None:
988
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
989

    
990
  ispecs_dts = opts.ispecs_disk_templates
991
  ipolicy = \
992
    objects.CreateIPolicyFromOpts(ispecs_mem_size=opts.ispecs_mem_size,
993
                                  ispecs_cpu_count=opts.ispecs_cpu_count,
994
                                  ispecs_disk_count=opts.ispecs_disk_count,
995
                                  ispecs_disk_size=opts.ispecs_disk_size,
996
                                  ispecs_nic_count=opts.ispecs_nic_count,
997
                                  ispecs_disk_templates=ispecs_dts)
998

    
999
  mnh = opts.maintain_node_health
1000

    
1001
  uid_pool = opts.uid_pool
1002
  if uid_pool is not None:
1003
    uid_pool = uidpool.ParseUidPool(uid_pool)
1004

    
1005
  add_uids = opts.add_uids
1006
  if add_uids is not None:
1007
    add_uids = uidpool.ParseUidPool(add_uids)
1008

    
1009
  remove_uids = opts.remove_uids
1010
  if remove_uids is not None:
1011
    remove_uids = uidpool.ParseUidPool(remove_uids)
1012

    
1013
  if opts.reserved_lvs is not None:
1014
    if opts.reserved_lvs == "":
1015
      opts.reserved_lvs = []
1016
    else:
1017
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1018

    
1019
  if opts.master_netmask is not None:
1020
    try:
1021
      opts.master_netmask = int(opts.master_netmask)
1022
    except ValueError:
1023
      ToStderr("The --master-netmask option expects an int parameter.")
1024
      return 1
1025

    
1026
  ext_ip_script = opts.use_external_mip_script
1027

    
1028
  if opts.disk_state:
1029
    disk_state = utils.FlatToDict(opts.disk_state)
1030
  else:
1031
    disk_state = {}
1032

    
1033
  hv_state = dict(opts.hv_state)
1034

    
1035
  op = opcodes.OpClusterSetParams(vg_name=vg_name,
1036
                                  drbd_helper=drbd_helper,
1037
                                  enabled_hypervisors=hvlist,
1038
                                  hvparams=hvparams,
1039
                                  os_hvp=None,
1040
                                  beparams=beparams,
1041
                                  nicparams=nicparams,
1042
                                  ndparams=ndparams,
1043
                                  diskparams=diskparams,
1044
                                  ipolicy=ipolicy,
1045
                                  candidate_pool_size=opts.candidate_pool_size,
1046
                                  maintain_node_health=mnh,
1047
                                  uid_pool=uid_pool,
1048
                                  add_uids=add_uids,
1049
                                  remove_uids=remove_uids,
1050
                                  default_iallocator=opts.default_iallocator,
1051
                                  prealloc_wipe_disks=opts.prealloc_wipe_disks,
1052
                                  master_netdev=opts.master_netdev,
1053
                                  master_netmask=opts.master_netmask,
1054
                                  reserved_lvs=opts.reserved_lvs,
1055
                                  use_external_mip_script=ext_ip_script,
1056
                                  hv_state=hv_state,
1057
                                  disk_state=disk_state,
1058
                                  )
1059
  SubmitOpCode(op, opts=opts)
1060
  return 0
1061

    
1062

    
1063
def QueueOps(opts, args):
1064
  """Queue operations.
1065

1066
  @param opts: the command line options selected by the user
1067
  @type args: list
1068
  @param args: should contain only one element, the subcommand
1069
  @rtype: int
1070
  @return: the desired exit code
1071

1072
  """
1073
  command = args[0]
1074
  client = GetClient()
1075
  if command in ("drain", "undrain"):
1076
    drain_flag = command == "drain"
1077
    client.SetQueueDrainFlag(drain_flag)
1078
  elif command == "info":
1079
    result = client.QueryConfigValues(["drain_flag"])
1080
    if result[0]:
1081
      val = "set"
1082
    else:
1083
      val = "unset"
1084
    ToStdout("The drain flag is %s" % val)
1085
  else:
1086
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1087
                               errors.ECODE_INVAL)
1088

    
1089
  return 0
1090

    
1091

    
1092
def _ShowWatcherPause(until):
1093
  if until is None or until < time.time():
1094
    ToStdout("The watcher is not paused.")
1095
  else:
1096
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1097

    
1098

    
1099
def WatcherOps(opts, args):
1100
  """Watcher operations.
1101

1102
  @param opts: the command line options selected by the user
1103
  @type args: list
1104
  @param args: should contain only one element, the subcommand
1105
  @rtype: int
1106
  @return: the desired exit code
1107

1108
  """
1109
  command = args[0]
1110
  client = GetClient()
1111

    
1112
  if command == "continue":
1113
    client.SetWatcherPause(None)
1114
    ToStdout("The watcher is no longer paused.")
1115

    
1116
  elif command == "pause":
1117
    if len(args) < 2:
1118
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1119

    
1120
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1121
    _ShowWatcherPause(result)
1122

    
1123
  elif command == "info":
1124
    result = client.QueryConfigValues(["watcher_pause"])
1125
    _ShowWatcherPause(result[0])
1126

    
1127
  else:
1128
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1129
                               errors.ECODE_INVAL)
1130

    
1131
  return 0
1132

    
1133

    
1134
def _OobPower(opts, node_list, power):
1135
  """Puts the node in the list to desired power state.
1136

1137
  @param opts: The command line options selected by the user
1138
  @param node_list: The list of nodes to operate on
1139
  @param power: True if they should be powered on, False otherwise
1140
  @return: The success of the operation (none failed)
1141

1142
  """
1143
  if power:
1144
    command = constants.OOB_POWER_ON
1145
  else:
1146
    command = constants.OOB_POWER_OFF
1147

    
1148
  op = opcodes.OpOobCommand(node_names=node_list,
1149
                            command=command,
1150
                            ignore_status=True,
1151
                            timeout=opts.oob_timeout,
1152
                            power_delay=opts.power_delay)
1153
  result = SubmitOpCode(op, opts=opts)
1154
  errs = 0
1155
  for node_result in result:
1156
    (node_tuple, data_tuple) = node_result
1157
    (_, node_name) = node_tuple
1158
    (data_status, _) = data_tuple
1159
    if data_status != constants.RS_NORMAL:
1160
      assert data_status != constants.RS_UNAVAIL
1161
      errs += 1
1162
      ToStderr("There was a problem changing power for %s, please investigate",
1163
               node_name)
1164

    
1165
  if errs > 0:
1166
    return False
1167

    
1168
  return True
1169

    
1170

    
1171
def _InstanceStart(opts, inst_list, start):
1172
  """Puts the instances in the list to desired state.
1173

1174
  @param opts: The command line options selected by the user
1175
  @param inst_list: The list of instances to operate on
1176
  @param start: True if they should be started, False for shutdown
1177
  @return: The success of the operation (none failed)
1178

1179
  """
1180
  if start:
1181
    opcls = opcodes.OpInstanceStartup
1182
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1183
  else:
1184
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1185
                           timeout=opts.shutdown_timeout)
1186
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1187

    
1188
  jex = JobExecutor(opts=opts)
1189

    
1190
  for inst in inst_list:
1191
    ToStdout("Submit %s of instance %s", text_submit, inst)
1192
    op = opcls(instance_name=inst)
1193
    jex.QueueJob(inst, op)
1194

    
1195
  results = jex.GetResults()
1196
  bad_cnt = len([1 for (success, _) in results if not success])
1197

    
1198
  if bad_cnt == 0:
1199
    ToStdout("All instances have been %s successfully", text_success)
1200
  else:
1201
    ToStderr("There were errors while %s instances:\n"
1202
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1203
             len(results))
1204
    return False
1205

    
1206
  return True
1207

    
1208

    
1209
class _RunWhenNodesReachableHelper:
1210
  """Helper class to make shared internal state sharing easier.
1211

1212
  @ivar success: Indicates if all action_cb calls were successful
1213

1214
  """
1215
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1216
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1217
    """Init the object.
1218

1219
    @param node_list: The list of nodes to be reachable
1220
    @param action_cb: Callback called when a new host is reachable
1221
    @type node2ip: dict
1222
    @param node2ip: Node to ip mapping
1223
    @param port: The port to use for the TCP ping
1224
    @param feedback_fn: The function used for feedback
1225
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1226
    @param _sleep_fn: Function to sleep (for unittest use only)
1227

1228
    """
1229
    self.down = set(node_list)
1230
    self.up = set()
1231
    self.node2ip = node2ip
1232
    self.success = True
1233
    self.action_cb = action_cb
1234
    self.port = port
1235
    self.feedback_fn = feedback_fn
1236
    self._ping_fn = _ping_fn
1237
    self._sleep_fn = _sleep_fn
1238

    
1239
  def __call__(self):
1240
    """When called we run action_cb.
1241

1242
    @raises utils.RetryAgain: When there are still down nodes
1243

1244
    """
1245
    if not self.action_cb(self.up):
1246
      self.success = False
1247

    
1248
    if self.down:
1249
      raise utils.RetryAgain()
1250
    else:
1251
      return self.success
1252

    
1253
  def Wait(self, secs):
1254
    """Checks if a host is up or waits remaining seconds.
1255

1256
    @param secs: The secs remaining
1257

1258
    """
1259
    start = time.time()
1260
    for node in self.down:
1261
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1262
                       live_port_needed=True):
1263
        self.feedback_fn("Node %s became available" % node)
1264
        self.up.add(node)
1265
        self.down -= self.up
1266
        # If we have a node available there is the possibility to run the
1267
        # action callback successfully, therefore we don't wait and return
1268
        return
1269

    
1270
    self._sleep_fn(max(0.0, start + secs - time.time()))
1271

    
1272

    
1273
def _RunWhenNodesReachable(node_list, action_cb, interval):
1274
  """Run action_cb when nodes become reachable.
1275

1276
  @param node_list: The list of nodes to be reachable
1277
  @param action_cb: Callback called when a new host is reachable
1278
  @param interval: The earliest time to retry
1279

1280
  """
1281
  client = GetClient()
1282
  cluster_info = client.QueryClusterInfo()
1283
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1284
    family = netutils.IPAddress.family
1285
  else:
1286
    family = netutils.IP6Address.family
1287

    
1288
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1289
                 for node in node_list)
1290

    
1291
  port = netutils.GetDaemonPort(constants.NODED)
1292
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1293
                                        ToStdout)
1294

    
1295
  try:
1296
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1297
                       wait_fn=helper.Wait)
1298
  except utils.RetryTimeout:
1299
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1300
             " again:\n  - %s", "  - ".join(helper.down))
1301
    return False
1302

    
1303

    
1304
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1305
                          _instance_start_fn=_InstanceStart):
1306
  """Start the instances conditional based on node_states.
1307

1308
  @param opts: The command line options selected by the user
1309
  @param inst_map: A dict of inst -> nodes mapping
1310
  @param nodes_online: A list of nodes online
1311
  @param _instance_start_fn: Callback to start instances (unittest use only)
1312
  @return: Success of the operation on all instances
1313

1314
  """
1315
  start_inst_list = []
1316
  for (inst, nodes) in inst_map.items():
1317
    if not (nodes - nodes_online):
1318
      # All nodes the instance lives on are back online
1319
      start_inst_list.append(inst)
1320

    
1321
  for inst in start_inst_list:
1322
    del inst_map[inst]
1323

    
1324
  if start_inst_list:
1325
    return _instance_start_fn(opts, start_inst_list, True)
1326

    
1327
  return True
1328

    
1329

    
1330
def _EpoOn(opts, full_node_list, node_list, inst_map):
1331
  """Does the actual power on.
1332

1333
  @param opts: The command line options selected by the user
1334
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1335
                         OOB)
1336
  @param node_list: The list of nodes to operate on (all need to support OOB)
1337
  @param inst_map: A dict of inst -> nodes mapping
1338
  @return: The desired exit status
1339

1340
  """
1341
  if node_list and not _OobPower(opts, node_list, False):
1342
    ToStderr("Not all nodes seem to get back up, investigate and start"
1343
             " manually if needed")
1344

    
1345
  # Wait for the nodes to be back up
1346
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1347

    
1348
  ToStdout("Waiting until all nodes are available again")
1349
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1350
    ToStderr("Please investigate and start stopped instances manually")
1351
    return constants.EXIT_FAILURE
1352

    
1353
  return constants.EXIT_SUCCESS
1354

    
1355

    
1356
def _EpoOff(opts, node_list, inst_map):
1357
  """Does the actual power off.
1358

1359
  @param opts: The command line options selected by the user
1360
  @param node_list: The list of nodes to operate on (all need to support OOB)
1361
  @param inst_map: A dict of inst -> nodes mapping
1362
  @return: The desired exit status
1363

1364
  """
1365
  if not _InstanceStart(opts, inst_map.keys(), False):
1366
    ToStderr("Please investigate and stop instances manually before continuing")
1367
    return constants.EXIT_FAILURE
1368

    
1369
  if not node_list:
1370
    return constants.EXIT_SUCCESS
1371

    
1372
  if _OobPower(opts, node_list, False):
1373
    return constants.EXIT_SUCCESS
1374
  else:
1375
    return constants.EXIT_FAILURE
1376

    
1377

    
1378
def Epo(opts, args):
1379
  """EPO operations.
1380

1381
  @param opts: the command line options selected by the user
1382
  @type args: list
1383
  @param args: should contain only one element, the subcommand
1384
  @rtype: int
1385
  @return: the desired exit code
1386

1387
  """
1388
  if opts.groups and opts.show_all:
1389
    ToStderr("Only one of --groups or --all are allowed")
1390
    return constants.EXIT_FAILURE
1391
  elif args and opts.show_all:
1392
    ToStderr("Arguments in combination with --all are not allowed")
1393
    return constants.EXIT_FAILURE
1394

    
1395
  client = GetClient()
1396

    
1397
  if opts.groups:
1398
    node_query_list = itertools.chain(*client.QueryGroups(names=args,
1399
                                                          fields=["node_list"],
1400
                                                          use_locking=False))
1401
  else:
1402
    node_query_list = args
1403

    
1404
  result = client.QueryNodes(names=node_query_list,
1405
                             fields=["name", "master", "pinst_list",
1406
                                     "sinst_list", "powered", "offline"],
1407
                             use_locking=False)
1408
  node_list = []
1409
  inst_map = {}
1410
  for (idx, (node, master, pinsts, sinsts, powered,
1411
             offline)) in enumerate(result):
1412
    # Normalize the node_query_list as well
1413
    if not opts.show_all:
1414
      node_query_list[idx] = node
1415
    if not offline:
1416
      for inst in (pinsts + sinsts):
1417
        if inst in inst_map:
1418
          if not master:
1419
            inst_map[inst].add(node)
1420
        elif master:
1421
          inst_map[inst] = set()
1422
        else:
1423
          inst_map[inst] = set([node])
1424

    
1425
    if master and opts.on:
1426
      # We ignore the master for turning on the machines, in fact we are
1427
      # already operating on the master at this point :)
1428
      continue
1429
    elif master and not opts.show_all:
1430
      ToStderr("%s is the master node, please do a master-failover to another"
1431
               " node not affected by the EPO or use --all if you intend to"
1432
               " shutdown the whole cluster", node)
1433
      return constants.EXIT_FAILURE
1434
    elif powered is None:
1435
      ToStdout("Node %s does not support out-of-band handling, it can not be"
1436
               " handled in a fully automated manner", node)
1437
    elif powered == opts.on:
1438
      ToStdout("Node %s is already in desired power state, skipping", node)
1439
    elif not offline or (offline and powered):
1440
      node_list.append(node)
1441

    
1442
  if not opts.force and not ConfirmOperation(node_query_list, "nodes", "epo"):
1443
    return constants.EXIT_FAILURE
1444

    
1445
  if opts.on:
1446
    return _EpoOn(opts, node_query_list, node_list, inst_map)
1447
  else:
1448
    return _EpoOff(opts, node_list, inst_map)
1449

    
1450
commands = {
1451
  "init": (
1452
    InitCluster, [ArgHost(min=1, max=1)],
1453
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
1454
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
1455
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
1456
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
1457
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
1458
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
1459
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
1460
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + INSTANCE_POLICY_OPTS,
1461
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
1462
  "destroy": (
1463
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
1464
    "", "Destroy cluster"),
1465
  "rename": (
1466
    RenameCluster, [ArgHost(min=1, max=1)],
1467
    [FORCE_OPT, DRY_RUN_OPT],
1468
    "<new_name>",
1469
    "Renames the cluster"),
1470
  "redist-conf": (
1471
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
1472
    "", "Forces a push of the configuration file and ssconf files"
1473
    " to the nodes in the cluster"),
1474
  "verify": (
1475
    VerifyCluster, ARGS_NONE,
1476
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
1477
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
1478
    "", "Does a check on the cluster configuration"),
1479
  "verify-disks": (
1480
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
1481
    "", "Does a check on the cluster disk status"),
1482
  "repair-disk-sizes": (
1483
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
1484
    "[instance...]", "Updates mismatches in recorded disk sizes"),
1485
  "master-failover": (
1486
    MasterFailover, ARGS_NONE, [NOVOTING_OPT],
1487
    "", "Makes the current node the master"),
1488
  "master-ping": (
1489
    MasterPing, ARGS_NONE, [],
1490
    "", "Checks if the master is alive"),
1491
  "version": (
1492
    ShowClusterVersion, ARGS_NONE, [],
1493
    "", "Shows the cluster version"),
1494
  "getmaster": (
1495
    ShowClusterMaster, ARGS_NONE, [],
1496
    "", "Shows the cluster master"),
1497
  "copyfile": (
1498
    ClusterCopyFile, [ArgFile(min=1, max=1)],
1499
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
1500
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
1501
  "command": (
1502
    RunClusterCommand, [ArgCommand(min=1)],
1503
    [NODE_LIST_OPT, NODEGROUP_OPT],
1504
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
1505
  "info": (
1506
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
1507
    "[--roman]", "Show cluster configuration"),
1508
  "list-tags": (
1509
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
1510
  "add-tags": (
1511
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT],
1512
    "tag...", "Add tags to the cluster"),
1513
  "remove-tags": (
1514
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT],
1515
    "tag...", "Remove tags from the cluster"),
1516
  "search-tags": (
1517
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
1518
    "Searches the tags on all objects on"
1519
    " the cluster for a given pattern (regex)"),
1520
  "queue": (
1521
    QueueOps,
1522
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
1523
    [], "drain|undrain|info", "Change queue properties"),
1524
  "watcher": (
1525
    WatcherOps,
1526
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
1527
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
1528
    [],
1529
    "{pause <timespec>|continue|info}", "Change watcher properties"),
1530
  "modify": (
1531
    SetClusterParams, ARGS_NONE,
1532
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
1533
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
1534
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
1535
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
1536
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
1537
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
1538
     DISK_STATE_OPT] +
1539
    INSTANCE_POLICY_OPTS,
1540
    "[opts...]",
1541
    "Alters the parameters of the cluster"),
1542
  "renew-crypto": (
1543
    RenewCrypto, ARGS_NONE,
1544
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
1545
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
1546
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
1547
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
1548
    "[opts...]",
1549
    "Renews cluster certificates, keys and secrets"),
1550
  "epo": (
1551
    Epo, [ArgUnknown()],
1552
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
1553
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
1554
    "[opts...] [args]",
1555
    "Performs an emergency power-off on given args"),
1556
  "activate-master-ip": (
1557
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
1558
  "deactivate-master-ip": (
1559
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
1560
    "Deactivates the master IP"),
1561
  }
1562

    
1563

    
1564
#: dictionary with aliases for commands
1565
aliases = {
1566
  "masterfailover": "master-failover",
1567
}
1568

    
1569

    
1570
def Main():
1571
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
1572
                     aliases=aliases)