Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ 912737ba

History | View | Annotate | Download (52.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
import os.path
30
import time
31
import OpenSSL
32
import itertools
33

    
34
from ganeti.cli import *
35
from ganeti import opcodes
36
from ganeti import constants
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import bootstrap
40
from ganeti import ssh
41
from ganeti import objects
42
from ganeti import uidpool
43
from ganeti import compat
44
from ganeti import netutils
45
from ganeti import pathutils
46

    
47

    
48
ON_OPT = cli_option("--on", default=False,
49
                    action="store_true", dest="on",
50
                    help="Recover from an EPO")
51

    
52
GROUPS_OPT = cli_option("--groups", default=False,
53
                        action="store_true", dest="groups",
54
                        help="Arguments are node groups instead of nodes")
55

    
56
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
57
                            help="Override interactive check for --no-voting",
58
                            default=False, action="store_true")
59

    
60
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
61
_EPO_PING_TIMEOUT = 1 # 1 second
62
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
63

    
64

    
65
def _CheckNoLvmStorageOptDeprecated(opts):
66
  """Checks if the legacy option '--no-lvm-storage' is used.
67

68
  """
69
  if not opts.lvm_storage:
70
    ToStderr("The option --no-lvm-storage is no longer supported. If you want"
71
             " to disable lvm-based storage cluster-wide, use the option"
72
             " --enabled-disk-templates to disable all of these lvm-base disk "
73
             "  templates: %s" %
74
             utils.CommaJoin(utils.GetLvmDiskTemplates()))
75
    return 1
76

    
77

    
78
@UsesRPC
79
def InitCluster(opts, args):
80
  """Initialize the cluster.
81

82
  @param opts: the command line options selected by the user
83
  @type args: list
84
  @param args: should contain only one element, the desired
85
      cluster name
86
  @rtype: int
87
  @return: the desired exit code
88

89
  """
90
  if _CheckNoLvmStorageOptDeprecated(opts):
91
    return 1
92
  enabled_disk_templates = opts.enabled_disk_templates
93
  if enabled_disk_templates:
94
    enabled_disk_templates = enabled_disk_templates.split(",")
95
  else:
96
    enabled_disk_templates = list(constants.DEFAULT_ENABLED_DISK_TEMPLATES)
97

    
98
  vg_name = None
99
  if opts.vg_name is not None:
100
    vg_name = opts.vg_name
101
    if vg_name:
102
      if not utils.IsLvmEnabled(enabled_disk_templates):
103
        ToStdout("You specified a volume group with --vg-name, but you did not"
104
                 " enable any disk template that uses lvm.")
105
    else:
106
      if utils.IsLvmEnabled(enabled_disk_templates):
107
        ToStderr("LVM disk templates are enabled, but vg name not set.")
108
        return 1
109
  else:
110
    if utils.IsLvmEnabled(enabled_disk_templates):
111
      vg_name = constants.DEFAULT_VG
112

    
113
  if not opts.drbd_storage and opts.drbd_helper:
114
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
115
    return 1
116

    
117
  drbd_helper = opts.drbd_helper
118
  if opts.drbd_storage and not opts.drbd_helper:
119
    drbd_helper = constants.DEFAULT_DRBD_HELPER
120

    
121
  master_netdev = opts.master_netdev
122
  if master_netdev is None:
123
    master_netdev = constants.DEFAULT_BRIDGE
124

    
125
  hvlist = opts.enabled_hypervisors
126
  if hvlist is None:
127
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
128
  hvlist = hvlist.split(",")
129

    
130
  hvparams = dict(opts.hvparams)
131
  beparams = opts.beparams
132
  nicparams = opts.nicparams
133

    
134
  diskparams = dict(opts.diskparams)
135

    
136
  # check the disk template types here, as we cannot rely on the type check done
137
  # by the opcode parameter types
138
  diskparams_keys = set(diskparams.keys())
139
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
140
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
141
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
142
    return 1
143

    
144
  # prepare beparams dict
145
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
146
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
147

    
148
  # prepare nicparams dict
149
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
150
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
151

    
152
  # prepare ndparams dict
153
  if opts.ndparams is None:
154
    ndparams = dict(constants.NDC_DEFAULTS)
155
  else:
156
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
157
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
158

    
159
  # prepare hvparams dict
160
  for hv in constants.HYPER_TYPES:
161
    if hv not in hvparams:
162
      hvparams[hv] = {}
163
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
164
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
165

    
166
  # prepare diskparams dict
167
  for templ in constants.DISK_TEMPLATES:
168
    if templ not in diskparams:
169
      diskparams[templ] = {}
170
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
171
                                         diskparams[templ])
172
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
173

    
174
  # prepare ipolicy dict
175
  ipolicy = CreateIPolicyFromOpts(
176
    ispecs_mem_size=opts.ispecs_mem_size,
177
    ispecs_cpu_count=opts.ispecs_cpu_count,
178
    ispecs_disk_count=opts.ispecs_disk_count,
179
    ispecs_disk_size=opts.ispecs_disk_size,
180
    ispecs_nic_count=opts.ispecs_nic_count,
181
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
182
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
183
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
184
    fill_all=True)
185

    
186
  if opts.candidate_pool_size is None:
187
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
188

    
189
  if opts.mac_prefix is None:
190
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
191

    
192
  uid_pool = opts.uid_pool
193
  if uid_pool is not None:
194
    uid_pool = uidpool.ParseUidPool(uid_pool)
195

    
196
  if opts.prealloc_wipe_disks is None:
197
    opts.prealloc_wipe_disks = False
198

    
199
  external_ip_setup_script = opts.use_external_mip_script
200
  if external_ip_setup_script is None:
201
    external_ip_setup_script = False
202

    
203
  try:
204
    primary_ip_version = int(opts.primary_ip_version)
205
  except (ValueError, TypeError), err:
206
    ToStderr("Invalid primary ip version value: %s" % str(err))
207
    return 1
208

    
209
  master_netmask = opts.master_netmask
210
  try:
211
    if master_netmask is not None:
212
      master_netmask = int(master_netmask)
213
  except (ValueError, TypeError), err:
214
    ToStderr("Invalid master netmask value: %s" % str(err))
215
    return 1
216

    
217
  if opts.disk_state:
218
    disk_state = utils.FlatToDict(opts.disk_state)
219
  else:
220
    disk_state = {}
221

    
222
  hv_state = dict(opts.hv_state)
223

    
224
  bootstrap.InitCluster(cluster_name=args[0],
225
                        secondary_ip=opts.secondary_ip,
226
                        vg_name=vg_name,
227
                        mac_prefix=opts.mac_prefix,
228
                        master_netmask=master_netmask,
229
                        master_netdev=master_netdev,
230
                        file_storage_dir=opts.file_storage_dir,
231
                        shared_file_storage_dir=opts.shared_file_storage_dir,
232
                        enabled_hypervisors=hvlist,
233
                        hvparams=hvparams,
234
                        beparams=beparams,
235
                        nicparams=nicparams,
236
                        ndparams=ndparams,
237
                        diskparams=diskparams,
238
                        ipolicy=ipolicy,
239
                        candidate_pool_size=opts.candidate_pool_size,
240
                        modify_etc_hosts=opts.modify_etc_hosts,
241
                        modify_ssh_setup=opts.modify_ssh_setup,
242
                        maintain_node_health=opts.maintain_node_health,
243
                        drbd_helper=drbd_helper,
244
                        uid_pool=uid_pool,
245
                        default_iallocator=opts.default_iallocator,
246
                        primary_ip_version=primary_ip_version,
247
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
248
                        use_external_mip_script=external_ip_setup_script,
249
                        hv_state=hv_state,
250
                        disk_state=disk_state,
251
                        enabled_disk_templates=enabled_disk_templates,
252
                        )
253
  op = opcodes.OpClusterPostInit()
254
  SubmitOpCode(op, opts=opts)
255
  return 0
256

    
257

    
258
@UsesRPC
259
def DestroyCluster(opts, args):
260
  """Destroy the cluster.
261

262
  @param opts: the command line options selected by the user
263
  @type args: list
264
  @param args: should be an empty list
265
  @rtype: int
266
  @return: the desired exit code
267

268
  """
269
  if not opts.yes_do_it:
270
    ToStderr("Destroying a cluster is irreversible. If you really want"
271
             " destroy this cluster, supply the --yes-do-it option.")
272
    return 1
273

    
274
  op = opcodes.OpClusterDestroy()
275
  master = SubmitOpCode(op, opts=opts)
276
  # if we reached this, the opcode didn't fail; we can proceed to
277
  # shutdown all the daemons
278
  bootstrap.FinalizeClusterDestroy(master)
279
  return 0
280

    
281

    
282
def RenameCluster(opts, args):
283
  """Rename the cluster.
284

285
  @param opts: the command line options selected by the user
286
  @type args: list
287
  @param args: should contain only one element, the new cluster name
288
  @rtype: int
289
  @return: the desired exit code
290

291
  """
292
  cl = GetClient()
293

    
294
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
295

    
296
  new_name = args[0]
297
  if not opts.force:
298
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
299
                " connected over the network to the cluster name, the"
300
                " operation is very dangerous as the IP address will be"
301
                " removed from the node and the change may not go through."
302
                " Continue?") % (cluster_name, new_name)
303
    if not AskUser(usertext):
304
      return 1
305

    
306
  op = opcodes.OpClusterRename(name=new_name)
307
  result = SubmitOpCode(op, opts=opts, cl=cl)
308

    
309
  if result:
310
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
311

    
312
  return 0
313

    
314

    
315
def ActivateMasterIp(opts, args):
316
  """Activates the master IP.
317

318
  """
319
  op = opcodes.OpClusterActivateMasterIp()
320
  SubmitOpCode(op)
321
  return 0
322

    
323

    
324
def DeactivateMasterIp(opts, args):
325
  """Deactivates the master IP.
326

327
  """
328
  if not opts.confirm:
329
    usertext = ("This will disable the master IP. All the open connections to"
330
                " the master IP will be closed. To reach the master you will"
331
                " need to use its node IP."
332
                " Continue?")
333
    if not AskUser(usertext):
334
      return 1
335

    
336
  op = opcodes.OpClusterDeactivateMasterIp()
337
  SubmitOpCode(op)
338
  return 0
339

    
340

    
341
def RedistributeConfig(opts, args):
342
  """Forces push of the cluster configuration.
343

344
  @param opts: the command line options selected by the user
345
  @type args: list
346
  @param args: empty list
347
  @rtype: int
348
  @return: the desired exit code
349

350
  """
351
  op = opcodes.OpClusterRedistConf()
352
  SubmitOrSend(op, opts)
353
  return 0
354

    
355

    
356
def ShowClusterVersion(opts, args):
357
  """Write version of ganeti software to the standard output.
358

359
  @param opts: the command line options selected by the user
360
  @type args: list
361
  @param args: should be an empty list
362
  @rtype: int
363
  @return: the desired exit code
364

365
  """
366
  cl = GetClient(query=True)
367
  result = cl.QueryClusterInfo()
368
  ToStdout("Software version: %s", result["software_version"])
369
  ToStdout("Internode protocol: %s", result["protocol_version"])
370
  ToStdout("Configuration format: %s", result["config_version"])
371
  ToStdout("OS api version: %s", result["os_api_version"])
372
  ToStdout("Export interface: %s", result["export_version"])
373
  return 0
374

    
375

    
376
def ShowClusterMaster(opts, args):
377
  """Write name of master node to the standard output.
378

379
  @param opts: the command line options selected by the user
380
  @type args: list
381
  @param args: should be an empty list
382
  @rtype: int
383
  @return: the desired exit code
384

385
  """
386
  master = bootstrap.GetMaster()
387
  ToStdout(master)
388
  return 0
389

    
390

    
391
def _FormatGroupedParams(paramsdict, roman=False):
392
  """Format Grouped parameters (be, nic, disk) by group.
393

394
  @type paramsdict: dict of dicts
395
  @param paramsdict: {group: {param: value, ...}, ...}
396
  @rtype: dict of dicts
397
  @return: copy of the input dictionaries with strings as values
398

399
  """
400
  ret = {}
401
  for (item, val) in paramsdict.items():
402
    if isinstance(val, dict):
403
      ret[item] = _FormatGroupedParams(val, roman=roman)
404
    elif roman and isinstance(val, int):
405
      ret[item] = compat.TryToRoman(val)
406
    else:
407
      ret[item] = str(val)
408
  return ret
409

    
410

    
411
def ShowClusterConfig(opts, args):
412
  """Shows cluster information.
413

414
  @param opts: the command line options selected by the user
415
  @type args: list
416
  @param args: should be an empty list
417
  @rtype: int
418
  @return: the desired exit code
419

420
  """
421
  cl = GetClient(query=True)
422
  result = cl.QueryClusterInfo()
423

    
424
  if result["tags"]:
425
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
426
  else:
427
    tags = "(none)"
428
  if result["reserved_lvs"]:
429
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
430
  else:
431
    reserved_lvs = "(none)"
432

    
433
  info = [
434
    ("Cluster name", result["name"]),
435
    ("Cluster UUID", result["uuid"]),
436

    
437
    ("Creation time", utils.FormatTime(result["ctime"])),
438
    ("Modification time", utils.FormatTime(result["mtime"])),
439

    
440
    ("Master node", result["master"]),
441

    
442
    ("Architecture (this node)",
443
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),
444

    
445
    ("Tags", tags),
446

    
447
    ("Default hypervisor", result["default_hypervisor"]),
448
    ("Enabled hypervisors",
449
     utils.CommaJoin(result["enabled_hypervisors"])),
450

    
451
    ("Hypervisor parameters", _FormatGroupedParams(result["hvparams"])),
452

    
453
    ("OS-specific hypervisor parameters",
454
     _FormatGroupedParams(result["os_hvp"])),
455

    
456
    ("OS parameters", _FormatGroupedParams(result["osparams"])),
457

    
458
    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
459
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),
460

    
461
    ("Cluster parameters", [
462
      ("candidate pool size",
463
       compat.TryToRoman(result["candidate_pool_size"],
464
                         convert=opts.roman_integers)),
465
      ("master netdev", result["master_netdev"]),
466
      ("master netmask", result["master_netmask"]),
467
      ("use external master IP address setup script",
468
       result["use_external_mip_script"]),
469
      ("lvm volume group", result["volume_group_name"]),
470
      ("lvm reserved volumes", reserved_lvs),
471
      ("drbd usermode helper", result["drbd_usermode_helper"]),
472
      ("file storage path", result["file_storage_dir"]),
473
      ("shared file storage path", result["shared_file_storage_dir"]),
474
      ("maintenance of node health", result["maintain_node_health"]),
475
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
476
      ("default instance allocator", result["default_iallocator"]),
477
      ("primary ip version", result["primary_ip_version"]),
478
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
479
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
480
      ("ExtStorage Providers search path",
481
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
482
      ("enabled disk templates",
483
       utils.CommaJoin(result["enabled_disk_templates"])),
484
      ]),
485

    
486
    ("Default node parameters",
487
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),
488

    
489
    ("Default instance parameters",
490
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),
491

    
492
    ("Default nic parameters",
493
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),
494

    
495
    ("Default disk parameters",
496
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),
497

    
498
    ("Instance policy - limits for instances",
499
     FormatPolicyInfo(result["ipolicy"], None, True)),
500
    ]
501

    
502
  PrintGenericInfo(info)
503
  return 0
504

    
505

    
506
def ClusterCopyFile(opts, args):
507
  """Copy a file from master to some nodes.
508

509
  @param opts: the command line options selected by the user
510
  @type args: list
511
  @param args: should contain only one element, the path of
512
      the file to be copied
513
  @rtype: int
514
  @return: the desired exit code
515

516
  """
517
  filename = args[0]
518
  if not os.path.exists(filename):
519
    raise errors.OpPrereqError("No such filename '%s'" % filename,
520
                               errors.ECODE_INVAL)
521

    
522
  cl = GetClient()
523

    
524
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
525

    
526
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
527
                           secondary_ips=opts.use_replication_network,
528
                           nodegroup=opts.nodegroup)
529

    
530
  srun = ssh.SshRunner(cluster_name)
531
  for node in results:
532
    if not srun.CopyFileToNode(node, filename):
533
      ToStderr("Copy of file %s to node %s failed", filename, node)
534

    
535
  return 0
536

    
537

    
538
def RunClusterCommand(opts, args):
539
  """Run a command on some nodes.
540

541
  @param opts: the command line options selected by the user
542
  @type args: list
543
  @param args: should contain the command to be run and its arguments
544
  @rtype: int
545
  @return: the desired exit code
546

547
  """
548
  cl = GetClient()
549

    
550
  command = " ".join(args)
551

    
552
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
553

    
554
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
555
                                                    "master_node"])
556

    
557
  srun = ssh.SshRunner(cluster_name=cluster_name)
558

    
559
  # Make sure master node is at list end
560
  if master_node in nodes:
561
    nodes.remove(master_node)
562
    nodes.append(master_node)
563

    
564
  for name in nodes:
565
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
566

    
567
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
568
      # Do not output anything for successful commands
569
      continue
570

    
571
    ToStdout("------------------------------------------------")
572
    if opts.show_machine_names:
573
      for line in result.output.splitlines():
574
        ToStdout("%s: %s", name, line)
575
    else:
576
      ToStdout("node: %s", name)
577
      ToStdout("%s", result.output)
578
    ToStdout("return code = %s", result.exit_code)
579

    
580
  return 0
581

    
582

    
583
def VerifyCluster(opts, args):
584
  """Verify integrity of cluster, performing various test on nodes.
585

586
  @param opts: the command line options selected by the user
587
  @type args: list
588
  @param args: should be an empty list
589
  @rtype: int
590
  @return: the desired exit code
591

592
  """
593
  skip_checks = []
594

    
595
  if opts.skip_nplusone_mem:
596
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
597

    
598
  cl = GetClient()
599

    
600
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
601
                               error_codes=opts.error_codes,
602
                               debug_simulate_errors=opts.simulate_errors,
603
                               skip_checks=skip_checks,
604
                               ignore_errors=opts.ignore_errors,
605
                               group_name=opts.nodegroup)
606
  result = SubmitOpCode(op, cl=cl, opts=opts)
607

    
608
  # Keep track of submitted jobs
609
  jex = JobExecutor(cl=cl, opts=opts)
610

    
611
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
612
    jex.AddJobId(None, status, job_id)
613

    
614
  results = jex.GetResults()
615

    
616
  (bad_jobs, bad_results) = \
617
    map(len,
618
        # Convert iterators to lists
619
        map(list,
620
            # Count errors
621
            map(compat.partial(itertools.ifilterfalse, bool),
622
                # Convert result to booleans in a tuple
623
                zip(*((job_success, len(op_results) == 1 and op_results[0])
624
                      for (job_success, op_results) in results)))))
625

    
626
  if bad_jobs == 0 and bad_results == 0:
627
    rcode = constants.EXIT_SUCCESS
628
  else:
629
    rcode = constants.EXIT_FAILURE
630
    if bad_jobs > 0:
631
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
632

    
633
  return rcode
634

    
635

    
636
def VerifyDisks(opts, args):
637
  """Verify integrity of cluster disks.
638

639
  @param opts: the command line options selected by the user
640
  @type args: list
641
  @param args: should be an empty list
642
  @rtype: int
643
  @return: the desired exit code
644

645
  """
646
  cl = GetClient()
647

    
648
  op = opcodes.OpClusterVerifyDisks()
649

    
650
  result = SubmitOpCode(op, cl=cl, opts=opts)
651

    
652
  # Keep track of submitted jobs
653
  jex = JobExecutor(cl=cl, opts=opts)
654

    
655
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
656
    jex.AddJobId(None, status, job_id)
657

    
658
  retcode = constants.EXIT_SUCCESS
659

    
660
  for (status, result) in jex.GetResults():
661
    if not status:
662
      ToStdout("Job failed: %s", result)
663
      continue
664

    
665
    ((bad_nodes, instances, missing), ) = result
666

    
667
    for node, text in bad_nodes.items():
668
      ToStdout("Error gathering data on node %s: %s",
669
               node, utils.SafeEncode(text[-400:]))
670
      retcode = constants.EXIT_FAILURE
671
      ToStdout("You need to fix these nodes first before fixing instances")
672

    
673
    for iname in instances:
674
      if iname in missing:
675
        continue
676
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
677
      try:
678
        ToStdout("Activating disks for instance '%s'", iname)
679
        SubmitOpCode(op, opts=opts, cl=cl)
680
      except errors.GenericError, err:
681
        nret, msg = FormatError(err)
682
        retcode |= nret
683
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
684

    
685
    if missing:
686
      for iname, ival in missing.iteritems():
687
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
688
        if all_missing:
689
          ToStdout("Instance %s cannot be verified as it lives on"
690
                   " broken nodes", iname)
691
        else:
692
          ToStdout("Instance %s has missing logical volumes:", iname)
693
          ival.sort()
694
          for node, vol in ival:
695
            if node in bad_nodes:
696
              ToStdout("\tbroken node %s /dev/%s", node, vol)
697
            else:
698
              ToStdout("\t%s /dev/%s", node, vol)
699

    
700
      ToStdout("You need to replace or recreate disks for all the above"
701
               " instances if this message persists after fixing broken nodes.")
702
      retcode = constants.EXIT_FAILURE
703
    elif not instances:
704
      ToStdout("No disks need to be activated.")
705

    
706
  return retcode
707

    
708

    
709
def RepairDiskSizes(opts, args):
710
  """Verify sizes of cluster disks.
711

712
  @param opts: the command line options selected by the user
713
  @type args: list
714
  @param args: optional list of instances to restrict check to
715
  @rtype: int
716
  @return: the desired exit code
717

718
  """
719
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
720
  SubmitOpCode(op, opts=opts)
721

    
722

    
723
@UsesRPC
724
def MasterFailover(opts, args):
725
  """Failover the master node.
726

727
  This command, when run on a non-master node, will cause the current
728
  master to cease being master, and the non-master to become new
729
  master.
730

731
  @param opts: the command line options selected by the user
732
  @type args: list
733
  @param args: should be an empty list
734
  @rtype: int
735
  @return: the desired exit code
736

737
  """
738
  if opts.no_voting and not opts.yes_do_it:
739
    usertext = ("This will perform the failover even if most other nodes"
740
                " are down, or if this node is outdated. This is dangerous"
741
                " as it can lead to a non-consistent cluster. Check the"
742
                " gnt-cluster(8) man page before proceeding. Continue?")
743
    if not AskUser(usertext):
744
      return 1
745

    
746
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
747

    
748

    
749
def MasterPing(opts, args):
750
  """Checks if the master is alive.
751

752
  @param opts: the command line options selected by the user
753
  @type args: list
754
  @param args: should be an empty list
755
  @rtype: int
756
  @return: the desired exit code
757

758
  """
759
  try:
760
    cl = GetClient()
761
    cl.QueryClusterInfo()
762
    return 0
763
  except Exception: # pylint: disable=W0703
764
    return 1
765

    
766

    
767
def SearchTags(opts, args):
768
  """Searches the tags on all the cluster.
769

770
  @param opts: the command line options selected by the user
771
  @type args: list
772
  @param args: should contain only one element, the tag pattern
773
  @rtype: int
774
  @return: the desired exit code
775

776
  """
777
  op = opcodes.OpTagsSearch(pattern=args[0])
778
  result = SubmitOpCode(op, opts=opts)
779
  if not result:
780
    return 1
781
  result = list(result)
782
  result.sort()
783
  for path, tag in result:
784
    ToStdout("%s %s", path, tag)
785

    
786

    
787
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
788
  """Reads and verifies an X509 certificate.
789

790
  @type cert_filename: string
791
  @param cert_filename: the path of the file containing the certificate to
792
                        verify encoded in PEM format
793
  @type verify_private_key: bool
794
  @param verify_private_key: whether to verify the private key in addition to
795
                             the public certificate
796
  @rtype: string
797
  @return: a string containing the PEM-encoded certificate.
798

799
  """
800
  try:
801
    pem = utils.ReadFile(cert_filename)
802
  except IOError, err:
803
    raise errors.X509CertError(cert_filename,
804
                               "Unable to read certificate: %s" % str(err))
805

    
806
  try:
807
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
808
  except Exception, err:
809
    raise errors.X509CertError(cert_filename,
810
                               "Unable to load certificate: %s" % str(err))
811

    
812
  if verify_private_key:
813
    try:
814
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
815
    except Exception, err:
816
      raise errors.X509CertError(cert_filename,
817
                                 "Unable to load private key: %s" % str(err))
818

    
819
  return pem
820

    
821

    
822
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
823
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
824
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
825
                 cds_filename, force):
826
  """Renews cluster certificates, keys and secrets.
827

828
  @type new_cluster_cert: bool
829
  @param new_cluster_cert: Whether to generate a new cluster certificate
830
  @type new_rapi_cert: bool
831
  @param new_rapi_cert: Whether to generate a new RAPI certificate
832
  @type rapi_cert_filename: string
833
  @param rapi_cert_filename: Path to file containing new RAPI certificate
834
  @type new_spice_cert: bool
835
  @param new_spice_cert: Whether to generate a new SPICE certificate
836
  @type spice_cert_filename: string
837
  @param spice_cert_filename: Path to file containing new SPICE certificate
838
  @type spice_cacert_filename: string
839
  @param spice_cacert_filename: Path to file containing the certificate of the
840
                                CA that signed the SPICE certificate
841
  @type new_confd_hmac_key: bool
842
  @param new_confd_hmac_key: Whether to generate a new HMAC key
843
  @type new_cds: bool
844
  @param new_cds: Whether to generate a new cluster domain secret
845
  @type cds_filename: string
846
  @param cds_filename: Path to file containing new cluster domain secret
847
  @type force: bool
848
  @param force: Whether to ask user for confirmation
849

850
  """
851
  if new_rapi_cert and rapi_cert_filename:
852
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
853
             " options can be specified at the same time.")
854
    return 1
855

    
856
  if new_cds and cds_filename:
857
    ToStderr("Only one of the --new-cluster-domain-secret and"
858
             " --cluster-domain-secret options can be specified at"
859
             " the same time.")
860
    return 1
861

    
862
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
863
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
864
             " and --spice-ca-certificate must not be used.")
865
    return 1
866

    
867
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
868
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
869
             " specified.")
870
    return 1
871

    
872
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
873
  try:
874
    if rapi_cert_filename:
875
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
876
    if spice_cert_filename:
877
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
878
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
879
  except errors.X509CertError, err:
880
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
881
    return 1
882

    
883
  if cds_filename:
884
    try:
885
      cds = utils.ReadFile(cds_filename)
886
    except Exception, err: # pylint: disable=W0703
887
      ToStderr("Can't load new cluster domain secret from %s: %s" %
888
               (cds_filename, str(err)))
889
      return 1
890
  else:
891
    cds = None
892

    
893
  if not force:
894
    usertext = ("This requires all daemons on all nodes to be restarted and"
895
                " may take some time. Continue?")
896
    if not AskUser(usertext):
897
      return 1
898

    
899
  def _RenewCryptoInner(ctx):
900
    ctx.feedback_fn("Updating certificates and keys")
901
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
902
                                    new_rapi_cert,
903
                                    new_spice_cert,
904
                                    new_confd_hmac_key,
905
                                    new_cds,
906
                                    rapi_cert_pem=rapi_cert_pem,
907
                                    spice_cert_pem=spice_cert_pem,
908
                                    spice_cacert_pem=spice_cacert_pem,
909
                                    cds=cds)
910

    
911
    files_to_copy = []
912

    
913
    if new_cluster_cert:
914
      files_to_copy.append(pathutils.NODED_CERT_FILE)
915

    
916
    if new_rapi_cert or rapi_cert_pem:
917
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
918

    
919
    if new_spice_cert or spice_cert_pem:
920
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
921
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
922

    
923
    if new_confd_hmac_key:
924
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
925

    
926
    if new_cds or cds:
927
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
928

    
929
    if files_to_copy:
930
      for node_name in ctx.nonmaster_nodes:
931
        ctx.feedback_fn("Copying %s to %s" %
932
                        (", ".join(files_to_copy), node_name))
933
        for file_name in files_to_copy:
934
          ctx.ssh.CopyFileToNode(node_name, file_name)
935

    
936
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
937

    
938
  ToStdout("All requested certificates and keys have been replaced."
939
           " Running \"gnt-cluster verify\" now is recommended.")
940

    
941
  return 0
942

    
943

    
944
def RenewCrypto(opts, args):
945
  """Renews cluster certificates, keys and secrets.
946

947
  """
948
  return _RenewCrypto(opts.new_cluster_cert,
949
                      opts.new_rapi_cert,
950
                      opts.rapi_cert,
951
                      opts.new_spice_cert,
952
                      opts.spice_cert,
953
                      opts.spice_cacert,
954
                      opts.new_confd_hmac_key,
955
                      opts.new_cluster_domain_secret,
956
                      opts.cluster_domain_secret,
957
                      opts.force)
958

    
959

    
960
def SetClusterParams(opts, args):
961
  """Modify the cluster.
962

963
  @param opts: the command line options selected by the user
964
  @type args: list
965
  @param args: should be an empty list
966
  @rtype: int
967
  @return: the desired exit code
968

969
  """
970
  if not (opts.vg_name is not None or opts.drbd_helper or
971
          opts.enabled_hypervisors or opts.hvparams or
972
          opts.beparams or opts.nicparams or
973
          opts.ndparams or opts.diskparams or
974
          opts.candidate_pool_size is not None or
975
          opts.uid_pool is not None or
976
          opts.maintain_node_health is not None or
977
          opts.add_uids is not None or
978
          opts.remove_uids is not None or
979
          opts.default_iallocator is not None or
980
          opts.reserved_lvs is not None or
981
          opts.master_netdev is not None or
982
          opts.master_netmask is not None or
983
          opts.use_external_mip_script is not None or
984
          opts.prealloc_wipe_disks is not None or
985
          opts.hv_state or
986
          opts.enabled_disk_templates or
987
          opts.disk_state or
988
          opts.ispecs_mem_size or
989
          opts.ispecs_cpu_count or
990
          opts.ispecs_disk_count or
991
          opts.ispecs_disk_size or
992
          opts.ispecs_nic_count or
993
          opts.ipolicy_disk_templates is not None or
994
          opts.ipolicy_vcpu_ratio is not None or
995
          opts.ipolicy_spindle_ratio is not None):
996
    ToStderr("Please give at least one of the parameters.")
997
    return 1
998

    
999
  if _CheckNoLvmStorageOptDeprecated(opts):
1000
    return 1
1001

    
1002
  enabled_disk_templates = None
1003
  if opts.enabled_disk_templates:
1004
    enabled_disk_templates = opts.enabled_disk_templates.split(",")
1005

    
1006
  # consistency between vg name and enabled disk templates
1007
  vg_name = None
1008
  if opts.vg_name is not None:
1009
    vg_name = opts.vg_name
1010
  if enabled_disk_templates:
1011
    if vg_name and not utils.IsLvmEnabled(enabled_disk_templates):
1012
      ToStdout("You specified a volume group with --vg-name, but you did not"
1013
               " enable any of the following lvm-based disk templates: %s" %
1014
               utils.CommaJoin(utils.GetLvmDiskTemplates()))
1015

    
1016
  drbd_helper = opts.drbd_helper
1017
  if not opts.drbd_storage and opts.drbd_helper:
1018
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
1019
    return 1
1020

    
1021
  if not opts.drbd_storage:
1022
    drbd_helper = ""
1023

    
1024
  hvlist = opts.enabled_hypervisors
1025
  if hvlist is not None:
1026
    hvlist = hvlist.split(",")
1027

    
1028
  # a list of (name, dict) we can pass directly to dict() (or [])
1029
  hvparams = dict(opts.hvparams)
1030
  for hv_params in hvparams.values():
1031
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1032

    
1033
  diskparams = dict(opts.diskparams)
1034

    
1035
  for dt_params in diskparams.values():
1036
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1037

    
1038
  beparams = opts.beparams
1039
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1040

    
1041
  nicparams = opts.nicparams
1042
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1043

    
1044
  ndparams = opts.ndparams
1045
  if ndparams is not None:
1046
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1047

    
1048
  ipolicy = CreateIPolicyFromOpts(
1049
    ispecs_mem_size=opts.ispecs_mem_size,
1050
    ispecs_cpu_count=opts.ispecs_cpu_count,
1051
    ispecs_disk_count=opts.ispecs_disk_count,
1052
    ispecs_disk_size=opts.ispecs_disk_size,
1053
    ispecs_nic_count=opts.ispecs_nic_count,
1054
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1055
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1056
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1057
    )
1058

    
1059
  mnh = opts.maintain_node_health
1060

    
1061
  uid_pool = opts.uid_pool
1062
  if uid_pool is not None:
1063
    uid_pool = uidpool.ParseUidPool(uid_pool)
1064

    
1065
  add_uids = opts.add_uids
1066
  if add_uids is not None:
1067
    add_uids = uidpool.ParseUidPool(add_uids)
1068

    
1069
  remove_uids = opts.remove_uids
1070
  if remove_uids is not None:
1071
    remove_uids = uidpool.ParseUidPool(remove_uids)
1072

    
1073
  if opts.reserved_lvs is not None:
1074
    if opts.reserved_lvs == "":
1075
      opts.reserved_lvs = []
1076
    else:
1077
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1078

    
1079
  if opts.master_netmask is not None:
1080
    try:
1081
      opts.master_netmask = int(opts.master_netmask)
1082
    except ValueError:
1083
      ToStderr("The --master-netmask option expects an int parameter.")
1084
      return 1
1085

    
1086
  ext_ip_script = opts.use_external_mip_script
1087

    
1088
  if opts.disk_state:
1089
    disk_state = utils.FlatToDict(opts.disk_state)
1090
  else:
1091
    disk_state = {}
1092

    
1093
  hv_state = dict(opts.hv_state)
1094

    
1095
  op = opcodes.OpClusterSetParams(
1096
    vg_name=vg_name,
1097
    drbd_helper=drbd_helper,
1098
    enabled_hypervisors=hvlist,
1099
    hvparams=hvparams,
1100
    os_hvp=None,
1101
    beparams=beparams,
1102
    nicparams=nicparams,
1103
    ndparams=ndparams,
1104
    diskparams=diskparams,
1105
    ipolicy=ipolicy,
1106
    candidate_pool_size=opts.candidate_pool_size,
1107
    maintain_node_health=mnh,
1108
    uid_pool=uid_pool,
1109
    add_uids=add_uids,
1110
    remove_uids=remove_uids,
1111
    default_iallocator=opts.default_iallocator,
1112
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1113
    master_netdev=opts.master_netdev,
1114
    master_netmask=opts.master_netmask,
1115
    reserved_lvs=opts.reserved_lvs,
1116
    use_external_mip_script=ext_ip_script,
1117
    hv_state=hv_state,
1118
    disk_state=disk_state,
1119
    enabled_disk_templates=enabled_disk_templates,
1120
    )
1121
  SubmitOrSend(op, opts)
1122
  return 0
1123

    
1124

    
1125
def QueueOps(opts, args):
1126
  """Queue operations.
1127

1128
  @param opts: the command line options selected by the user
1129
  @type args: list
1130
  @param args: should contain only one element, the subcommand
1131
  @rtype: int
1132
  @return: the desired exit code
1133

1134
  """
1135
  command = args[0]
1136
  client = GetClient()
1137
  if command in ("drain", "undrain"):
1138
    drain_flag = command == "drain"
1139
    client.SetQueueDrainFlag(drain_flag)
1140
  elif command == "info":
1141
    result = client.QueryConfigValues(["drain_flag"])
1142
    if result[0]:
1143
      val = "set"
1144
    else:
1145
      val = "unset"
1146
    ToStdout("The drain flag is %s" % val)
1147
  else:
1148
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1149
                               errors.ECODE_INVAL)
1150

    
1151
  return 0
1152

    
1153

    
1154
def _ShowWatcherPause(until):
1155
  if until is None or until < time.time():
1156
    ToStdout("The watcher is not paused.")
1157
  else:
1158
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1159

    
1160

    
1161
def WatcherOps(opts, args):
1162
  """Watcher operations.
1163

1164
  @param opts: the command line options selected by the user
1165
  @type args: list
1166
  @param args: should contain only one element, the subcommand
1167
  @rtype: int
1168
  @return: the desired exit code
1169

1170
  """
1171
  command = args[0]
1172
  client = GetClient()
1173

    
1174
  if command == "continue":
1175
    client.SetWatcherPause(None)
1176
    ToStdout("The watcher is no longer paused.")
1177

    
1178
  elif command == "pause":
1179
    if len(args) < 2:
1180
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1181

    
1182
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1183
    _ShowWatcherPause(result)
1184

    
1185
  elif command == "info":
1186
    result = client.QueryConfigValues(["watcher_pause"])
1187
    _ShowWatcherPause(result[0])
1188

    
1189
  else:
1190
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1191
                               errors.ECODE_INVAL)
1192

    
1193
  return 0
1194

    
1195

    
1196
def _OobPower(opts, node_list, power):
1197
  """Puts the node in the list to desired power state.
1198

1199
  @param opts: The command line options selected by the user
1200
  @param node_list: The list of nodes to operate on
1201
  @param power: True if they should be powered on, False otherwise
1202
  @return: The success of the operation (none failed)
1203

1204
  """
1205
  if power:
1206
    command = constants.OOB_POWER_ON
1207
  else:
1208
    command = constants.OOB_POWER_OFF
1209

    
1210
  op = opcodes.OpOobCommand(node_names=node_list,
1211
                            command=command,
1212
                            ignore_status=True,
1213
                            timeout=opts.oob_timeout,
1214
                            power_delay=opts.power_delay)
1215
  result = SubmitOpCode(op, opts=opts)
1216
  errs = 0
1217
  for node_result in result:
1218
    (node_tuple, data_tuple) = node_result
1219
    (_, node_name) = node_tuple
1220
    (data_status, _) = data_tuple
1221
    if data_status != constants.RS_NORMAL:
1222
      assert data_status != constants.RS_UNAVAIL
1223
      errs += 1
1224
      ToStderr("There was a problem changing power for %s, please investigate",
1225
               node_name)
1226

    
1227
  if errs > 0:
1228
    return False
1229

    
1230
  return True
1231

    
1232

    
1233
def _InstanceStart(opts, inst_list, start, no_remember=False):
1234
  """Puts the instances in the list to desired state.
1235

1236
  @param opts: The command line options selected by the user
1237
  @param inst_list: The list of instances to operate on
1238
  @param start: True if they should be started, False for shutdown
1239
  @param no_remember: If the instance state should be remembered
1240
  @return: The success of the operation (none failed)
1241

1242
  """
1243
  if start:
1244
    opcls = opcodes.OpInstanceStartup
1245
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1246
  else:
1247
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1248
                           timeout=opts.shutdown_timeout,
1249
                           no_remember=no_remember)
1250
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1251

    
1252
  jex = JobExecutor(opts=opts)
1253

    
1254
  for inst in inst_list:
1255
    ToStdout("Submit %s of instance %s", text_submit, inst)
1256
    op = opcls(instance_name=inst)
1257
    jex.QueueJob(inst, op)
1258

    
1259
  results = jex.GetResults()
1260
  bad_cnt = len([1 for (success, _) in results if not success])
1261

    
1262
  if bad_cnt == 0:
1263
    ToStdout("All instances have been %s successfully", text_success)
1264
  else:
1265
    ToStderr("There were errors while %s instances:\n"
1266
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1267
             len(results))
1268
    return False
1269

    
1270
  return True
1271

    
1272

    
1273
class _RunWhenNodesReachableHelper:
1274
  """Helper class to make shared internal state sharing easier.
1275

1276
  @ivar success: Indicates if all action_cb calls were successful
1277

1278
  """
1279
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1280
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1281
    """Init the object.
1282

1283
    @param node_list: The list of nodes to be reachable
1284
    @param action_cb: Callback called when a new host is reachable
1285
    @type node2ip: dict
1286
    @param node2ip: Node to ip mapping
1287
    @param port: The port to use for the TCP ping
1288
    @param feedback_fn: The function used for feedback
1289
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1290
    @param _sleep_fn: Function to sleep (for unittest use only)
1291

1292
    """
1293
    self.down = set(node_list)
1294
    self.up = set()
1295
    self.node2ip = node2ip
1296
    self.success = True
1297
    self.action_cb = action_cb
1298
    self.port = port
1299
    self.feedback_fn = feedback_fn
1300
    self._ping_fn = _ping_fn
1301
    self._sleep_fn = _sleep_fn
1302

    
1303
  def __call__(self):
1304
    """When called we run action_cb.
1305

1306
    @raises utils.RetryAgain: When there are still down nodes
1307

1308
    """
1309
    if not self.action_cb(self.up):
1310
      self.success = False
1311

    
1312
    if self.down:
1313
      raise utils.RetryAgain()
1314
    else:
1315
      return self.success
1316

    
1317
  def Wait(self, secs):
1318
    """Checks if a host is up or waits remaining seconds.
1319

1320
    @param secs: The secs remaining
1321

1322
    """
1323
    start = time.time()
1324
    for node in self.down:
1325
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1326
                       live_port_needed=True):
1327
        self.feedback_fn("Node %s became available" % node)
1328
        self.up.add(node)
1329
        self.down -= self.up
1330
        # If we have a node available there is the possibility to run the
1331
        # action callback successfully, therefore we don't wait and return
1332
        return
1333

    
1334
    self._sleep_fn(max(0.0, start + secs - time.time()))
1335

    
1336

    
1337
def _RunWhenNodesReachable(node_list, action_cb, interval):
1338
  """Run action_cb when nodes become reachable.
1339

1340
  @param node_list: The list of nodes to be reachable
1341
  @param action_cb: Callback called when a new host is reachable
1342
  @param interval: The earliest time to retry
1343

1344
  """
1345
  client = GetClient()
1346
  cluster_info = client.QueryClusterInfo()
1347
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1348
    family = netutils.IPAddress.family
1349
  else:
1350
    family = netutils.IP6Address.family
1351

    
1352
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1353
                 for node in node_list)
1354

    
1355
  port = netutils.GetDaemonPort(constants.NODED)
1356
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1357
                                        ToStdout)
1358

    
1359
  try:
1360
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1361
                       wait_fn=helper.Wait)
1362
  except utils.RetryTimeout:
1363
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1364
             " again:\n  - %s", "  - ".join(helper.down))
1365
    return False
1366

    
1367

    
1368
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1369
                          _instance_start_fn=_InstanceStart):
1370
  """Start the instances conditional based on node_states.
1371

1372
  @param opts: The command line options selected by the user
1373
  @param inst_map: A dict of inst -> nodes mapping
1374
  @param nodes_online: A list of nodes online
1375
  @param _instance_start_fn: Callback to start instances (unittest use only)
1376
  @return: Success of the operation on all instances
1377

1378
  """
1379
  start_inst_list = []
1380
  for (inst, nodes) in inst_map.items():
1381
    if not (nodes - nodes_online):
1382
      # All nodes the instance lives on are back online
1383
      start_inst_list.append(inst)
1384

    
1385
  for inst in start_inst_list:
1386
    del inst_map[inst]
1387

    
1388
  if start_inst_list:
1389
    return _instance_start_fn(opts, start_inst_list, True)
1390

    
1391
  return True
1392

    
1393

    
1394
def _EpoOn(opts, full_node_list, node_list, inst_map):
1395
  """Does the actual power on.
1396

1397
  @param opts: The command line options selected by the user
1398
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1399
                         OOB)
1400
  @param node_list: The list of nodes to operate on (all need to support OOB)
1401
  @param inst_map: A dict of inst -> nodes mapping
1402
  @return: The desired exit status
1403

1404
  """
1405
  if node_list and not _OobPower(opts, node_list, False):
1406
    ToStderr("Not all nodes seem to get back up, investigate and start"
1407
             " manually if needed")
1408

    
1409
  # Wait for the nodes to be back up
1410
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1411

    
1412
  ToStdout("Waiting until all nodes are available again")
1413
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1414
    ToStderr("Please investigate and start stopped instances manually")
1415
    return constants.EXIT_FAILURE
1416

    
1417
  return constants.EXIT_SUCCESS
1418

    
1419

    
1420
def _EpoOff(opts, node_list, inst_map):
1421
  """Does the actual power off.
1422

1423
  @param opts: The command line options selected by the user
1424
  @param node_list: The list of nodes to operate on (all need to support OOB)
1425
  @param inst_map: A dict of inst -> nodes mapping
1426
  @return: The desired exit status
1427

1428
  """
1429
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1430
    ToStderr("Please investigate and stop instances manually before continuing")
1431
    return constants.EXIT_FAILURE
1432

    
1433
  if not node_list:
1434
    return constants.EXIT_SUCCESS
1435

    
1436
  if _OobPower(opts, node_list, False):
1437
    return constants.EXIT_SUCCESS
1438
  else:
1439
    return constants.EXIT_FAILURE
1440

    
1441

    
1442
def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
1443
        _confirm_fn=ConfirmOperation,
1444
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
1445
  """EPO operations.
1446

1447
  @param opts: the command line options selected by the user
1448
  @type args: list
1449
  @param args: should contain only one element, the subcommand
1450
  @rtype: int
1451
  @return: the desired exit code
1452

1453
  """
1454
  if opts.groups and opts.show_all:
1455
    _stderr_fn("Only one of --groups or --all are allowed")
1456
    return constants.EXIT_FAILURE
1457
  elif args and opts.show_all:
1458
    _stderr_fn("Arguments in combination with --all are not allowed")
1459
    return constants.EXIT_FAILURE
1460

    
1461
  if cl is None:
1462
    cl = GetClient()
1463

    
1464
  if opts.groups:
1465
    node_query_list = \
1466
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
1467
  else:
1468
    node_query_list = args
1469

    
1470
  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
1471
                                           "sinst_list", "powered", "offline"],
1472
                         False)
1473

    
1474
  all_nodes = map(compat.fst, result)
1475
  node_list = []
1476
  inst_map = {}
1477
  for (node, master, pinsts, sinsts, powered, offline) in result:
1478
    if not offline:
1479
      for inst in (pinsts + sinsts):
1480
        if inst in inst_map:
1481
          if not master:
1482
            inst_map[inst].add(node)
1483
        elif master:
1484
          inst_map[inst] = set()
1485
        else:
1486
          inst_map[inst] = set([node])
1487

    
1488
    if master and opts.on:
1489
      # We ignore the master for turning on the machines, in fact we are
1490
      # already operating on the master at this point :)
1491
      continue
1492
    elif master and not opts.show_all:
1493
      _stderr_fn("%s is the master node, please do a master-failover to another"
1494
                 " node not affected by the EPO or use --all if you intend to"
1495
                 " shutdown the whole cluster", node)
1496
      return constants.EXIT_FAILURE
1497
    elif powered is None:
1498
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
1499
                 " handled in a fully automated manner", node)
1500
    elif powered == opts.on:
1501
      _stdout_fn("Node %s is already in desired power state, skipping", node)
1502
    elif not offline or (offline and powered):
1503
      node_list.append(node)
1504

    
1505
  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
1506
    return constants.EXIT_FAILURE
1507

    
1508
  if opts.on:
1509
    return _on_fn(opts, all_nodes, node_list, inst_map)
1510
  else:
1511
    return _off_fn(opts, node_list, inst_map)
1512

    
1513

    
1514
commands = {
1515
  "init": (
1516
    InitCluster, [ArgHost(min=1, max=1)],
1517
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
1518
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
1519
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
1520
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
1521
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
1522
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
1523
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
1524
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT]
1525
     + INSTANCE_POLICY_OPTS,
1526
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
1527
  "destroy": (
1528
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
1529
    "", "Destroy cluster"),
1530
  "rename": (
1531
    RenameCluster, [ArgHost(min=1, max=1)],
1532
    [FORCE_OPT, DRY_RUN_OPT],
1533
    "<new_name>",
1534
    "Renames the cluster"),
1535
  "redist-conf": (
1536
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
1537
    "", "Forces a push of the configuration file and ssconf files"
1538
    " to the nodes in the cluster"),
1539
  "verify": (
1540
    VerifyCluster, ARGS_NONE,
1541
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
1542
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
1543
    "", "Does a check on the cluster configuration"),
1544
  "verify-disks": (
1545
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
1546
    "", "Does a check on the cluster disk status"),
1547
  "repair-disk-sizes": (
1548
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
1549
    "[instance...]", "Updates mismatches in recorded disk sizes"),
1550
  "master-failover": (
1551
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
1552
    "", "Makes the current node the master"),
1553
  "master-ping": (
1554
    MasterPing, ARGS_NONE, [],
1555
    "", "Checks if the master is alive"),
1556
  "version": (
1557
    ShowClusterVersion, ARGS_NONE, [],
1558
    "", "Shows the cluster version"),
1559
  "getmaster": (
1560
    ShowClusterMaster, ARGS_NONE, [],
1561
    "", "Shows the cluster master"),
1562
  "copyfile": (
1563
    ClusterCopyFile, [ArgFile(min=1, max=1)],
1564
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
1565
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
1566
  "command": (
1567
    RunClusterCommand, [ArgCommand(min=1)],
1568
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
1569
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
1570
  "info": (
1571
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
1572
    "[--roman]", "Show cluster configuration"),
1573
  "list-tags": (
1574
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
1575
  "add-tags": (
1576
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1577
    "tag...", "Add tags to the cluster"),
1578
  "remove-tags": (
1579
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1580
    "tag...", "Remove tags from the cluster"),
1581
  "search-tags": (
1582
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
1583
    "Searches the tags on all objects on"
1584
    " the cluster for a given pattern (regex)"),
1585
  "queue": (
1586
    QueueOps,
1587
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
1588
    [], "drain|undrain|info", "Change queue properties"),
1589
  "watcher": (
1590
    WatcherOps,
1591
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
1592
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
1593
    [],
1594
    "{pause <timespec>|continue|info}", "Change watcher properties"),
1595
  "modify": (
1596
    SetClusterParams, ARGS_NONE,
1597
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
1598
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
1599
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
1600
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
1601
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
1602
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
1603
     DISK_STATE_OPT, SUBMIT_OPT, ENABLED_DISK_TEMPLATES_OPT] +
1604
    INSTANCE_POLICY_OPTS,
1605
    "[opts...]",
1606
    "Alters the parameters of the cluster"),
1607
  "renew-crypto": (
1608
    RenewCrypto, ARGS_NONE,
1609
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
1610
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
1611
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
1612
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
1613
    "[opts...]",
1614
    "Renews cluster certificates, keys and secrets"),
1615
  "epo": (
1616
    Epo, [ArgUnknown()],
1617
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
1618
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
1619
    "[opts...] [args]",
1620
    "Performs an emergency power-off on given args"),
1621
  "activate-master-ip": (
1622
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
1623
  "deactivate-master-ip": (
1624
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
1625
    "Deactivates the master IP"),
1626
  }
1627

    
1628

    
1629
#: dictionary with aliases for commands
1630
aliases = {
1631
  "masterfailover": "master-failover",
1632
  "show": "info",
1633
}
1634

    
1635

    
1636
def Main():
1637
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
1638
                     aliases=aliases)