Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ 5ae4945a

History | View | Annotate | Download (51.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
import os.path
30
import time
31
import OpenSSL
32
import itertools
33

    
34
from ganeti.cli import *
35
from ganeti import opcodes
36
from ganeti import constants
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import bootstrap
40
from ganeti import ssh
41
from ganeti import objects
42
from ganeti import uidpool
43
from ganeti import compat
44
from ganeti import netutils
45

    
46

    
47
ON_OPT = cli_option("--on", default=False,
48
                    action="store_true", dest="on",
49
                    help="Recover from an EPO")
50

    
51
GROUPS_OPT = cli_option("--groups", default=False,
52
                        action="store_true", dest="groups",
53
                        help="Arguments are node groups instead of nodes")
54

    
55
SHOW_MACHINE_OPT = cli_option("-M", "--show-machine-names", default=False,
56
                              action="store_true",
57
                              help="Show machine name for every line in output")
58

    
59
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
60
_EPO_PING_TIMEOUT = 1 # 1 second
61
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
62

    
63

    
64
@UsesRPC
65
def InitCluster(opts, args):
66
  """Initialize the cluster.
67

68
  @param opts: the command line options selected by the user
69
  @type args: list
70
  @param args: should contain only one element, the desired
71
      cluster name
72
  @rtype: int
73
  @return: the desired exit code
74

75
  """
76
  if not opts.lvm_storage and opts.vg_name:
77
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
78
    return 1
79

    
80
  vg_name = opts.vg_name
81
  if opts.lvm_storage and not opts.vg_name:
82
    vg_name = constants.DEFAULT_VG
83

    
84
  if not opts.drbd_storage and opts.drbd_helper:
85
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
86
    return 1
87

    
88
  drbd_helper = opts.drbd_helper
89
  if opts.drbd_storage and not opts.drbd_helper:
90
    drbd_helper = constants.DEFAULT_DRBD_HELPER
91

    
92
  master_netdev = opts.master_netdev
93
  if master_netdev is None:
94
    master_netdev = constants.DEFAULT_BRIDGE
95

    
96
  hvlist = opts.enabled_hypervisors
97
  if hvlist is None:
98
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
99
  hvlist = hvlist.split(",")
100

    
101
  hvparams = dict(opts.hvparams)
102
  beparams = opts.beparams
103
  nicparams = opts.nicparams
104

    
105
  diskparams = dict(opts.diskparams)
106

    
107
  # check the disk template types here, as we cannot rely on the type check done
108
  # by the opcode parameter types
109
  diskparams_keys = set(diskparams.keys())
110
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
111
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
112
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
113
    return 1
114

    
115
  # prepare beparams dict
116
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
117
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
118

    
119
  # prepare nicparams dict
120
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
121
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
122

    
123
  # prepare ndparams dict
124
  if opts.ndparams is None:
125
    ndparams = dict(constants.NDC_DEFAULTS)
126
  else:
127
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
128
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
129

    
130
  # prepare hvparams dict
131
  for hv in constants.HYPER_TYPES:
132
    if hv not in hvparams:
133
      hvparams[hv] = {}
134
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
135
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
136

    
137
  # prepare diskparams dict
138
  for templ in constants.DISK_TEMPLATES:
139
    if templ not in diskparams:
140
      diskparams[templ] = {}
141
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
142
                                         diskparams[templ])
143
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
144

    
145
  # prepare ipolicy dict
146
  ipolicy_raw = CreateIPolicyFromOpts(
147
    ispecs_mem_size=opts.ispecs_mem_size,
148
    ispecs_cpu_count=opts.ispecs_cpu_count,
149
    ispecs_disk_count=opts.ispecs_disk_count,
150
    ispecs_disk_size=opts.ispecs_disk_size,
151
    ispecs_nic_count=opts.ispecs_nic_count,
152
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
153
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
154
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
155
    fill_all=True)
156
  ipolicy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy_raw)
157

    
158
  if opts.candidate_pool_size is None:
159
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
160

    
161
  if opts.mac_prefix is None:
162
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
163

    
164
  uid_pool = opts.uid_pool
165
  if uid_pool is not None:
166
    uid_pool = uidpool.ParseUidPool(uid_pool)
167

    
168
  if opts.prealloc_wipe_disks is None:
169
    opts.prealloc_wipe_disks = False
170

    
171
  external_ip_setup_script = opts.use_external_mip_script
172
  if external_ip_setup_script is None:
173
    external_ip_setup_script = False
174

    
175
  try:
176
    primary_ip_version = int(opts.primary_ip_version)
177
  except (ValueError, TypeError), err:
178
    ToStderr("Invalid primary ip version value: %s" % str(err))
179
    return 1
180

    
181
  master_netmask = opts.master_netmask
182
  try:
183
    if master_netmask is not None:
184
      master_netmask = int(master_netmask)
185
  except (ValueError, TypeError), err:
186
    ToStderr("Invalid master netmask value: %s" % str(err))
187
    return 1
188

    
189
  if opts.disk_state:
190
    disk_state = utils.FlatToDict(opts.disk_state)
191
  else:
192
    disk_state = {}
193

    
194
  hv_state = dict(opts.hv_state)
195

    
196
  bootstrap.InitCluster(cluster_name=args[0],
197
                        secondary_ip=opts.secondary_ip,
198
                        vg_name=vg_name,
199
                        mac_prefix=opts.mac_prefix,
200
                        master_netmask=master_netmask,
201
                        master_netdev=master_netdev,
202
                        file_storage_dir=opts.file_storage_dir,
203
                        shared_file_storage_dir=opts.shared_file_storage_dir,
204
                        enabled_hypervisors=hvlist,
205
                        hvparams=hvparams,
206
                        beparams=beparams,
207
                        nicparams=nicparams,
208
                        ndparams=ndparams,
209
                        diskparams=diskparams,
210
                        ipolicy=ipolicy,
211
                        candidate_pool_size=opts.candidate_pool_size,
212
                        modify_etc_hosts=opts.modify_etc_hosts,
213
                        modify_ssh_setup=opts.modify_ssh_setup,
214
                        maintain_node_health=opts.maintain_node_health,
215
                        drbd_helper=drbd_helper,
216
                        uid_pool=uid_pool,
217
                        default_iallocator=opts.default_iallocator,
218
                        primary_ip_version=primary_ip_version,
219
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
220
                        use_external_mip_script=external_ip_setup_script,
221
                        hv_state=hv_state,
222
                        disk_state=disk_state,
223
                        )
224
  op = opcodes.OpClusterPostInit()
225
  SubmitOpCode(op, opts=opts)
226
  return 0
227

    
228

    
229
@UsesRPC
230
def DestroyCluster(opts, args):
231
  """Destroy the cluster.
232

233
  @param opts: the command line options selected by the user
234
  @type args: list
235
  @param args: should be an empty list
236
  @rtype: int
237
  @return: the desired exit code
238

239
  """
240
  if not opts.yes_do_it:
241
    ToStderr("Destroying a cluster is irreversible. If you really want"
242
             " destroy this cluster, supply the --yes-do-it option.")
243
    return 1
244

    
245
  op = opcodes.OpClusterDestroy()
246
  master = SubmitOpCode(op, opts=opts)
247
  # if we reached this, the opcode didn't fail; we can proceed to
248
  # shutdown all the daemons
249
  bootstrap.FinalizeClusterDestroy(master)
250
  return 0
251

    
252

    
253
def RenameCluster(opts, args):
254
  """Rename the cluster.
255

256
  @param opts: the command line options selected by the user
257
  @type args: list
258
  @param args: should contain only one element, the new cluster name
259
  @rtype: int
260
  @return: the desired exit code
261

262
  """
263
  cl = GetClient()
264

    
265
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
266

    
267
  new_name = args[0]
268
  if not opts.force:
269
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
270
                " connected over the network to the cluster name, the"
271
                " operation is very dangerous as the IP address will be"
272
                " removed from the node and the change may not go through."
273
                " Continue?") % (cluster_name, new_name)
274
    if not AskUser(usertext):
275
      return 1
276

    
277
  op = opcodes.OpClusterRename(name=new_name)
278
  result = SubmitOpCode(op, opts=opts, cl=cl)
279

    
280
  if result:
281
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
282

    
283
  return 0
284

    
285

    
286
def ActivateMasterIp(opts, args):
287
  """Activates the master IP.
288

289
  """
290
  op = opcodes.OpClusterActivateMasterIp()
291
  SubmitOpCode(op)
292
  return 0
293

    
294

    
295
def DeactivateMasterIp(opts, args):
296
  """Deactivates the master IP.
297

298
  """
299
  if not opts.confirm:
300
    usertext = ("This will disable the master IP. All the open connections to"
301
                " the master IP will be closed. To reach the master you will"
302
                " need to use its node IP."
303
                " Continue?")
304
    if not AskUser(usertext):
305
      return 1
306

    
307
  op = opcodes.OpClusterDeactivateMasterIp()
308
  SubmitOpCode(op)
309
  return 0
310

    
311

    
312
def RedistributeConfig(opts, args):
313
  """Forces push of the cluster configuration.
314

315
  @param opts: the command line options selected by the user
316
  @type args: list
317
  @param args: empty list
318
  @rtype: int
319
  @return: the desired exit code
320

321
  """
322
  op = opcodes.OpClusterRedistConf()
323
  SubmitOrSend(op, opts)
324
  return 0
325

    
326

    
327
def ShowClusterVersion(opts, args):
328
  """Write version of ganeti software to the standard output.
329

330
  @param opts: the command line options selected by the user
331
  @type args: list
332
  @param args: should be an empty list
333
  @rtype: int
334
  @return: the desired exit code
335

336
  """
337
  cl = GetClient()
338
  result = cl.QueryClusterInfo()
339
  ToStdout("Software version: %s", result["software_version"])
340
  ToStdout("Internode protocol: %s", result["protocol_version"])
341
  ToStdout("Configuration format: %s", result["config_version"])
342
  ToStdout("OS api version: %s", result["os_api_version"])
343
  ToStdout("Export interface: %s", result["export_version"])
344
  return 0
345

    
346

    
347
def ShowClusterMaster(opts, args):
348
  """Write name of master node to the standard output.
349

350
  @param opts: the command line options selected by the user
351
  @type args: list
352
  @param args: should be an empty list
353
  @rtype: int
354
  @return: the desired exit code
355

356
  """
357
  master = bootstrap.GetMaster()
358
  ToStdout(master)
359
  return 0
360

    
361

    
362
def _PrintGroupedParams(paramsdict, level=1, roman=False):
363
  """Print Grouped parameters (be, nic, disk) by group.
364

365
  @type paramsdict: dict of dicts
366
  @param paramsdict: {group: {param: value, ...}, ...}
367
  @type level: int
368
  @param level: Level of indention
369

370
  """
371
  indent = "  " * level
372
  for item, val in sorted(paramsdict.items()):
373
    if isinstance(val, dict):
374
      ToStdout("%s- %s:", indent, item)
375
      _PrintGroupedParams(val, level=level + 1, roman=roman)
376
    elif roman and isinstance(val, int):
377
      ToStdout("%s  %s: %s", indent, item, compat.TryToRoman(val))
378
    else:
379
      ToStdout("%s  %s: %s", indent, item, val)
380

    
381

    
382
def ShowClusterConfig(opts, args):
383
  """Shows cluster information.
384

385
  @param opts: the command line options selected by the user
386
  @type args: list
387
  @param args: should be an empty list
388
  @rtype: int
389
  @return: the desired exit code
390

391
  """
392
  cl = GetClient()
393
  result = cl.QueryClusterInfo()
394

    
395
  ToStdout("Cluster name: %s", result["name"])
396
  ToStdout("Cluster UUID: %s", result["uuid"])
397

    
398
  ToStdout("Creation time: %s", utils.FormatTime(result["ctime"]))
399
  ToStdout("Modification time: %s", utils.FormatTime(result["mtime"]))
400

    
401
  ToStdout("Master node: %s", result["master"])
402

    
403
  ToStdout("Architecture (this node): %s (%s)",
404
           result["architecture"][0], result["architecture"][1])
405

    
406
  if result["tags"]:
407
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
408
  else:
409
    tags = "(none)"
410

    
411
  ToStdout("Tags: %s", tags)
412

    
413
  ToStdout("Default hypervisor: %s", result["default_hypervisor"])
414
  ToStdout("Enabled hypervisors: %s",
415
           utils.CommaJoin(result["enabled_hypervisors"]))
416

    
417
  ToStdout("Hypervisor parameters:")
418
  _PrintGroupedParams(result["hvparams"])
419

    
420
  ToStdout("OS-specific hypervisor parameters:")
421
  _PrintGroupedParams(result["os_hvp"])
422

    
423
  ToStdout("OS parameters:")
424
  _PrintGroupedParams(result["osparams"])
425

    
426
  ToStdout("Hidden OSes: %s", utils.CommaJoin(result["hidden_os"]))
427
  ToStdout("Blacklisted OSes: %s", utils.CommaJoin(result["blacklisted_os"]))
428

    
429
  ToStdout("Cluster parameters:")
430
  ToStdout("  - candidate pool size: %s",
431
            compat.TryToRoman(result["candidate_pool_size"],
432
                              convert=opts.roman_integers))
433
  ToStdout("  - master netdev: %s", result["master_netdev"])
434
  ToStdout("  - master netmask: %s", result["master_netmask"])
435
  ToStdout("  - use external master IP address setup script: %s",
436
           result["use_external_mip_script"])
437
  ToStdout("  - lvm volume group: %s", result["volume_group_name"])
438
  if result["reserved_lvs"]:
439
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
440
  else:
441
    reserved_lvs = "(none)"
442
  ToStdout("  - lvm reserved volumes: %s", reserved_lvs)
443
  ToStdout("  - drbd usermode helper: %s", result["drbd_usermode_helper"])
444
  ToStdout("  - file storage path: %s", result["file_storage_dir"])
445
  ToStdout("  - shared file storage path: %s",
446
           result["shared_file_storage_dir"])
447
  ToStdout("  - maintenance of node health: %s",
448
           result["maintain_node_health"])
449
  ToStdout("  - uid pool: %s",
450
            uidpool.FormatUidPool(result["uid_pool"],
451
                                  roman=opts.roman_integers))
452
  ToStdout("  - default instance allocator: %s", result["default_iallocator"])
453
  ToStdout("  - primary ip version: %d", result["primary_ip_version"])
454
  ToStdout("  - preallocation wipe disks: %s", result["prealloc_wipe_disks"])
455
  ToStdout("  - OS search path: %s", utils.CommaJoin(constants.OS_SEARCH_PATH))
456

    
457
  ToStdout("Default node parameters:")
458
  _PrintGroupedParams(result["ndparams"], roman=opts.roman_integers)
459

    
460
  ToStdout("Default instance parameters:")
461
  _PrintGroupedParams(result["beparams"], roman=opts.roman_integers)
462

    
463
  ToStdout("Default nic parameters:")
464
  _PrintGroupedParams(result["nicparams"], roman=opts.roman_integers)
465

    
466
  ToStdout("Default disk parameters:")
467
  _PrintGroupedParams(result["diskparams"], roman=opts.roman_integers)
468

    
469
  ToStdout("Instance policy - limits for instances:")
470
  for key in constants.IPOLICY_ISPECS:
471
    ToStdout("  - %s", key)
472
    _PrintGroupedParams(result["ipolicy"][key], roman=opts.roman_integers)
473
  ToStdout("  - enabled disk templates: %s",
474
           utils.CommaJoin(result["ipolicy"][constants.IPOLICY_DTS]))
475
  for key in constants.IPOLICY_PARAMETERS:
476
    ToStdout("  - %s: %s", key, result["ipolicy"][key])
477

    
478
  return 0
479

    
480

    
481
def ClusterCopyFile(opts, args):
482
  """Copy a file from master to some nodes.
483

484
  @param opts: the command line options selected by the user
485
  @type args: list
486
  @param args: should contain only one element, the path of
487
      the file to be copied
488
  @rtype: int
489
  @return: the desired exit code
490

491
  """
492
  filename = args[0]
493
  if not os.path.exists(filename):
494
    raise errors.OpPrereqError("No such filename '%s'" % filename,
495
                               errors.ECODE_INVAL)
496

    
497
  cl = GetClient()
498

    
499
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
500

    
501
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
502
                           secondary_ips=opts.use_replication_network,
503
                           nodegroup=opts.nodegroup)
504

    
505
  srun = ssh.SshRunner(cluster_name=cluster_name)
506
  for node in results:
507
    if not srun.CopyFileToNode(node, filename):
508
      ToStderr("Copy of file %s to node %s failed", filename, node)
509

    
510
  return 0
511

    
512

    
513
def RunClusterCommand(opts, args):
514
  """Run a command on some nodes.
515

516
  @param opts: the command line options selected by the user
517
  @type args: list
518
  @param args: should contain the command to be run and its arguments
519
  @rtype: int
520
  @return: the desired exit code
521

522
  """
523
  cl = GetClient()
524

    
525
  command = " ".join(args)
526

    
527
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
528

    
529
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
530
                                                    "master_node"])
531

    
532
  srun = ssh.SshRunner(cluster_name=cluster_name)
533

    
534
  # Make sure master node is at list end
535
  if master_node in nodes:
536
    nodes.remove(master_node)
537
    nodes.append(master_node)
538

    
539
  for name in nodes:
540
    result = srun.Run(name, "root", command)
541
    ToStdout("------------------------------------------------")
542
    if opts.show_machine_names:
543
      for line in result.output.splitlines():
544
        ToStdout("%s: %s", name, line)
545
    else:
546
      ToStdout("node: %s", name)
547
      ToStdout("%s", result.output)
548
    ToStdout("return code = %s", result.exit_code)
549

    
550
  return 0
551

    
552

    
553
def VerifyCluster(opts, args):
554
  """Verify integrity of cluster, performing various test on nodes.
555

556
  @param opts: the command line options selected by the user
557
  @type args: list
558
  @param args: should be an empty list
559
  @rtype: int
560
  @return: the desired exit code
561

562
  """
563
  skip_checks = []
564

    
565
  if opts.skip_nplusone_mem:
566
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
567

    
568
  cl = GetClient()
569

    
570
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
571
                               error_codes=opts.error_codes,
572
                               debug_simulate_errors=opts.simulate_errors,
573
                               skip_checks=skip_checks,
574
                               ignore_errors=opts.ignore_errors,
575
                               group_name=opts.nodegroup)
576
  result = SubmitOpCode(op, cl=cl, opts=opts)
577

    
578
  # Keep track of submitted jobs
579
  jex = JobExecutor(cl=cl, opts=opts)
580

    
581
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
582
    jex.AddJobId(None, status, job_id)
583

    
584
  results = jex.GetResults()
585

    
586
  (bad_jobs, bad_results) = \
587
    map(len,
588
        # Convert iterators to lists
589
        map(list,
590
            # Count errors
591
            map(compat.partial(itertools.ifilterfalse, bool),
592
                # Convert result to booleans in a tuple
593
                zip(*((job_success, len(op_results) == 1 and op_results[0])
594
                      for (job_success, op_results) in results)))))
595

    
596
  if bad_jobs == 0 and bad_results == 0:
597
    rcode = constants.EXIT_SUCCESS
598
  else:
599
    rcode = constants.EXIT_FAILURE
600
    if bad_jobs > 0:
601
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
602

    
603
  return rcode
604

    
605

    
606
def VerifyDisks(opts, args):
607
  """Verify integrity of cluster disks.
608

609
  @param opts: the command line options selected by the user
610
  @type args: list
611
  @param args: should be an empty list
612
  @rtype: int
613
  @return: the desired exit code
614

615
  """
616
  cl = GetClient()
617

    
618
  op = opcodes.OpClusterVerifyDisks()
619

    
620
  result = SubmitOpCode(op, cl=cl, opts=opts)
621

    
622
  # Keep track of submitted jobs
623
  jex = JobExecutor(cl=cl, opts=opts)
624

    
625
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
626
    jex.AddJobId(None, status, job_id)
627

    
628
  retcode = constants.EXIT_SUCCESS
629

    
630
  for (status, result) in jex.GetResults():
631
    if not status:
632
      ToStdout("Job failed: %s", result)
633
      continue
634

    
635
    ((bad_nodes, instances, missing), ) = result
636

    
637
    for node, text in bad_nodes.items():
638
      ToStdout("Error gathering data on node %s: %s",
639
               node, utils.SafeEncode(text[-400:]))
640
      retcode = constants.EXIT_FAILURE
641
      ToStdout("You need to fix these nodes first before fixing instances")
642

    
643
    for iname in instances:
644
      if iname in missing:
645
        continue
646
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
647
      try:
648
        ToStdout("Activating disks for instance '%s'", iname)
649
        SubmitOpCode(op, opts=opts, cl=cl)
650
      except errors.GenericError, err:
651
        nret, msg = FormatError(err)
652
        retcode |= nret
653
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
654

    
655
    if missing:
656
      for iname, ival in missing.iteritems():
657
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
658
        if all_missing:
659
          ToStdout("Instance %s cannot be verified as it lives on"
660
                   " broken nodes", iname)
661
        else:
662
          ToStdout("Instance %s has missing logical volumes:", iname)
663
          ival.sort()
664
          for node, vol in ival:
665
            if node in bad_nodes:
666
              ToStdout("\tbroken node %s /dev/%s", node, vol)
667
            else:
668
              ToStdout("\t%s /dev/%s", node, vol)
669

    
670
      ToStdout("You need to replace or recreate disks for all the above"
671
               " instances if this message persists after fixing broken nodes.")
672
      retcode = constants.EXIT_FAILURE
673

    
674
  return retcode
675

    
676

    
677
def RepairDiskSizes(opts, args):
678
  """Verify sizes of cluster disks.
679

680
  @param opts: the command line options selected by the user
681
  @type args: list
682
  @param args: optional list of instances to restrict check to
683
  @rtype: int
684
  @return: the desired exit code
685

686
  """
687
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
688
  SubmitOpCode(op, opts=opts)
689

    
690

    
691
@UsesRPC
692
def MasterFailover(opts, args):
693
  """Failover the master node.
694

695
  This command, when run on a non-master node, will cause the current
696
  master to cease being master, and the non-master to become new
697
  master.
698

699
  @param opts: the command line options selected by the user
700
  @type args: list
701
  @param args: should be an empty list
702
  @rtype: int
703
  @return: the desired exit code
704

705
  """
706
  if opts.no_voting:
707
    usertext = ("This will perform the failover even if most other nodes"
708
                " are down, or if this node is outdated. This is dangerous"
709
                " as it can lead to a non-consistent cluster. Check the"
710
                " gnt-cluster(8) man page before proceeding. Continue?")
711
    if not AskUser(usertext):
712
      return 1
713

    
714
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
715

    
716

    
717
def MasterPing(opts, args):
718
  """Checks if the master is alive.
719

720
  @param opts: the command line options selected by the user
721
  @type args: list
722
  @param args: should be an empty list
723
  @rtype: int
724
  @return: the desired exit code
725

726
  """
727
  try:
728
    cl = GetClient()
729
    cl.QueryClusterInfo()
730
    return 0
731
  except Exception: # pylint: disable=W0703
732
    return 1
733

    
734

    
735
def SearchTags(opts, args):
736
  """Searches the tags on all the cluster.
737

738
  @param opts: the command line options selected by the user
739
  @type args: list
740
  @param args: should contain only one element, the tag pattern
741
  @rtype: int
742
  @return: the desired exit code
743

744
  """
745
  op = opcodes.OpTagsSearch(pattern=args[0])
746
  result = SubmitOpCode(op, opts=opts)
747
  if not result:
748
    return 1
749
  result = list(result)
750
  result.sort()
751
  for path, tag in result:
752
    ToStdout("%s %s", path, tag)
753

    
754

    
755
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
756
  """Reads and verifies an X509 certificate.
757

758
  @type cert_filename: string
759
  @param cert_filename: the path of the file containing the certificate to
760
                        verify encoded in PEM format
761
  @type verify_private_key: bool
762
  @param verify_private_key: whether to verify the private key in addition to
763
                             the public certificate
764
  @rtype: string
765
  @return: a string containing the PEM-encoded certificate.
766

767
  """
768
  try:
769
    pem = utils.ReadFile(cert_filename)
770
  except IOError, err:
771
    raise errors.X509CertError(cert_filename,
772
                               "Unable to read certificate: %s" % str(err))
773

    
774
  try:
775
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
776
  except Exception, err:
777
    raise errors.X509CertError(cert_filename,
778
                               "Unable to load certificate: %s" % str(err))
779

    
780
  if verify_private_key:
781
    try:
782
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
783
    except Exception, err:
784
      raise errors.X509CertError(cert_filename,
785
                                 "Unable to load private key: %s" % str(err))
786

    
787
  return pem
788

    
789

    
790
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
791
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
792
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
793
                 cds_filename, force):
794
  """Renews cluster certificates, keys and secrets.
795

796
  @type new_cluster_cert: bool
797
  @param new_cluster_cert: Whether to generate a new cluster certificate
798
  @type new_rapi_cert: bool
799
  @param new_rapi_cert: Whether to generate a new RAPI certificate
800
  @type rapi_cert_filename: string
801
  @param rapi_cert_filename: Path to file containing new RAPI certificate
802
  @type new_spice_cert: bool
803
  @param new_spice_cert: Whether to generate a new SPICE certificate
804
  @type spice_cert_filename: string
805
  @param spice_cert_filename: Path to file containing new SPICE certificate
806
  @type spice_cacert_filename: string
807
  @param spice_cacert_filename: Path to file containing the certificate of the
808
                                CA that signed the SPICE certificate
809
  @type new_confd_hmac_key: bool
810
  @param new_confd_hmac_key: Whether to generate a new HMAC key
811
  @type new_cds: bool
812
  @param new_cds: Whether to generate a new cluster domain secret
813
  @type cds_filename: string
814
  @param cds_filename: Path to file containing new cluster domain secret
815
  @type force: bool
816
  @param force: Whether to ask user for confirmation
817

818
  """
819
  if new_rapi_cert and rapi_cert_filename:
820
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
821
             " options can be specified at the same time.")
822
    return 1
823

    
824
  if new_cds and cds_filename:
825
    ToStderr("Only one of the --new-cluster-domain-secret and"
826
             " --cluster-domain-secret options can be specified at"
827
             " the same time.")
828
    return 1
829

    
830
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
831
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
832
             " and --spice-ca-certificate must not be used.")
833
    return 1
834

    
835
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
836
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
837
             " specified.")
838
    return 1
839

    
840
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
841
  try:
842
    if rapi_cert_filename:
843
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
844
    if spice_cert_filename:
845
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
846
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
847
  except errors.X509CertError, err:
848
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
849
    return 1
850

    
851
  if cds_filename:
852
    try:
853
      cds = utils.ReadFile(cds_filename)
854
    except Exception, err: # pylint: disable=W0703
855
      ToStderr("Can't load new cluster domain secret from %s: %s" %
856
               (cds_filename, str(err)))
857
      return 1
858
  else:
859
    cds = None
860

    
861
  if not force:
862
    usertext = ("This requires all daemons on all nodes to be restarted and"
863
                " may take some time. Continue?")
864
    if not AskUser(usertext):
865
      return 1
866

    
867
  def _RenewCryptoInner(ctx):
868
    ctx.feedback_fn("Updating certificates and keys")
869
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
870
                                    new_rapi_cert,
871
                                    new_spice_cert,
872
                                    new_confd_hmac_key,
873
                                    new_cds,
874
                                    rapi_cert_pem=rapi_cert_pem,
875
                                    spice_cert_pem=spice_cert_pem,
876
                                    spice_cacert_pem=spice_cacert_pem,
877
                                    cds=cds)
878

    
879
    files_to_copy = []
880

    
881
    if new_cluster_cert:
882
      files_to_copy.append(constants.NODED_CERT_FILE)
883

    
884
    if new_rapi_cert or rapi_cert_pem:
885
      files_to_copy.append(constants.RAPI_CERT_FILE)
886

    
887
    if new_spice_cert or spice_cert_pem:
888
      files_to_copy.append(constants.SPICE_CERT_FILE)
889
      files_to_copy.append(constants.SPICE_CACERT_FILE)
890

    
891
    if new_confd_hmac_key:
892
      files_to_copy.append(constants.CONFD_HMAC_KEY)
893

    
894
    if new_cds or cds:
895
      files_to_copy.append(constants.CLUSTER_DOMAIN_SECRET_FILE)
896

    
897
    if files_to_copy:
898
      for node_name in ctx.nonmaster_nodes:
899
        ctx.feedback_fn("Copying %s to %s" %
900
                        (", ".join(files_to_copy), node_name))
901
        for file_name in files_to_copy:
902
          ctx.ssh.CopyFileToNode(node_name, file_name)
903

    
904
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
905

    
906
  ToStdout("All requested certificates and keys have been replaced."
907
           " Running \"gnt-cluster verify\" now is recommended.")
908

    
909
  return 0
910

    
911

    
912
def RenewCrypto(opts, args):
913
  """Renews cluster certificates, keys and secrets.
914

915
  """
916
  return _RenewCrypto(opts.new_cluster_cert,
917
                      opts.new_rapi_cert,
918
                      opts.rapi_cert,
919
                      opts.new_spice_cert,
920
                      opts.spice_cert,
921
                      opts.spice_cacert,
922
                      opts.new_confd_hmac_key,
923
                      opts.new_cluster_domain_secret,
924
                      opts.cluster_domain_secret,
925
                      opts.force)
926

    
927

    
928
def SetClusterParams(opts, args):
929
  """Modify the cluster.
930

931
  @param opts: the command line options selected by the user
932
  @type args: list
933
  @param args: should be an empty list
934
  @rtype: int
935
  @return: the desired exit code
936

937
  """
938
  if not (not opts.lvm_storage or opts.vg_name or
939
          not opts.drbd_storage or opts.drbd_helper or
940
          opts.enabled_hypervisors or opts.hvparams or
941
          opts.beparams or opts.nicparams or
942
          opts.ndparams or opts.diskparams or
943
          opts.candidate_pool_size is not None or
944
          opts.uid_pool is not None or
945
          opts.maintain_node_health is not None or
946
          opts.add_uids is not None or
947
          opts.remove_uids is not None or
948
          opts.default_iallocator is not None or
949
          opts.reserved_lvs is not None or
950
          opts.master_netdev is not None or
951
          opts.master_netmask is not None or
952
          opts.use_external_mip_script is not None or
953
          opts.prealloc_wipe_disks is not None or
954
          opts.hv_state or
955
          opts.disk_state or
956
          opts.ispecs_mem_size or
957
          opts.ispecs_cpu_count or
958
          opts.ispecs_disk_count or
959
          opts.ispecs_disk_size or
960
          opts.ispecs_nic_count or
961
          opts.ipolicy_disk_templates is not None or
962
          opts.ipolicy_vcpu_ratio is not None or
963
          opts.ipolicy_spindle_ratio is not None):
964
    ToStderr("Please give at least one of the parameters.")
965
    return 1
966

    
967
  vg_name = opts.vg_name
968
  if not opts.lvm_storage and opts.vg_name:
969
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
970
    return 1
971

    
972
  if not opts.lvm_storage:
973
    vg_name = ""
974

    
975
  drbd_helper = opts.drbd_helper
976
  if not opts.drbd_storage and opts.drbd_helper:
977
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
978
    return 1
979

    
980
  if not opts.drbd_storage:
981
    drbd_helper = ""
982

    
983
  hvlist = opts.enabled_hypervisors
984
  if hvlist is not None:
985
    hvlist = hvlist.split(",")
986

    
987
  # a list of (name, dict) we can pass directly to dict() (or [])
988
  hvparams = dict(opts.hvparams)
989
  for hv_params in hvparams.values():
990
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
991

    
992
  diskparams = dict(opts.diskparams)
993

    
994
  for dt_params in diskparams.values():
995
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
996

    
997
  beparams = opts.beparams
998
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
999

    
1000
  nicparams = opts.nicparams
1001
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1002

    
1003
  ndparams = opts.ndparams
1004
  if ndparams is not None:
1005
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1006

    
1007
  ipolicy = CreateIPolicyFromOpts(
1008
    ispecs_mem_size=opts.ispecs_mem_size,
1009
    ispecs_cpu_count=opts.ispecs_cpu_count,
1010
    ispecs_disk_count=opts.ispecs_disk_count,
1011
    ispecs_disk_size=opts.ispecs_disk_size,
1012
    ispecs_nic_count=opts.ispecs_nic_count,
1013
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1014
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1015
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1016
    )
1017

    
1018
  mnh = opts.maintain_node_health
1019

    
1020
  uid_pool = opts.uid_pool
1021
  if uid_pool is not None:
1022
    uid_pool = uidpool.ParseUidPool(uid_pool)
1023

    
1024
  add_uids = opts.add_uids
1025
  if add_uids is not None:
1026
    add_uids = uidpool.ParseUidPool(add_uids)
1027

    
1028
  remove_uids = opts.remove_uids
1029
  if remove_uids is not None:
1030
    remove_uids = uidpool.ParseUidPool(remove_uids)
1031

    
1032
  if opts.reserved_lvs is not None:
1033
    if opts.reserved_lvs == "":
1034
      opts.reserved_lvs = []
1035
    else:
1036
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1037

    
1038
  if opts.master_netmask is not None:
1039
    try:
1040
      opts.master_netmask = int(opts.master_netmask)
1041
    except ValueError:
1042
      ToStderr("The --master-netmask option expects an int parameter.")
1043
      return 1
1044

    
1045
  ext_ip_script = opts.use_external_mip_script
1046

    
1047
  if opts.disk_state:
1048
    disk_state = utils.FlatToDict(opts.disk_state)
1049
  else:
1050
    disk_state = {}
1051

    
1052
  hv_state = dict(opts.hv_state)
1053

    
1054
  op = opcodes.OpClusterSetParams(vg_name=vg_name,
1055
                                  drbd_helper=drbd_helper,
1056
                                  enabled_hypervisors=hvlist,
1057
                                  hvparams=hvparams,
1058
                                  os_hvp=None,
1059
                                  beparams=beparams,
1060
                                  nicparams=nicparams,
1061
                                  ndparams=ndparams,
1062
                                  diskparams=diskparams,
1063
                                  ipolicy=ipolicy,
1064
                                  candidate_pool_size=opts.candidate_pool_size,
1065
                                  maintain_node_health=mnh,
1066
                                  uid_pool=uid_pool,
1067
                                  add_uids=add_uids,
1068
                                  remove_uids=remove_uids,
1069
                                  default_iallocator=opts.default_iallocator,
1070
                                  prealloc_wipe_disks=opts.prealloc_wipe_disks,
1071
                                  master_netdev=opts.master_netdev,
1072
                                  master_netmask=opts.master_netmask,
1073
                                  reserved_lvs=opts.reserved_lvs,
1074
                                  use_external_mip_script=ext_ip_script,
1075
                                  hv_state=hv_state,
1076
                                  disk_state=disk_state,
1077
                                  )
1078
  SubmitOrSend(op, opts)
1079
  return 0
1080

    
1081

    
1082
def QueueOps(opts, args):
1083
  """Queue operations.
1084

1085
  @param opts: the command line options selected by the user
1086
  @type args: list
1087
  @param args: should contain only one element, the subcommand
1088
  @rtype: int
1089
  @return: the desired exit code
1090

1091
  """
1092
  command = args[0]
1093
  client = GetClient()
1094
  if command in ("drain", "undrain"):
1095
    drain_flag = command == "drain"
1096
    client.SetQueueDrainFlag(drain_flag)
1097
  elif command == "info":
1098
    result = client.QueryConfigValues(["drain_flag"])
1099
    if result[0]:
1100
      val = "set"
1101
    else:
1102
      val = "unset"
1103
    ToStdout("The drain flag is %s" % val)
1104
  else:
1105
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1106
                               errors.ECODE_INVAL)
1107

    
1108
  return 0
1109

    
1110

    
1111
def _ShowWatcherPause(until):
1112
  if until is None or until < time.time():
1113
    ToStdout("The watcher is not paused.")
1114
  else:
1115
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1116

    
1117

    
1118
def WatcherOps(opts, args):
1119
  """Watcher operations.
1120

1121
  @param opts: the command line options selected by the user
1122
  @type args: list
1123
  @param args: should contain only one element, the subcommand
1124
  @rtype: int
1125
  @return: the desired exit code
1126

1127
  """
1128
  command = args[0]
1129
  client = GetClient()
1130

    
1131
  if command == "continue":
1132
    client.SetWatcherPause(None)
1133
    ToStdout("The watcher is no longer paused.")
1134

    
1135
  elif command == "pause":
1136
    if len(args) < 2:
1137
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1138

    
1139
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1140
    _ShowWatcherPause(result)
1141

    
1142
  elif command == "info":
1143
    result = client.QueryConfigValues(["watcher_pause"])
1144
    _ShowWatcherPause(result[0])
1145

    
1146
  else:
1147
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1148
                               errors.ECODE_INVAL)
1149

    
1150
  return 0
1151

    
1152

    
1153
def _OobPower(opts, node_list, power):
1154
  """Puts the node in the list to desired power state.
1155

1156
  @param opts: The command line options selected by the user
1157
  @param node_list: The list of nodes to operate on
1158
  @param power: True if they should be powered on, False otherwise
1159
  @return: The success of the operation (none failed)
1160

1161
  """
1162
  if power:
1163
    command = constants.OOB_POWER_ON
1164
  else:
1165
    command = constants.OOB_POWER_OFF
1166

    
1167
  op = opcodes.OpOobCommand(node_names=node_list,
1168
                            command=command,
1169
                            ignore_status=True,
1170
                            timeout=opts.oob_timeout,
1171
                            power_delay=opts.power_delay)
1172
  result = SubmitOpCode(op, opts=opts)
1173
  errs = 0
1174
  for node_result in result:
1175
    (node_tuple, data_tuple) = node_result
1176
    (_, node_name) = node_tuple
1177
    (data_status, _) = data_tuple
1178
    if data_status != constants.RS_NORMAL:
1179
      assert data_status != constants.RS_UNAVAIL
1180
      errs += 1
1181
      ToStderr("There was a problem changing power for %s, please investigate",
1182
               node_name)
1183

    
1184
  if errs > 0:
1185
    return False
1186

    
1187
  return True
1188

    
1189

    
1190
def _InstanceStart(opts, inst_list, start, no_remember=False):
1191
  """Puts the instances in the list to desired state.
1192

1193
  @param opts: The command line options selected by the user
1194
  @param inst_list: The list of instances to operate on
1195
  @param start: True if they should be started, False for shutdown
1196
  @param no_remember: If the instance state should be remembered
1197
  @return: The success of the operation (none failed)
1198

1199
  """
1200
  if start:
1201
    opcls = opcodes.OpInstanceStartup
1202
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1203
  else:
1204
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1205
                           timeout=opts.shutdown_timeout,
1206
                           no_remember=no_remember)
1207
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1208

    
1209
  jex = JobExecutor(opts=opts)
1210

    
1211
  for inst in inst_list:
1212
    ToStdout("Submit %s of instance %s", text_submit, inst)
1213
    op = opcls(instance_name=inst)
1214
    jex.QueueJob(inst, op)
1215

    
1216
  results = jex.GetResults()
1217
  bad_cnt = len([1 for (success, _) in results if not success])
1218

    
1219
  if bad_cnt == 0:
1220
    ToStdout("All instances have been %s successfully", text_success)
1221
  else:
1222
    ToStderr("There were errors while %s instances:\n"
1223
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1224
             len(results))
1225
    return False
1226

    
1227
  return True
1228

    
1229

    
1230
class _RunWhenNodesReachableHelper:
1231
  """Helper class to make shared internal state sharing easier.
1232

1233
  @ivar success: Indicates if all action_cb calls were successful
1234

1235
  """
1236
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1237
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1238
    """Init the object.
1239

1240
    @param node_list: The list of nodes to be reachable
1241
    @param action_cb: Callback called when a new host is reachable
1242
    @type node2ip: dict
1243
    @param node2ip: Node to ip mapping
1244
    @param port: The port to use for the TCP ping
1245
    @param feedback_fn: The function used for feedback
1246
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1247
    @param _sleep_fn: Function to sleep (for unittest use only)
1248

1249
    """
1250
    self.down = set(node_list)
1251
    self.up = set()
1252
    self.node2ip = node2ip
1253
    self.success = True
1254
    self.action_cb = action_cb
1255
    self.port = port
1256
    self.feedback_fn = feedback_fn
1257
    self._ping_fn = _ping_fn
1258
    self._sleep_fn = _sleep_fn
1259

    
1260
  def __call__(self):
1261
    """When called we run action_cb.
1262

1263
    @raises utils.RetryAgain: When there are still down nodes
1264

1265
    """
1266
    if not self.action_cb(self.up):
1267
      self.success = False
1268

    
1269
    if self.down:
1270
      raise utils.RetryAgain()
1271
    else:
1272
      return self.success
1273

    
1274
  def Wait(self, secs):
1275
    """Checks if a host is up or waits remaining seconds.
1276

1277
    @param secs: The secs remaining
1278

1279
    """
1280
    start = time.time()
1281
    for node in self.down:
1282
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1283
                       live_port_needed=True):
1284
        self.feedback_fn("Node %s became available" % node)
1285
        self.up.add(node)
1286
        self.down -= self.up
1287
        # If we have a node available there is the possibility to run the
1288
        # action callback successfully, therefore we don't wait and return
1289
        return
1290

    
1291
    self._sleep_fn(max(0.0, start + secs - time.time()))
1292

    
1293

    
1294
def _RunWhenNodesReachable(node_list, action_cb, interval):
1295
  """Run action_cb when nodes become reachable.
1296

1297
  @param node_list: The list of nodes to be reachable
1298
  @param action_cb: Callback called when a new host is reachable
1299
  @param interval: The earliest time to retry
1300

1301
  """
1302
  client = GetClient()
1303
  cluster_info = client.QueryClusterInfo()
1304
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1305
    family = netutils.IPAddress.family
1306
  else:
1307
    family = netutils.IP6Address.family
1308

    
1309
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1310
                 for node in node_list)
1311

    
1312
  port = netutils.GetDaemonPort(constants.NODED)
1313
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1314
                                        ToStdout)
1315

    
1316
  try:
1317
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1318
                       wait_fn=helper.Wait)
1319
  except utils.RetryTimeout:
1320
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1321
             " again:\n  - %s", "  - ".join(helper.down))
1322
    return False
1323

    
1324

    
1325
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1326
                          _instance_start_fn=_InstanceStart):
1327
  """Start the instances conditional based on node_states.
1328

1329
  @param opts: The command line options selected by the user
1330
  @param inst_map: A dict of inst -> nodes mapping
1331
  @param nodes_online: A list of nodes online
1332
  @param _instance_start_fn: Callback to start instances (unittest use only)
1333
  @return: Success of the operation on all instances
1334

1335
  """
1336
  start_inst_list = []
1337
  for (inst, nodes) in inst_map.items():
1338
    if not (nodes - nodes_online):
1339
      # All nodes the instance lives on are back online
1340
      start_inst_list.append(inst)
1341

    
1342
  for inst in start_inst_list:
1343
    del inst_map[inst]
1344

    
1345
  if start_inst_list:
1346
    return _instance_start_fn(opts, start_inst_list, True)
1347

    
1348
  return True
1349

    
1350

    
1351
def _EpoOn(opts, full_node_list, node_list, inst_map):
1352
  """Does the actual power on.
1353

1354
  @param opts: The command line options selected by the user
1355
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1356
                         OOB)
1357
  @param node_list: The list of nodes to operate on (all need to support OOB)
1358
  @param inst_map: A dict of inst -> nodes mapping
1359
  @return: The desired exit status
1360

1361
  """
1362
  if node_list and not _OobPower(opts, node_list, False):
1363
    ToStderr("Not all nodes seem to get back up, investigate and start"
1364
             " manually if needed")
1365

    
1366
  # Wait for the nodes to be back up
1367
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1368

    
1369
  ToStdout("Waiting until all nodes are available again")
1370
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1371
    ToStderr("Please investigate and start stopped instances manually")
1372
    return constants.EXIT_FAILURE
1373

    
1374
  return constants.EXIT_SUCCESS
1375

    
1376

    
1377
def _EpoOff(opts, node_list, inst_map):
1378
  """Does the actual power off.
1379

1380
  @param opts: The command line options selected by the user
1381
  @param node_list: The list of nodes to operate on (all need to support OOB)
1382
  @param inst_map: A dict of inst -> nodes mapping
1383
  @return: The desired exit status
1384

1385
  """
1386
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1387
    ToStderr("Please investigate and stop instances manually before continuing")
1388
    return constants.EXIT_FAILURE
1389

    
1390
  if not node_list:
1391
    return constants.EXIT_SUCCESS
1392

    
1393
  if _OobPower(opts, node_list, False):
1394
    return constants.EXIT_SUCCESS
1395
  else:
1396
    return constants.EXIT_FAILURE
1397

    
1398

    
1399
def Epo(opts, args):
1400
  """EPO operations.
1401

1402
  @param opts: the command line options selected by the user
1403
  @type args: list
1404
  @param args: should contain only one element, the subcommand
1405
  @rtype: int
1406
  @return: the desired exit code
1407

1408
  """
1409
  if opts.groups and opts.show_all:
1410
    ToStderr("Only one of --groups or --all are allowed")
1411
    return constants.EXIT_FAILURE
1412
  elif args and opts.show_all:
1413
    ToStderr("Arguments in combination with --all are not allowed")
1414
    return constants.EXIT_FAILURE
1415

    
1416
  client = GetClient()
1417

    
1418
  if opts.groups:
1419
    node_query_list = itertools.chain(*client.QueryGroups(names=args,
1420
                                                          fields=["node_list"],
1421
                                                          use_locking=False))
1422
  else:
1423
    node_query_list = args
1424

    
1425
  result = client.QueryNodes(names=node_query_list,
1426
                             fields=["name", "master", "pinst_list",
1427
                                     "sinst_list", "powered", "offline"],
1428
                             use_locking=False)
1429
  node_list = []
1430
  inst_map = {}
1431
  for (idx, (node, master, pinsts, sinsts, powered,
1432
             offline)) in enumerate(result):
1433
    # Normalize the node_query_list as well
1434
    if not opts.show_all:
1435
      node_query_list[idx] = node
1436
    if not offline:
1437
      for inst in (pinsts + sinsts):
1438
        if inst in inst_map:
1439
          if not master:
1440
            inst_map[inst].add(node)
1441
        elif master:
1442
          inst_map[inst] = set()
1443
        else:
1444
          inst_map[inst] = set([node])
1445

    
1446
    if master and opts.on:
1447
      # We ignore the master for turning on the machines, in fact we are
1448
      # already operating on the master at this point :)
1449
      continue
1450
    elif master and not opts.show_all:
1451
      ToStderr("%s is the master node, please do a master-failover to another"
1452
               " node not affected by the EPO or use --all if you intend to"
1453
               " shutdown the whole cluster", node)
1454
      return constants.EXIT_FAILURE
1455
    elif powered is None:
1456
      ToStdout("Node %s does not support out-of-band handling, it can not be"
1457
               " handled in a fully automated manner", node)
1458
    elif powered == opts.on:
1459
      ToStdout("Node %s is already in desired power state, skipping", node)
1460
    elif not offline or (offline and powered):
1461
      node_list.append(node)
1462

    
1463
  if not opts.force and not ConfirmOperation(node_query_list, "nodes", "epo"):
1464
    return constants.EXIT_FAILURE
1465

    
1466
  if opts.on:
1467
    return _EpoOn(opts, node_query_list, node_list, inst_map)
1468
  else:
1469
    return _EpoOff(opts, node_list, inst_map)
1470

    
1471
commands = {
1472
  "init": (
1473
    InitCluster, [ArgHost(min=1, max=1)],
1474
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
1475
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
1476
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
1477
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
1478
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
1479
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
1480
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
1481
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + INSTANCE_POLICY_OPTS,
1482
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
1483
  "destroy": (
1484
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
1485
    "", "Destroy cluster"),
1486
  "rename": (
1487
    RenameCluster, [ArgHost(min=1, max=1)],
1488
    [FORCE_OPT, DRY_RUN_OPT],
1489
    "<new_name>",
1490
    "Renames the cluster"),
1491
  "redist-conf": (
1492
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
1493
    "", "Forces a push of the configuration file and ssconf files"
1494
    " to the nodes in the cluster"),
1495
  "verify": (
1496
    VerifyCluster, ARGS_NONE,
1497
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
1498
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
1499
    "", "Does a check on the cluster configuration"),
1500
  "verify-disks": (
1501
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
1502
    "", "Does a check on the cluster disk status"),
1503
  "repair-disk-sizes": (
1504
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
1505
    "[instance...]", "Updates mismatches in recorded disk sizes"),
1506
  "master-failover": (
1507
    MasterFailover, ARGS_NONE, [NOVOTING_OPT],
1508
    "", "Makes the current node the master"),
1509
  "master-ping": (
1510
    MasterPing, ARGS_NONE, [],
1511
    "", "Checks if the master is alive"),
1512
  "version": (
1513
    ShowClusterVersion, ARGS_NONE, [],
1514
    "", "Shows the cluster version"),
1515
  "getmaster": (
1516
    ShowClusterMaster, ARGS_NONE, [],
1517
    "", "Shows the cluster master"),
1518
  "copyfile": (
1519
    ClusterCopyFile, [ArgFile(min=1, max=1)],
1520
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
1521
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
1522
  "command": (
1523
    RunClusterCommand, [ArgCommand(min=1)],
1524
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT],
1525
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
1526
  "info": (
1527
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
1528
    "[--roman]", "Show cluster configuration"),
1529
  "list-tags": (
1530
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
1531
  "add-tags": (
1532
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1533
    "tag...", "Add tags to the cluster"),
1534
  "remove-tags": (
1535
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1536
    "tag...", "Remove tags from the cluster"),
1537
  "search-tags": (
1538
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
1539
    "Searches the tags on all objects on"
1540
    " the cluster for a given pattern (regex)"),
1541
  "queue": (
1542
    QueueOps,
1543
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
1544
    [], "drain|undrain|info", "Change queue properties"),
1545
  "watcher": (
1546
    WatcherOps,
1547
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
1548
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
1549
    [],
1550
    "{pause <timespec>|continue|info}", "Change watcher properties"),
1551
  "modify": (
1552
    SetClusterParams, ARGS_NONE,
1553
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
1554
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
1555
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
1556
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
1557
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
1558
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
1559
     DISK_STATE_OPT, SUBMIT_OPT] +
1560
    INSTANCE_POLICY_OPTS,
1561
    "[opts...]",
1562
    "Alters the parameters of the cluster"),
1563
  "renew-crypto": (
1564
    RenewCrypto, ARGS_NONE,
1565
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
1566
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
1567
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
1568
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
1569
    "[opts...]",
1570
    "Renews cluster certificates, keys and secrets"),
1571
  "epo": (
1572
    Epo, [ArgUnknown()],
1573
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
1574
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
1575
    "[opts...] [args]",
1576
    "Performs an emergency power-off on given args"),
1577
  "activate-master-ip": (
1578
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
1579
  "deactivate-master-ip": (
1580
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
1581
    "Deactivates the master IP"),
1582
  }
1583

    
1584

    
1585
#: dictionary with aliases for commands
1586
aliases = {
1587
  "masterfailover": "master-failover",
1588
  "show": "info",
1589
}
1590

    
1591

    
1592
def Main():
1593
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
1594
                     aliases=aliases)