Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ d2d3935a

History | View | Annotate | Download (51.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
import os.path
30
import time
31
import OpenSSL
32
import itertools
33

    
34
from ganeti.cli import *
35
from ganeti import opcodes
36
from ganeti import constants
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import bootstrap
40
from ganeti import ssh
41
from ganeti import objects
42
from ganeti import uidpool
43
from ganeti import compat
44
from ganeti import netutils
45
from ganeti import pathutils
46

    
47

    
48
ON_OPT = cli_option("--on", default=False,
49
                    action="store_true", dest="on",
50
                    help="Recover from an EPO")
51

    
52
GROUPS_OPT = cli_option("--groups", default=False,
53
                        action="store_true", dest="groups",
54
                        help="Arguments are node groups instead of nodes")
55

    
56
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
57
                            help="Override interactive check for --no-voting",
58
                            default=False, action="store_true")
59

    
60
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
61
_EPO_PING_TIMEOUT = 1 # 1 second
62
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
63

    
64

    
65
@UsesRPC
66
def InitCluster(opts, args):
67
  """Initialize the cluster.
68

69
  @param opts: the command line options selected by the user
70
  @type args: list
71
  @param args: should contain only one element, the desired
72
      cluster name
73
  @rtype: int
74
  @return: the desired exit code
75

76
  """
77
  if not opts.lvm_storage and opts.vg_name:
78
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
79
    return 1
80

    
81
  vg_name = opts.vg_name
82
  if opts.lvm_storage and not opts.vg_name:
83
    vg_name = constants.DEFAULT_VG
84

    
85
  if not opts.drbd_storage and opts.drbd_helper:
86
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
87
    return 1
88

    
89
  drbd_helper = opts.drbd_helper
90
  if opts.drbd_storage and not opts.drbd_helper:
91
    drbd_helper = constants.DEFAULT_DRBD_HELPER
92

    
93
  master_netdev = opts.master_netdev
94
  if master_netdev is None:
95
    master_netdev = constants.DEFAULT_BRIDGE
96

    
97
  hvlist = opts.enabled_hypervisors
98
  if hvlist is None:
99
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
100
  hvlist = hvlist.split(",")
101

    
102
  hvparams = dict(opts.hvparams)
103
  beparams = opts.beparams
104
  nicparams = opts.nicparams
105

    
106
  diskparams = dict(opts.diskparams)
107

    
108
  # check the disk template types here, as we cannot rely on the type check done
109
  # by the opcode parameter types
110
  diskparams_keys = set(diskparams.keys())
111
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
112
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
113
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
114
    return 1
115

    
116
  # prepare beparams dict
117
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
118
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
119

    
120
  # prepare nicparams dict
121
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
122
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
123

    
124
  # prepare ndparams dict
125
  if opts.ndparams is None:
126
    ndparams = dict(constants.NDC_DEFAULTS)
127
  else:
128
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
129
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
130

    
131
  # prepare hvparams dict
132
  for hv in constants.HYPER_TYPES:
133
    if hv not in hvparams:
134
      hvparams[hv] = {}
135
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
136
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
137

    
138
  # prepare diskparams dict
139
  for templ in constants.DISK_TEMPLATES:
140
    if templ not in diskparams:
141
      diskparams[templ] = {}
142
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
143
                                         diskparams[templ])
144
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
145

    
146
  # prepare ipolicy dict
147
  ipolicy = CreateIPolicyFromOpts(
148
    ispecs_mem_size=opts.ispecs_mem_size,
149
    ispecs_cpu_count=opts.ispecs_cpu_count,
150
    ispecs_disk_count=opts.ispecs_disk_count,
151
    ispecs_disk_size=opts.ispecs_disk_size,
152
    ispecs_nic_count=opts.ispecs_nic_count,
153
    minmax_ispecs=opts.ipolicy_bounds_specs,
154
    std_ispecs=opts.ipolicy_std_specs,
155
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
156
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
157
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
158
    fill_all=True)
159

    
160
  if opts.candidate_pool_size is None:
161
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
162

    
163
  if opts.mac_prefix is None:
164
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
165

    
166
  uid_pool = opts.uid_pool
167
  if uid_pool is not None:
168
    uid_pool = uidpool.ParseUidPool(uid_pool)
169

    
170
  if opts.prealloc_wipe_disks is None:
171
    opts.prealloc_wipe_disks = False
172

    
173
  external_ip_setup_script = opts.use_external_mip_script
174
  if external_ip_setup_script is None:
175
    external_ip_setup_script = False
176

    
177
  try:
178
    primary_ip_version = int(opts.primary_ip_version)
179
  except (ValueError, TypeError), err:
180
    ToStderr("Invalid primary ip version value: %s" % str(err))
181
    return 1
182

    
183
  master_netmask = opts.master_netmask
184
  try:
185
    if master_netmask is not None:
186
      master_netmask = int(master_netmask)
187
  except (ValueError, TypeError), err:
188
    ToStderr("Invalid master netmask value: %s" % str(err))
189
    return 1
190

    
191
  if opts.disk_state:
192
    disk_state = utils.FlatToDict(opts.disk_state)
193
  else:
194
    disk_state = {}
195

    
196
  hv_state = dict(opts.hv_state)
197

    
198
  enabled_disk_templates = opts.enabled_disk_templates
199
  if enabled_disk_templates:
200
    enabled_disk_templates = enabled_disk_templates.split(",")
201
  else:
202
    enabled_disk_templates = list(constants.DEFAULT_ENABLED_DISK_TEMPLATES)
203

    
204
  bootstrap.InitCluster(cluster_name=args[0],
205
                        secondary_ip=opts.secondary_ip,
206
                        vg_name=vg_name,
207
                        mac_prefix=opts.mac_prefix,
208
                        master_netmask=master_netmask,
209
                        master_netdev=master_netdev,
210
                        file_storage_dir=opts.file_storage_dir,
211
                        shared_file_storage_dir=opts.shared_file_storage_dir,
212
                        enabled_hypervisors=hvlist,
213
                        hvparams=hvparams,
214
                        beparams=beparams,
215
                        nicparams=nicparams,
216
                        ndparams=ndparams,
217
                        diskparams=diskparams,
218
                        ipolicy=ipolicy,
219
                        candidate_pool_size=opts.candidate_pool_size,
220
                        modify_etc_hosts=opts.modify_etc_hosts,
221
                        modify_ssh_setup=opts.modify_ssh_setup,
222
                        maintain_node_health=opts.maintain_node_health,
223
                        drbd_helper=drbd_helper,
224
                        uid_pool=uid_pool,
225
                        default_iallocator=opts.default_iallocator,
226
                        primary_ip_version=primary_ip_version,
227
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
228
                        use_external_mip_script=external_ip_setup_script,
229
                        hv_state=hv_state,
230
                        disk_state=disk_state,
231
                        enabled_disk_templates=enabled_disk_templates,
232
                        )
233
  op = opcodes.OpClusterPostInit()
234
  SubmitOpCode(op, opts=opts)
235
  return 0
236

    
237

    
238
@UsesRPC
239
def DestroyCluster(opts, args):
240
  """Destroy the cluster.
241

242
  @param opts: the command line options selected by the user
243
  @type args: list
244
  @param args: should be an empty list
245
  @rtype: int
246
  @return: the desired exit code
247

248
  """
249
  if not opts.yes_do_it:
250
    ToStderr("Destroying a cluster is irreversible. If you really want"
251
             " destroy this cluster, supply the --yes-do-it option.")
252
    return 1
253

    
254
  op = opcodes.OpClusterDestroy()
255
  master = SubmitOpCode(op, opts=opts)
256
  # if we reached this, the opcode didn't fail; we can proceed to
257
  # shutdown all the daemons
258
  bootstrap.FinalizeClusterDestroy(master)
259
  return 0
260

    
261

    
262
def RenameCluster(opts, args):
263
  """Rename the cluster.
264

265
  @param opts: the command line options selected by the user
266
  @type args: list
267
  @param args: should contain only one element, the new cluster name
268
  @rtype: int
269
  @return: the desired exit code
270

271
  """
272
  cl = GetClient()
273

    
274
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
275

    
276
  new_name = args[0]
277
  if not opts.force:
278
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
279
                " connected over the network to the cluster name, the"
280
                " operation is very dangerous as the IP address will be"
281
                " removed from the node and the change may not go through."
282
                " Continue?") % (cluster_name, new_name)
283
    if not AskUser(usertext):
284
      return 1
285

    
286
  op = opcodes.OpClusterRename(name=new_name)
287
  result = SubmitOpCode(op, opts=opts, cl=cl)
288

    
289
  if result:
290
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
291

    
292
  return 0
293

    
294

    
295
def ActivateMasterIp(opts, args):
296
  """Activates the master IP.
297

298
  """
299
  op = opcodes.OpClusterActivateMasterIp()
300
  SubmitOpCode(op)
301
  return 0
302

    
303

    
304
def DeactivateMasterIp(opts, args):
305
  """Deactivates the master IP.
306

307
  """
308
  if not opts.confirm:
309
    usertext = ("This will disable the master IP. All the open connections to"
310
                " the master IP will be closed. To reach the master you will"
311
                " need to use its node IP."
312
                " Continue?")
313
    if not AskUser(usertext):
314
      return 1
315

    
316
  op = opcodes.OpClusterDeactivateMasterIp()
317
  SubmitOpCode(op)
318
  return 0
319

    
320

    
321
def RedistributeConfig(opts, args):
322
  """Forces push of the cluster configuration.
323

324
  @param opts: the command line options selected by the user
325
  @type args: list
326
  @param args: empty list
327
  @rtype: int
328
  @return: the desired exit code
329

330
  """
331
  op = opcodes.OpClusterRedistConf()
332
  SubmitOrSend(op, opts)
333
  return 0
334

    
335

    
336
def ShowClusterVersion(opts, args):
337
  """Write version of ganeti software to the standard output.
338

339
  @param opts: the command line options selected by the user
340
  @type args: list
341
  @param args: should be an empty list
342
  @rtype: int
343
  @return: the desired exit code
344

345
  """
346
  cl = GetClient(query=True)
347
  result = cl.QueryClusterInfo()
348
  ToStdout("Software version: %s", result["software_version"])
349
  ToStdout("Internode protocol: %s", result["protocol_version"])
350
  ToStdout("Configuration format: %s", result["config_version"])
351
  ToStdout("OS api version: %s", result["os_api_version"])
352
  ToStdout("Export interface: %s", result["export_version"])
353
  return 0
354

    
355

    
356
def ShowClusterMaster(opts, args):
357
  """Write name of master node to the standard output.
358

359
  @param opts: the command line options selected by the user
360
  @type args: list
361
  @param args: should be an empty list
362
  @rtype: int
363
  @return: the desired exit code
364

365
  """
366
  master = bootstrap.GetMaster()
367
  ToStdout(master)
368
  return 0
369

    
370

    
371
def _FormatGroupedParams(paramsdict, roman=False):
372
  """Format Grouped parameters (be, nic, disk) by group.
373

374
  @type paramsdict: dict of dicts
375
  @param paramsdict: {group: {param: value, ...}, ...}
376
  @rtype: dict of dicts
377
  @return: copy of the input dictionaries with strings as values
378

379
  """
380
  ret = {}
381
  for (item, val) in paramsdict.items():
382
    if isinstance(val, dict):
383
      ret[item] = _FormatGroupedParams(val, roman=roman)
384
    elif roman and isinstance(val, int):
385
      ret[item] = compat.TryToRoman(val)
386
    else:
387
      ret[item] = str(val)
388
  return ret
389

    
390

    
391
def ShowClusterConfig(opts, args):
392
  """Shows cluster information.
393

394
  @param opts: the command line options selected by the user
395
  @type args: list
396
  @param args: should be an empty list
397
  @rtype: int
398
  @return: the desired exit code
399

400
  """
401
  cl = GetClient(query=True)
402
  result = cl.QueryClusterInfo()
403

    
404
  if result["tags"]:
405
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
406
  else:
407
    tags = "(none)"
408
  if result["reserved_lvs"]:
409
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
410
  else:
411
    reserved_lvs = "(none)"
412

    
413
  info = [
414
    ("Cluster name", result["name"]),
415
    ("Cluster UUID", result["uuid"]),
416

    
417
    ("Creation time", utils.FormatTime(result["ctime"])),
418
    ("Modification time", utils.FormatTime(result["mtime"])),
419

    
420
    ("Master node", result["master"]),
421

    
422
    ("Architecture (this node)",
423
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),
424

    
425
    ("Tags", tags),
426

    
427
    ("Default hypervisor", result["default_hypervisor"]),
428
    ("Enabled hypervisors",
429
     utils.CommaJoin(result["enabled_hypervisors"])),
430

    
431
    ("Hypervisor parameters", _FormatGroupedParams(result["hvparams"])),
432

    
433
    ("OS-specific hypervisor parameters",
434
     _FormatGroupedParams(result["os_hvp"])),
435

    
436
    ("OS parameters", _FormatGroupedParams(result["osparams"])),
437

    
438
    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
439
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),
440

    
441
    ("Cluster parameters", [
442
      ("candidate pool size",
443
       compat.TryToRoman(result["candidate_pool_size"],
444
                         convert=opts.roman_integers)),
445
      ("master netdev", result["master_netdev"]),
446
      ("master netmask", result["master_netmask"]),
447
      ("use external master IP address setup script",
448
       result["use_external_mip_script"]),
449
      ("lvm volume group", result["volume_group_name"]),
450
      ("lvm reserved volumes", reserved_lvs),
451
      ("drbd usermode helper", result["drbd_usermode_helper"]),
452
      ("file storage path", result["file_storage_dir"]),
453
      ("shared file storage path", result["shared_file_storage_dir"]),
454
      ("maintenance of node health", result["maintain_node_health"]),
455
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
456
      ("default instance allocator", result["default_iallocator"]),
457
      ("primary ip version", result["primary_ip_version"]),
458
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
459
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
460
      ("ExtStorage Providers search path",
461
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
462
      ("enabled disk templates",
463
       utils.CommaJoin(result["enabled_disk_templates"])),
464
      ]),
465

    
466
    ("Default node parameters",
467
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),
468

    
469
    ("Default instance parameters",
470
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),
471

    
472
    ("Default nic parameters",
473
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),
474

    
475
    ("Default disk parameters",
476
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),
477

    
478
    ("Instance policy - limits for instances",
479
     FormatPolicyInfo(result["ipolicy"], None, True)),
480
    ]
481

    
482
  PrintGenericInfo(info)
483
  return 0
484

    
485

    
486
def ClusterCopyFile(opts, args):
487
  """Copy a file from master to some nodes.
488

489
  @param opts: the command line options selected by the user
490
  @type args: list
491
  @param args: should contain only one element, the path of
492
      the file to be copied
493
  @rtype: int
494
  @return: the desired exit code
495

496
  """
497
  filename = args[0]
498
  if not os.path.exists(filename):
499
    raise errors.OpPrereqError("No such filename '%s'" % filename,
500
                               errors.ECODE_INVAL)
501

    
502
  cl = GetClient()
503

    
504
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
505

    
506
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
507
                           secondary_ips=opts.use_replication_network,
508
                           nodegroup=opts.nodegroup)
509

    
510
  srun = ssh.SshRunner(cluster_name)
511
  for node in results:
512
    if not srun.CopyFileToNode(node, filename):
513
      ToStderr("Copy of file %s to node %s failed", filename, node)
514

    
515
  return 0
516

    
517

    
518
def RunClusterCommand(opts, args):
519
  """Run a command on some nodes.
520

521
  @param opts: the command line options selected by the user
522
  @type args: list
523
  @param args: should contain the command to be run and its arguments
524
  @rtype: int
525
  @return: the desired exit code
526

527
  """
528
  cl = GetClient()
529

    
530
  command = " ".join(args)
531

    
532
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
533

    
534
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
535
                                                    "master_node"])
536

    
537
  srun = ssh.SshRunner(cluster_name=cluster_name)
538

    
539
  # Make sure master node is at list end
540
  if master_node in nodes:
541
    nodes.remove(master_node)
542
    nodes.append(master_node)
543

    
544
  for name in nodes:
545
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
546

    
547
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
548
      # Do not output anything for successful commands
549
      continue
550

    
551
    ToStdout("------------------------------------------------")
552
    if opts.show_machine_names:
553
      for line in result.output.splitlines():
554
        ToStdout("%s: %s", name, line)
555
    else:
556
      ToStdout("node: %s", name)
557
      ToStdout("%s", result.output)
558
    ToStdout("return code = %s", result.exit_code)
559

    
560
  return 0
561

    
562

    
563
def VerifyCluster(opts, args):
564
  """Verify integrity of cluster, performing various test on nodes.
565

566
  @param opts: the command line options selected by the user
567
  @type args: list
568
  @param args: should be an empty list
569
  @rtype: int
570
  @return: the desired exit code
571

572
  """
573
  skip_checks = []
574

    
575
  if opts.skip_nplusone_mem:
576
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
577

    
578
  cl = GetClient()
579

    
580
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
581
                               error_codes=opts.error_codes,
582
                               debug_simulate_errors=opts.simulate_errors,
583
                               skip_checks=skip_checks,
584
                               ignore_errors=opts.ignore_errors,
585
                               group_name=opts.nodegroup)
586
  result = SubmitOpCode(op, cl=cl, opts=opts)
587

    
588
  # Keep track of submitted jobs
589
  jex = JobExecutor(cl=cl, opts=opts)
590

    
591
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
592
    jex.AddJobId(None, status, job_id)
593

    
594
  results = jex.GetResults()
595

    
596
  (bad_jobs, bad_results) = \
597
    map(len,
598
        # Convert iterators to lists
599
        map(list,
600
            # Count errors
601
            map(compat.partial(itertools.ifilterfalse, bool),
602
                # Convert result to booleans in a tuple
603
                zip(*((job_success, len(op_results) == 1 and op_results[0])
604
                      for (job_success, op_results) in results)))))
605

    
606
  if bad_jobs == 0 and bad_results == 0:
607
    rcode = constants.EXIT_SUCCESS
608
  else:
609
    rcode = constants.EXIT_FAILURE
610
    if bad_jobs > 0:
611
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
612

    
613
  return rcode
614

    
615

    
616
def VerifyDisks(opts, args):
617
  """Verify integrity of cluster disks.
618

619
  @param opts: the command line options selected by the user
620
  @type args: list
621
  @param args: should be an empty list
622
  @rtype: int
623
  @return: the desired exit code
624

625
  """
626
  cl = GetClient()
627

    
628
  op = opcodes.OpClusterVerifyDisks()
629

    
630
  result = SubmitOpCode(op, cl=cl, opts=opts)
631

    
632
  # Keep track of submitted jobs
633
  jex = JobExecutor(cl=cl, opts=opts)
634

    
635
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
636
    jex.AddJobId(None, status, job_id)
637

    
638
  retcode = constants.EXIT_SUCCESS
639

    
640
  for (status, result) in jex.GetResults():
641
    if not status:
642
      ToStdout("Job failed: %s", result)
643
      continue
644

    
645
    ((bad_nodes, instances, missing), ) = result
646

    
647
    for node, text in bad_nodes.items():
648
      ToStdout("Error gathering data on node %s: %s",
649
               node, utils.SafeEncode(text[-400:]))
650
      retcode = constants.EXIT_FAILURE
651
      ToStdout("You need to fix these nodes first before fixing instances")
652

    
653
    for iname in instances:
654
      if iname in missing:
655
        continue
656
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
657
      try:
658
        ToStdout("Activating disks for instance '%s'", iname)
659
        SubmitOpCode(op, opts=opts, cl=cl)
660
      except errors.GenericError, err:
661
        nret, msg = FormatError(err)
662
        retcode |= nret
663
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
664

    
665
    if missing:
666
      for iname, ival in missing.iteritems():
667
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
668
        if all_missing:
669
          ToStdout("Instance %s cannot be verified as it lives on"
670
                   " broken nodes", iname)
671
        else:
672
          ToStdout("Instance %s has missing logical volumes:", iname)
673
          ival.sort()
674
          for node, vol in ival:
675
            if node in bad_nodes:
676
              ToStdout("\tbroken node %s /dev/%s", node, vol)
677
            else:
678
              ToStdout("\t%s /dev/%s", node, vol)
679

    
680
      ToStdout("You need to replace or recreate disks for all the above"
681
               " instances if this message persists after fixing broken nodes.")
682
      retcode = constants.EXIT_FAILURE
683
    elif not instances:
684
      ToStdout("No disks need to be activated.")
685

    
686
  return retcode
687

    
688

    
689
def RepairDiskSizes(opts, args):
690
  """Verify sizes of cluster disks.
691

692
  @param opts: the command line options selected by the user
693
  @type args: list
694
  @param args: optional list of instances to restrict check to
695
  @rtype: int
696
  @return: the desired exit code
697

698
  """
699
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
700
  SubmitOpCode(op, opts=opts)
701

    
702

    
703
@UsesRPC
704
def MasterFailover(opts, args):
705
  """Failover the master node.
706

707
  This command, when run on a non-master node, will cause the current
708
  master to cease being master, and the non-master to become new
709
  master.
710

711
  @param opts: the command line options selected by the user
712
  @type args: list
713
  @param args: should be an empty list
714
  @rtype: int
715
  @return: the desired exit code
716

717
  """
718
  if opts.no_voting and not opts.yes_do_it:
719
    usertext = ("This will perform the failover even if most other nodes"
720
                " are down, or if this node is outdated. This is dangerous"
721
                " as it can lead to a non-consistent cluster. Check the"
722
                " gnt-cluster(8) man page before proceeding. Continue?")
723
    if not AskUser(usertext):
724
      return 1
725

    
726
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
727

    
728

    
729
def MasterPing(opts, args):
730
  """Checks if the master is alive.
731

732
  @param opts: the command line options selected by the user
733
  @type args: list
734
  @param args: should be an empty list
735
  @rtype: int
736
  @return: the desired exit code
737

738
  """
739
  try:
740
    cl = GetClient()
741
    cl.QueryClusterInfo()
742
    return 0
743
  except Exception: # pylint: disable=W0703
744
    return 1
745

    
746

    
747
def SearchTags(opts, args):
748
  """Searches the tags on all the cluster.
749

750
  @param opts: the command line options selected by the user
751
  @type args: list
752
  @param args: should contain only one element, the tag pattern
753
  @rtype: int
754
  @return: the desired exit code
755

756
  """
757
  op = opcodes.OpTagsSearch(pattern=args[0])
758
  result = SubmitOpCode(op, opts=opts)
759
  if not result:
760
    return 1
761
  result = list(result)
762
  result.sort()
763
  for path, tag in result:
764
    ToStdout("%s %s", path, tag)
765

    
766

    
767
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
768
  """Reads and verifies an X509 certificate.
769

770
  @type cert_filename: string
771
  @param cert_filename: the path of the file containing the certificate to
772
                        verify encoded in PEM format
773
  @type verify_private_key: bool
774
  @param verify_private_key: whether to verify the private key in addition to
775
                             the public certificate
776
  @rtype: string
777
  @return: a string containing the PEM-encoded certificate.
778

779
  """
780
  try:
781
    pem = utils.ReadFile(cert_filename)
782
  except IOError, err:
783
    raise errors.X509CertError(cert_filename,
784
                               "Unable to read certificate: %s" % str(err))
785

    
786
  try:
787
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
788
  except Exception, err:
789
    raise errors.X509CertError(cert_filename,
790
                               "Unable to load certificate: %s" % str(err))
791

    
792
  if verify_private_key:
793
    try:
794
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
795
    except Exception, err:
796
      raise errors.X509CertError(cert_filename,
797
                                 "Unable to load private key: %s" % str(err))
798

    
799
  return pem
800

    
801

    
802
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
803
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
804
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
805
                 cds_filename, force):
806
  """Renews cluster certificates, keys and secrets.
807

808
  @type new_cluster_cert: bool
809
  @param new_cluster_cert: Whether to generate a new cluster certificate
810
  @type new_rapi_cert: bool
811
  @param new_rapi_cert: Whether to generate a new RAPI certificate
812
  @type rapi_cert_filename: string
813
  @param rapi_cert_filename: Path to file containing new RAPI certificate
814
  @type new_spice_cert: bool
815
  @param new_spice_cert: Whether to generate a new SPICE certificate
816
  @type spice_cert_filename: string
817
  @param spice_cert_filename: Path to file containing new SPICE certificate
818
  @type spice_cacert_filename: string
819
  @param spice_cacert_filename: Path to file containing the certificate of the
820
                                CA that signed the SPICE certificate
821
  @type new_confd_hmac_key: bool
822
  @param new_confd_hmac_key: Whether to generate a new HMAC key
823
  @type new_cds: bool
824
  @param new_cds: Whether to generate a new cluster domain secret
825
  @type cds_filename: string
826
  @param cds_filename: Path to file containing new cluster domain secret
827
  @type force: bool
828
  @param force: Whether to ask user for confirmation
829

830
  """
831
  if new_rapi_cert and rapi_cert_filename:
832
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
833
             " options can be specified at the same time.")
834
    return 1
835

    
836
  if new_cds and cds_filename:
837
    ToStderr("Only one of the --new-cluster-domain-secret and"
838
             " --cluster-domain-secret options can be specified at"
839
             " the same time.")
840
    return 1
841

    
842
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
843
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
844
             " and --spice-ca-certificate must not be used.")
845
    return 1
846

    
847
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
848
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
849
             " specified.")
850
    return 1
851

    
852
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
853
  try:
854
    if rapi_cert_filename:
855
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
856
    if spice_cert_filename:
857
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
858
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
859
  except errors.X509CertError, err:
860
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
861
    return 1
862

    
863
  if cds_filename:
864
    try:
865
      cds = utils.ReadFile(cds_filename)
866
    except Exception, err: # pylint: disable=W0703
867
      ToStderr("Can't load new cluster domain secret from %s: %s" %
868
               (cds_filename, str(err)))
869
      return 1
870
  else:
871
    cds = None
872

    
873
  if not force:
874
    usertext = ("This requires all daemons on all nodes to be restarted and"
875
                " may take some time. Continue?")
876
    if not AskUser(usertext):
877
      return 1
878

    
879
  def _RenewCryptoInner(ctx):
880
    ctx.feedback_fn("Updating certificates and keys")
881
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
882
                                    new_rapi_cert,
883
                                    new_spice_cert,
884
                                    new_confd_hmac_key,
885
                                    new_cds,
886
                                    rapi_cert_pem=rapi_cert_pem,
887
                                    spice_cert_pem=spice_cert_pem,
888
                                    spice_cacert_pem=spice_cacert_pem,
889
                                    cds=cds)
890

    
891
    files_to_copy = []
892

    
893
    if new_cluster_cert:
894
      files_to_copy.append(pathutils.NODED_CERT_FILE)
895

    
896
    if new_rapi_cert or rapi_cert_pem:
897
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
898

    
899
    if new_spice_cert or spice_cert_pem:
900
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
901
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
902

    
903
    if new_confd_hmac_key:
904
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
905

    
906
    if new_cds or cds:
907
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
908

    
909
    if files_to_copy:
910
      for node_name in ctx.nonmaster_nodes:
911
        ctx.feedback_fn("Copying %s to %s" %
912
                        (", ".join(files_to_copy), node_name))
913
        for file_name in files_to_copy:
914
          ctx.ssh.CopyFileToNode(node_name, file_name)
915

    
916
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
917

    
918
  ToStdout("All requested certificates and keys have been replaced."
919
           " Running \"gnt-cluster verify\" now is recommended.")
920

    
921
  return 0
922

    
923

    
924
def RenewCrypto(opts, args):
925
  """Renews cluster certificates, keys and secrets.
926

927
  """
928
  return _RenewCrypto(opts.new_cluster_cert,
929
                      opts.new_rapi_cert,
930
                      opts.rapi_cert,
931
                      opts.new_spice_cert,
932
                      opts.spice_cert,
933
                      opts.spice_cacert,
934
                      opts.new_confd_hmac_key,
935
                      opts.new_cluster_domain_secret,
936
                      opts.cluster_domain_secret,
937
                      opts.force)
938

    
939

    
940
def SetClusterParams(opts, args):
941
  """Modify the cluster.
942

943
  @param opts: the command line options selected by the user
944
  @type args: list
945
  @param args: should be an empty list
946
  @rtype: int
947
  @return: the desired exit code
948

949
  """
950
  if not (not opts.lvm_storage or opts.vg_name or
951
          not opts.drbd_storage or opts.drbd_helper or
952
          opts.enabled_hypervisors or opts.hvparams or
953
          opts.beparams or opts.nicparams or
954
          opts.ndparams or opts.diskparams or
955
          opts.candidate_pool_size is not None or
956
          opts.uid_pool is not None or
957
          opts.maintain_node_health is not None or
958
          opts.add_uids is not None or
959
          opts.remove_uids is not None or
960
          opts.default_iallocator is not None or
961
          opts.reserved_lvs is not None or
962
          opts.master_netdev is not None or
963
          opts.master_netmask is not None or
964
          opts.use_external_mip_script is not None or
965
          opts.prealloc_wipe_disks is not None or
966
          opts.hv_state or
967
          opts.enabled_disk_templates or
968
          opts.disk_state or
969
          opts.ispecs_mem_size or
970
          opts.ispecs_cpu_count or
971
          opts.ispecs_disk_count or
972
          opts.ispecs_disk_size or
973
          opts.ispecs_nic_count or
974
          opts.ipolicy_bounds_specs is not None or
975
          opts.ipolicy_std_specs is not None or
976
          opts.ipolicy_disk_templates is not None or
977
          opts.ipolicy_vcpu_ratio is not None or
978
          opts.ipolicy_spindle_ratio is not None):
979
    ToStderr("Please give at least one of the parameters.")
980
    return 1
981

    
982
  vg_name = opts.vg_name
983
  if not opts.lvm_storage and opts.vg_name:
984
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
985
    return 1
986

    
987
  if not opts.lvm_storage:
988
    vg_name = ""
989

    
990
  drbd_helper = opts.drbd_helper
991
  if not opts.drbd_storage and opts.drbd_helper:
992
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
993
    return 1
994

    
995
  if not opts.drbd_storage:
996
    drbd_helper = ""
997

    
998
  hvlist = opts.enabled_hypervisors
999
  if hvlist is not None:
1000
    hvlist = hvlist.split(",")
1001

    
1002
  enabled_disk_templates = opts.enabled_disk_templates
1003
  if enabled_disk_templates:
1004
    enabled_disk_templates = enabled_disk_templates.split(",")
1005

    
1006
  # a list of (name, dict) we can pass directly to dict() (or [])
1007
  hvparams = dict(opts.hvparams)
1008
  for hv_params in hvparams.values():
1009
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1010

    
1011
  diskparams = dict(opts.diskparams)
1012

    
1013
  for dt_params in diskparams.values():
1014
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1015

    
1016
  beparams = opts.beparams
1017
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1018

    
1019
  nicparams = opts.nicparams
1020
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1021

    
1022
  ndparams = opts.ndparams
1023
  if ndparams is not None:
1024
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1025

    
1026
  ipolicy = CreateIPolicyFromOpts(
1027
    ispecs_mem_size=opts.ispecs_mem_size,
1028
    ispecs_cpu_count=opts.ispecs_cpu_count,
1029
    ispecs_disk_count=opts.ispecs_disk_count,
1030
    ispecs_disk_size=opts.ispecs_disk_size,
1031
    ispecs_nic_count=opts.ispecs_nic_count,
1032
    minmax_ispecs=opts.ipolicy_bounds_specs,
1033
    std_ispecs=opts.ipolicy_std_specs,
1034
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1035
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1036
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1037
    )
1038

    
1039
  mnh = opts.maintain_node_health
1040

    
1041
  uid_pool = opts.uid_pool
1042
  if uid_pool is not None:
1043
    uid_pool = uidpool.ParseUidPool(uid_pool)
1044

    
1045
  add_uids = opts.add_uids
1046
  if add_uids is not None:
1047
    add_uids = uidpool.ParseUidPool(add_uids)
1048

    
1049
  remove_uids = opts.remove_uids
1050
  if remove_uids is not None:
1051
    remove_uids = uidpool.ParseUidPool(remove_uids)
1052

    
1053
  if opts.reserved_lvs is not None:
1054
    if opts.reserved_lvs == "":
1055
      opts.reserved_lvs = []
1056
    else:
1057
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1058

    
1059
  if opts.master_netmask is not None:
1060
    try:
1061
      opts.master_netmask = int(opts.master_netmask)
1062
    except ValueError:
1063
      ToStderr("The --master-netmask option expects an int parameter.")
1064
      return 1
1065

    
1066
  ext_ip_script = opts.use_external_mip_script
1067

    
1068
  if opts.disk_state:
1069
    disk_state = utils.FlatToDict(opts.disk_state)
1070
  else:
1071
    disk_state = {}
1072

    
1073
  hv_state = dict(opts.hv_state)
1074

    
1075
  op = opcodes.OpClusterSetParams(
1076
    vg_name=vg_name,
1077
    drbd_helper=drbd_helper,
1078
    enabled_hypervisors=hvlist,
1079
    hvparams=hvparams,
1080
    os_hvp=None,
1081
    beparams=beparams,
1082
    nicparams=nicparams,
1083
    ndparams=ndparams,
1084
    diskparams=diskparams,
1085
    ipolicy=ipolicy,
1086
    candidate_pool_size=opts.candidate_pool_size,
1087
    maintain_node_health=mnh,
1088
    uid_pool=uid_pool,
1089
    add_uids=add_uids,
1090
    remove_uids=remove_uids,
1091
    default_iallocator=opts.default_iallocator,
1092
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1093
    master_netdev=opts.master_netdev,
1094
    master_netmask=opts.master_netmask,
1095
    reserved_lvs=opts.reserved_lvs,
1096
    use_external_mip_script=ext_ip_script,
1097
    hv_state=hv_state,
1098
    disk_state=disk_state,
1099
    enabled_disk_templates=enabled_disk_templates,
1100
    )
1101
  SubmitOrSend(op, opts)
1102
  return 0
1103

    
1104

    
1105
def QueueOps(opts, args):
1106
  """Queue operations.
1107

1108
  @param opts: the command line options selected by the user
1109
  @type args: list
1110
  @param args: should contain only one element, the subcommand
1111
  @rtype: int
1112
  @return: the desired exit code
1113

1114
  """
1115
  command = args[0]
1116
  client = GetClient()
1117
  if command in ("drain", "undrain"):
1118
    drain_flag = command == "drain"
1119
    client.SetQueueDrainFlag(drain_flag)
1120
  elif command == "info":
1121
    result = client.QueryConfigValues(["drain_flag"])
1122
    if result[0]:
1123
      val = "set"
1124
    else:
1125
      val = "unset"
1126
    ToStdout("The drain flag is %s" % val)
1127
  else:
1128
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1129
                               errors.ECODE_INVAL)
1130

    
1131
  return 0
1132

    
1133

    
1134
def _ShowWatcherPause(until):
1135
  if until is None or until < time.time():
1136
    ToStdout("The watcher is not paused.")
1137
  else:
1138
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1139

    
1140

    
1141
def WatcherOps(opts, args):
1142
  """Watcher operations.
1143

1144
  @param opts: the command line options selected by the user
1145
  @type args: list
1146
  @param args: should contain only one element, the subcommand
1147
  @rtype: int
1148
  @return: the desired exit code
1149

1150
  """
1151
  command = args[0]
1152
  client = GetClient()
1153

    
1154
  if command == "continue":
1155
    client.SetWatcherPause(None)
1156
    ToStdout("The watcher is no longer paused.")
1157

    
1158
  elif command == "pause":
1159
    if len(args) < 2:
1160
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1161

    
1162
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1163
    _ShowWatcherPause(result)
1164

    
1165
  elif command == "info":
1166
    result = client.QueryConfigValues(["watcher_pause"])
1167
    _ShowWatcherPause(result[0])
1168

    
1169
  else:
1170
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1171
                               errors.ECODE_INVAL)
1172

    
1173
  return 0
1174

    
1175

    
1176
def _OobPower(opts, node_list, power):
1177
  """Puts the node in the list to desired power state.
1178

1179
  @param opts: The command line options selected by the user
1180
  @param node_list: The list of nodes to operate on
1181
  @param power: True if they should be powered on, False otherwise
1182
  @return: The success of the operation (none failed)
1183

1184
  """
1185
  if power:
1186
    command = constants.OOB_POWER_ON
1187
  else:
1188
    command = constants.OOB_POWER_OFF
1189

    
1190
  op = opcodes.OpOobCommand(node_names=node_list,
1191
                            command=command,
1192
                            ignore_status=True,
1193
                            timeout=opts.oob_timeout,
1194
                            power_delay=opts.power_delay)
1195
  result = SubmitOpCode(op, opts=opts)
1196
  errs = 0
1197
  for node_result in result:
1198
    (node_tuple, data_tuple) = node_result
1199
    (_, node_name) = node_tuple
1200
    (data_status, _) = data_tuple
1201
    if data_status != constants.RS_NORMAL:
1202
      assert data_status != constants.RS_UNAVAIL
1203
      errs += 1
1204
      ToStderr("There was a problem changing power for %s, please investigate",
1205
               node_name)
1206

    
1207
  if errs > 0:
1208
    return False
1209

    
1210
  return True
1211

    
1212

    
1213
def _InstanceStart(opts, inst_list, start, no_remember=False):
1214
  """Puts the instances in the list to desired state.
1215

1216
  @param opts: The command line options selected by the user
1217
  @param inst_list: The list of instances to operate on
1218
  @param start: True if they should be started, False for shutdown
1219
  @param no_remember: If the instance state should be remembered
1220
  @return: The success of the operation (none failed)
1221

1222
  """
1223
  if start:
1224
    opcls = opcodes.OpInstanceStartup
1225
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1226
  else:
1227
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1228
                           timeout=opts.shutdown_timeout,
1229
                           no_remember=no_remember)
1230
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1231

    
1232
  jex = JobExecutor(opts=opts)
1233

    
1234
  for inst in inst_list:
1235
    ToStdout("Submit %s of instance %s", text_submit, inst)
1236
    op = opcls(instance_name=inst)
1237
    jex.QueueJob(inst, op)
1238

    
1239
  results = jex.GetResults()
1240
  bad_cnt = len([1 for (success, _) in results if not success])
1241

    
1242
  if bad_cnt == 0:
1243
    ToStdout("All instances have been %s successfully", text_success)
1244
  else:
1245
    ToStderr("There were errors while %s instances:\n"
1246
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1247
             len(results))
1248
    return False
1249

    
1250
  return True
1251

    
1252

    
1253
class _RunWhenNodesReachableHelper:
1254
  """Helper class to make shared internal state sharing easier.
1255

1256
  @ivar success: Indicates if all action_cb calls were successful
1257

1258
  """
1259
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1260
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1261
    """Init the object.
1262

1263
    @param node_list: The list of nodes to be reachable
1264
    @param action_cb: Callback called when a new host is reachable
1265
    @type node2ip: dict
1266
    @param node2ip: Node to ip mapping
1267
    @param port: The port to use for the TCP ping
1268
    @param feedback_fn: The function used for feedback
1269
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1270
    @param _sleep_fn: Function to sleep (for unittest use only)
1271

1272
    """
1273
    self.down = set(node_list)
1274
    self.up = set()
1275
    self.node2ip = node2ip
1276
    self.success = True
1277
    self.action_cb = action_cb
1278
    self.port = port
1279
    self.feedback_fn = feedback_fn
1280
    self._ping_fn = _ping_fn
1281
    self._sleep_fn = _sleep_fn
1282

    
1283
  def __call__(self):
1284
    """When called we run action_cb.
1285

1286
    @raises utils.RetryAgain: When there are still down nodes
1287

1288
    """
1289
    if not self.action_cb(self.up):
1290
      self.success = False
1291

    
1292
    if self.down:
1293
      raise utils.RetryAgain()
1294
    else:
1295
      return self.success
1296

    
1297
  def Wait(self, secs):
1298
    """Checks if a host is up or waits remaining seconds.
1299

1300
    @param secs: The secs remaining
1301

1302
    """
1303
    start = time.time()
1304
    for node in self.down:
1305
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1306
                       live_port_needed=True):
1307
        self.feedback_fn("Node %s became available" % node)
1308
        self.up.add(node)
1309
        self.down -= self.up
1310
        # If we have a node available there is the possibility to run the
1311
        # action callback successfully, therefore we don't wait and return
1312
        return
1313

    
1314
    self._sleep_fn(max(0.0, start + secs - time.time()))
1315

    
1316

    
1317
def _RunWhenNodesReachable(node_list, action_cb, interval):
1318
  """Run action_cb when nodes become reachable.
1319

1320
  @param node_list: The list of nodes to be reachable
1321
  @param action_cb: Callback called when a new host is reachable
1322
  @param interval: The earliest time to retry
1323

1324
  """
1325
  client = GetClient()
1326
  cluster_info = client.QueryClusterInfo()
1327
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1328
    family = netutils.IPAddress.family
1329
  else:
1330
    family = netutils.IP6Address.family
1331

    
1332
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1333
                 for node in node_list)
1334

    
1335
  port = netutils.GetDaemonPort(constants.NODED)
1336
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1337
                                        ToStdout)
1338

    
1339
  try:
1340
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1341
                       wait_fn=helper.Wait)
1342
  except utils.RetryTimeout:
1343
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1344
             " again:\n  - %s", "  - ".join(helper.down))
1345
    return False
1346

    
1347

    
1348
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1349
                          _instance_start_fn=_InstanceStart):
1350
  """Start the instances conditional based on node_states.
1351

1352
  @param opts: The command line options selected by the user
1353
  @param inst_map: A dict of inst -> nodes mapping
1354
  @param nodes_online: A list of nodes online
1355
  @param _instance_start_fn: Callback to start instances (unittest use only)
1356
  @return: Success of the operation on all instances
1357

1358
  """
1359
  start_inst_list = []
1360
  for (inst, nodes) in inst_map.items():
1361
    if not (nodes - nodes_online):
1362
      # All nodes the instance lives on are back online
1363
      start_inst_list.append(inst)
1364

    
1365
  for inst in start_inst_list:
1366
    del inst_map[inst]
1367

    
1368
  if start_inst_list:
1369
    return _instance_start_fn(opts, start_inst_list, True)
1370

    
1371
  return True
1372

    
1373

    
1374
def _EpoOn(opts, full_node_list, node_list, inst_map):
1375
  """Does the actual power on.
1376

1377
  @param opts: The command line options selected by the user
1378
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1379
                         OOB)
1380
  @param node_list: The list of nodes to operate on (all need to support OOB)
1381
  @param inst_map: A dict of inst -> nodes mapping
1382
  @return: The desired exit status
1383

1384
  """
1385
  if node_list and not _OobPower(opts, node_list, False):
1386
    ToStderr("Not all nodes seem to get back up, investigate and start"
1387
             " manually if needed")
1388

    
1389
  # Wait for the nodes to be back up
1390
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1391

    
1392
  ToStdout("Waiting until all nodes are available again")
1393
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1394
    ToStderr("Please investigate and start stopped instances manually")
1395
    return constants.EXIT_FAILURE
1396

    
1397
  return constants.EXIT_SUCCESS
1398

    
1399

    
1400
def _EpoOff(opts, node_list, inst_map):
1401
  """Does the actual power off.
1402

1403
  @param opts: The command line options selected by the user
1404
  @param node_list: The list of nodes to operate on (all need to support OOB)
1405
  @param inst_map: A dict of inst -> nodes mapping
1406
  @return: The desired exit status
1407

1408
  """
1409
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1410
    ToStderr("Please investigate and stop instances manually before continuing")
1411
    return constants.EXIT_FAILURE
1412

    
1413
  if not node_list:
1414
    return constants.EXIT_SUCCESS
1415

    
1416
  if _OobPower(opts, node_list, False):
1417
    return constants.EXIT_SUCCESS
1418
  else:
1419
    return constants.EXIT_FAILURE
1420

    
1421

    
1422
def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
1423
        _confirm_fn=ConfirmOperation,
1424
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
1425
  """EPO operations.
1426

1427
  @param opts: the command line options selected by the user
1428
  @type args: list
1429
  @param args: should contain only one element, the subcommand
1430
  @rtype: int
1431
  @return: the desired exit code
1432

1433
  """
1434
  if opts.groups and opts.show_all:
1435
    _stderr_fn("Only one of --groups or --all are allowed")
1436
    return constants.EXIT_FAILURE
1437
  elif args and opts.show_all:
1438
    _stderr_fn("Arguments in combination with --all are not allowed")
1439
    return constants.EXIT_FAILURE
1440

    
1441
  if cl is None:
1442
    cl = GetClient()
1443

    
1444
  if opts.groups:
1445
    node_query_list = \
1446
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
1447
  else:
1448
    node_query_list = args
1449

    
1450
  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
1451
                                           "sinst_list", "powered", "offline"],
1452
                         False)
1453

    
1454
  all_nodes = map(compat.fst, result)
1455
  node_list = []
1456
  inst_map = {}
1457
  for (node, master, pinsts, sinsts, powered, offline) in result:
1458
    if not offline:
1459
      for inst in (pinsts + sinsts):
1460
        if inst in inst_map:
1461
          if not master:
1462
            inst_map[inst].add(node)
1463
        elif master:
1464
          inst_map[inst] = set()
1465
        else:
1466
          inst_map[inst] = set([node])
1467

    
1468
    if master and opts.on:
1469
      # We ignore the master for turning on the machines, in fact we are
1470
      # already operating on the master at this point :)
1471
      continue
1472
    elif master and not opts.show_all:
1473
      _stderr_fn("%s is the master node, please do a master-failover to another"
1474
                 " node not affected by the EPO or use --all if you intend to"
1475
                 " shutdown the whole cluster", node)
1476
      return constants.EXIT_FAILURE
1477
    elif powered is None:
1478
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
1479
                 " handled in a fully automated manner", node)
1480
    elif powered == opts.on:
1481
      _stdout_fn("Node %s is already in desired power state, skipping", node)
1482
    elif not offline or (offline and powered):
1483
      node_list.append(node)
1484

    
1485
  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
1486
    return constants.EXIT_FAILURE
1487

    
1488
  if opts.on:
1489
    return _on_fn(opts, all_nodes, node_list, inst_map)
1490
  else:
1491
    return _off_fn(opts, node_list, inst_map)
1492

    
1493

    
1494
commands = {
1495
  "init": (
1496
    InitCluster, [ArgHost(min=1, max=1)],
1497
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
1498
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
1499
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
1500
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
1501
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
1502
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
1503
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
1504
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
1505
     IPOLICY_STD_SPECS_OPT] + INSTANCE_POLICY_OPTS,
1506
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
1507
  "destroy": (
1508
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
1509
    "", "Destroy cluster"),
1510
  "rename": (
1511
    RenameCluster, [ArgHost(min=1, max=1)],
1512
    [FORCE_OPT, DRY_RUN_OPT],
1513
    "<new_name>",
1514
    "Renames the cluster"),
1515
  "redist-conf": (
1516
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
1517
    "", "Forces a push of the configuration file and ssconf files"
1518
    " to the nodes in the cluster"),
1519
  "verify": (
1520
    VerifyCluster, ARGS_NONE,
1521
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
1522
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
1523
    "", "Does a check on the cluster configuration"),
1524
  "verify-disks": (
1525
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
1526
    "", "Does a check on the cluster disk status"),
1527
  "repair-disk-sizes": (
1528
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
1529
    "[instance...]", "Updates mismatches in recorded disk sizes"),
1530
  "master-failover": (
1531
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
1532
    "", "Makes the current node the master"),
1533
  "master-ping": (
1534
    MasterPing, ARGS_NONE, [],
1535
    "", "Checks if the master is alive"),
1536
  "version": (
1537
    ShowClusterVersion, ARGS_NONE, [],
1538
    "", "Shows the cluster version"),
1539
  "getmaster": (
1540
    ShowClusterMaster, ARGS_NONE, [],
1541
    "", "Shows the cluster master"),
1542
  "copyfile": (
1543
    ClusterCopyFile, [ArgFile(min=1, max=1)],
1544
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
1545
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
1546
  "command": (
1547
    RunClusterCommand, [ArgCommand(min=1)],
1548
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
1549
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
1550
  "info": (
1551
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
1552
    "[--roman]", "Show cluster configuration"),
1553
  "list-tags": (
1554
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
1555
  "add-tags": (
1556
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1557
    "tag...", "Add tags to the cluster"),
1558
  "remove-tags": (
1559
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1560
    "tag...", "Remove tags from the cluster"),
1561
  "search-tags": (
1562
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
1563
    "Searches the tags on all objects on"
1564
    " the cluster for a given pattern (regex)"),
1565
  "queue": (
1566
    QueueOps,
1567
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
1568
    [], "drain|undrain|info", "Change queue properties"),
1569
  "watcher": (
1570
    WatcherOps,
1571
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
1572
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
1573
    [],
1574
    "{pause <timespec>|continue|info}", "Change watcher properties"),
1575
  "modify": (
1576
    SetClusterParams, ARGS_NONE,
1577
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
1578
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
1579
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
1580
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
1581
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
1582
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
1583
     DISK_STATE_OPT, SUBMIT_OPT, ENABLED_DISK_TEMPLATES_OPT,
1584
     IPOLICY_STD_SPECS_OPT] + INSTANCE_POLICY_OPTS,
1585
    "[opts...]",
1586
    "Alters the parameters of the cluster"),
1587
  "renew-crypto": (
1588
    RenewCrypto, ARGS_NONE,
1589
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
1590
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
1591
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
1592
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
1593
    "[opts...]",
1594
    "Renews cluster certificates, keys and secrets"),
1595
  "epo": (
1596
    Epo, [ArgUnknown()],
1597
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
1598
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
1599
    "[opts...] [args]",
1600
    "Performs an emergency power-off on given args"),
1601
  "activate-master-ip": (
1602
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
1603
  "deactivate-master-ip": (
1604
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
1605
    "Deactivates the master IP"),
1606
  }
1607

    
1608

    
1609
#: dictionary with aliases for commands
1610
aliases = {
1611
  "masterfailover": "master-failover",
1612
  "show": "info",
1613
}
1614

    
1615

    
1616
def Main():
1617
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
1618
                     aliases=aliases)