Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ eeaa5f6c

History | View | Annotate | Download (51.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
import os.path
30
import time
31
import OpenSSL
32
import itertools
33

    
34
from ganeti.cli import *
35
from ganeti import opcodes
36
from ganeti import constants
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import bootstrap
40
from ganeti import ssh
41
from ganeti import objects
42
from ganeti import uidpool
43
from ganeti import compat
44
from ganeti import netutils
45
from ganeti import pathutils
46

    
47

    
48
ON_OPT = cli_option("--on", default=False,
49
                    action="store_true", dest="on",
50
                    help="Recover from an EPO")
51

    
52
GROUPS_OPT = cli_option("--groups", default=False,
53
                        action="store_true", dest="groups",
54
                        help="Arguments are node groups instead of nodes")
55

    
56
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
57
                            help="Override interactive check for --no-voting",
58
                            default=False, action="store_true")
59

    
60
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
61
_EPO_PING_TIMEOUT = 1 # 1 second
62
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
63

    
64

    
65
@UsesRPC
66
def InitCluster(opts, args):
67
  """Initialize the cluster.
68

69
  @param opts: the command line options selected by the user
70
  @type args: list
71
  @param args: should contain only one element, the desired
72
      cluster name
73
  @rtype: int
74
  @return: the desired exit code
75

76
  """
77
  if not opts.lvm_storage and opts.vg_name:
78
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
79
    return 1
80

    
81
  vg_name = opts.vg_name
82
  if opts.lvm_storage and not opts.vg_name:
83
    vg_name = constants.DEFAULT_VG
84

    
85
  if not opts.drbd_storage and opts.drbd_helper:
86
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
87
    return 1
88

    
89
  drbd_helper = opts.drbd_helper
90
  if opts.drbd_storage and not opts.drbd_helper:
91
    drbd_helper = constants.DEFAULT_DRBD_HELPER
92

    
93
  master_netdev = opts.master_netdev
94
  if master_netdev is None:
95
    master_netdev = constants.DEFAULT_BRIDGE
96

    
97
  hvlist = opts.enabled_hypervisors
98
  if hvlist is None:
99
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
100
  hvlist = hvlist.split(",")
101

    
102
  hvparams = dict(opts.hvparams)
103
  beparams = opts.beparams
104
  nicparams = opts.nicparams
105

    
106
  diskparams = dict(opts.diskparams)
107

    
108
  # check the disk template types here, as we cannot rely on the type check done
109
  # by the opcode parameter types
110
  diskparams_keys = set(diskparams.keys())
111
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
112
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
113
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
114
    return 1
115

    
116
  # prepare beparams dict
117
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
118
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
119

    
120
  # prepare nicparams dict
121
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
122
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
123

    
124
  # prepare ndparams dict
125
  if opts.ndparams is None:
126
    ndparams = dict(constants.NDC_DEFAULTS)
127
  else:
128
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
129
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
130

    
131
  # prepare hvparams dict
132
  for hv in constants.HYPER_TYPES:
133
    if hv not in hvparams:
134
      hvparams[hv] = {}
135
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
136
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
137

    
138
  # prepare diskparams dict
139
  for templ in constants.DISK_TEMPLATES:
140
    if templ not in diskparams:
141
      diskparams[templ] = {}
142
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
143
                                         diskparams[templ])
144
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
145

    
146
  # prepare ipolicy dict
147
  ipolicy = CreateIPolicyFromOpts(
148
    ispecs_mem_size=opts.ispecs_mem_size,
149
    ispecs_cpu_count=opts.ispecs_cpu_count,
150
    ispecs_disk_count=opts.ispecs_disk_count,
151
    ispecs_disk_size=opts.ispecs_disk_size,
152
    ispecs_nic_count=opts.ispecs_nic_count,
153
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
154
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
155
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
156
    fill_all=True)
157

    
158
  if opts.candidate_pool_size is None:
159
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
160

    
161
  if opts.mac_prefix is None:
162
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
163

    
164
  uid_pool = opts.uid_pool
165
  if uid_pool is not None:
166
    uid_pool = uidpool.ParseUidPool(uid_pool)
167

    
168
  if opts.prealloc_wipe_disks is None:
169
    opts.prealloc_wipe_disks = False
170

    
171
  external_ip_setup_script = opts.use_external_mip_script
172
  if external_ip_setup_script is None:
173
    external_ip_setup_script = False
174

    
175
  try:
176
    primary_ip_version = int(opts.primary_ip_version)
177
  except (ValueError, TypeError), err:
178
    ToStderr("Invalid primary ip version value: %s" % str(err))
179
    return 1
180

    
181
  master_netmask = opts.master_netmask
182
  try:
183
    if master_netmask is not None:
184
      master_netmask = int(master_netmask)
185
  except (ValueError, TypeError), err:
186
    ToStderr("Invalid master netmask value: %s" % str(err))
187
    return 1
188

    
189
  if opts.disk_state:
190
    disk_state = utils.FlatToDict(opts.disk_state)
191
  else:
192
    disk_state = {}
193

    
194
  hv_state = dict(opts.hv_state)
195

    
196
  enabled_storage_types = opts.enabled_storage_types
197
  if enabled_storage_types is not None:
198
    enabled_storage_types = enabled_storage_types.split(",")
199
  else:
200
    enabled_storage_types = list(constants.DEFAULT_ENABLED_STORAGE_TYPES)
201

    
202
  bootstrap.InitCluster(cluster_name=args[0],
203
                        secondary_ip=opts.secondary_ip,
204
                        vg_name=vg_name,
205
                        mac_prefix=opts.mac_prefix,
206
                        master_netmask=master_netmask,
207
                        master_netdev=master_netdev,
208
                        file_storage_dir=opts.file_storage_dir,
209
                        shared_file_storage_dir=opts.shared_file_storage_dir,
210
                        enabled_hypervisors=hvlist,
211
                        hvparams=hvparams,
212
                        beparams=beparams,
213
                        nicparams=nicparams,
214
                        ndparams=ndparams,
215
                        diskparams=diskparams,
216
                        ipolicy=ipolicy,
217
                        candidate_pool_size=opts.candidate_pool_size,
218
                        modify_etc_hosts=opts.modify_etc_hosts,
219
                        modify_ssh_setup=opts.modify_ssh_setup,
220
                        maintain_node_health=opts.maintain_node_health,
221
                        drbd_helper=drbd_helper,
222
                        uid_pool=uid_pool,
223
                        default_iallocator=opts.default_iallocator,
224
                        primary_ip_version=primary_ip_version,
225
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
226
                        use_external_mip_script=external_ip_setup_script,
227
                        hv_state=hv_state,
228
                        disk_state=disk_state,
229
                        enabled_storage_types=enabled_storage_types,
230
                        )
231
  op = opcodes.OpClusterPostInit()
232
  SubmitOpCode(op, opts=opts)
233
  return 0
234

    
235

    
236
@UsesRPC
237
def DestroyCluster(opts, args):
238
  """Destroy the cluster.
239

240
  @param opts: the command line options selected by the user
241
  @type args: list
242
  @param args: should be an empty list
243
  @rtype: int
244
  @return: the desired exit code
245

246
  """
247
  if not opts.yes_do_it:
248
    ToStderr("Destroying a cluster is irreversible. If you really want"
249
             " destroy this cluster, supply the --yes-do-it option.")
250
    return 1
251

    
252
  op = opcodes.OpClusterDestroy()
253
  master = SubmitOpCode(op, opts=opts)
254
  # if we reached this, the opcode didn't fail; we can proceed to
255
  # shutdown all the daemons
256
  bootstrap.FinalizeClusterDestroy(master)
257
  return 0
258

    
259

    
260
def RenameCluster(opts, args):
261
  """Rename the cluster.
262

263
  @param opts: the command line options selected by the user
264
  @type args: list
265
  @param args: should contain only one element, the new cluster name
266
  @rtype: int
267
  @return: the desired exit code
268

269
  """
270
  cl = GetClient()
271

    
272
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
273

    
274
  new_name = args[0]
275
  if not opts.force:
276
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
277
                " connected over the network to the cluster name, the"
278
                " operation is very dangerous as the IP address will be"
279
                " removed from the node and the change may not go through."
280
                " Continue?") % (cluster_name, new_name)
281
    if not AskUser(usertext):
282
      return 1
283

    
284
  op = opcodes.OpClusterRename(name=new_name)
285
  result = SubmitOpCode(op, opts=opts, cl=cl)
286

    
287
  if result:
288
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
289

    
290
  return 0
291

    
292

    
293
def ActivateMasterIp(opts, args):
294
  """Activates the master IP.
295

296
  """
297
  op = opcodes.OpClusterActivateMasterIp()
298
  SubmitOpCode(op)
299
  return 0
300

    
301

    
302
def DeactivateMasterIp(opts, args):
303
  """Deactivates the master IP.
304

305
  """
306
  if not opts.confirm:
307
    usertext = ("This will disable the master IP. All the open connections to"
308
                " the master IP will be closed. To reach the master you will"
309
                " need to use its node IP."
310
                " Continue?")
311
    if not AskUser(usertext):
312
      return 1
313

    
314
  op = opcodes.OpClusterDeactivateMasterIp()
315
  SubmitOpCode(op)
316
  return 0
317

    
318

    
319
def RedistributeConfig(opts, args):
320
  """Forces push of the cluster configuration.
321

322
  @param opts: the command line options selected by the user
323
  @type args: list
324
  @param args: empty list
325
  @rtype: int
326
  @return: the desired exit code
327

328
  """
329
  op = opcodes.OpClusterRedistConf()
330
  SubmitOrSend(op, opts)
331
  return 0
332

    
333

    
334
def ShowClusterVersion(opts, args):
335
  """Write version of ganeti software to the standard output.
336

337
  @param opts: the command line options selected by the user
338
  @type args: list
339
  @param args: should be an empty list
340
  @rtype: int
341
  @return: the desired exit code
342

343
  """
344
  cl = GetClient(query=True)
345
  result = cl.QueryClusterInfo()
346
  ToStdout("Software version: %s", result["software_version"])
347
  ToStdout("Internode protocol: %s", result["protocol_version"])
348
  ToStdout("Configuration format: %s", result["config_version"])
349
  ToStdout("OS api version: %s", result["os_api_version"])
350
  ToStdout("Export interface: %s", result["export_version"])
351
  return 0
352

    
353

    
354
def ShowClusterMaster(opts, args):
355
  """Write name of master node to the standard output.
356

357
  @param opts: the command line options selected by the user
358
  @type args: list
359
  @param args: should be an empty list
360
  @rtype: int
361
  @return: the desired exit code
362

363
  """
364
  master = bootstrap.GetMaster()
365
  ToStdout(master)
366
  return 0
367

    
368

    
369
def _FormatGroupedParams(paramsdict, roman=False):
370
  """Format Grouped parameters (be, nic, disk) by group.
371

372
  @type paramsdict: dict of dicts
373
  @param paramsdict: {group: {param: value, ...}, ...}
374
  @rtype: dict of dicts
375
  @return: copy of the input dictionaries with strings as values
376

377
  """
378
  ret = {}
379
  for (item, val) in paramsdict.items():
380
    if isinstance(val, dict):
381
      ret[item] = _FormatGroupedParams(val, roman=roman)
382
    elif roman and isinstance(val, int):
383
      ret[item] = compat.TryToRoman(val)
384
    else:
385
      ret[item] = str(val)
386
  return ret
387

    
388

    
389
def ShowClusterConfig(opts, args):
390
  """Shows cluster information.
391

392
  @param opts: the command line options selected by the user
393
  @type args: list
394
  @param args: should be an empty list
395
  @rtype: int
396
  @return: the desired exit code
397

398
  """
399
  cl = GetClient(query=True)
400
  result = cl.QueryClusterInfo()
401

    
402
  if result["tags"]:
403
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
404
  else:
405
    tags = "(none)"
406
  if result["reserved_lvs"]:
407
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
408
  else:
409
    reserved_lvs = "(none)"
410

    
411
  info = [
412
    ("Cluster name", result["name"]),
413
    ("Cluster UUID", result["uuid"]),
414

    
415
    ("Creation time", utils.FormatTime(result["ctime"])),
416
    ("Modification time", utils.FormatTime(result["mtime"])),
417

    
418
    ("Master node", result["master"]),
419

    
420
    ("Architecture (this node)",
421
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),
422

    
423
    ("Tags", tags),
424

    
425
    ("Default hypervisor", result["default_hypervisor"]),
426
    ("Enabled hypervisors",
427
     utils.CommaJoin(result["enabled_hypervisors"])),
428

    
429
    ("Hypervisor parameters", _FormatGroupedParams(result["hvparams"])),
430

    
431
    ("OS-specific hypervisor parameters",
432
     _FormatGroupedParams(result["os_hvp"])),
433

    
434
    ("OS parameters", _FormatGroupedParams(result["osparams"])),
435

    
436
    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
437
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),
438

    
439
    ("Cluster parameters", [
440
      ("candidate pool size",
441
       compat.TryToRoman(result["candidate_pool_size"],
442
                         convert=opts.roman_integers)),
443
      ("master netdev", result["master_netdev"]),
444
      ("master netmask", result["master_netmask"]),
445
      ("use external master IP address setup script",
446
       result["use_external_mip_script"]),
447
      ("lvm volume group", result["volume_group_name"]),
448
      ("lvm reserved volumes", reserved_lvs),
449
      ("drbd usermode helper", result["drbd_usermode_helper"]),
450
      ("file storage path", result["file_storage_dir"]),
451
      ("shared file storage path", result["shared_file_storage_dir"]),
452
      ("maintenance of node health", result["maintain_node_health"]),
453
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
454
      ("default instance allocator", result["default_iallocator"]),
455
      ("primary ip version", result["primary_ip_version"]),
456
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
457
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
458
      ("ExtStorage Providers search path",
459
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
460
      ("enabled storage types",
461
       utils.CommaJoin(result["enabled_storage_types"])),
462
      ]),
463

    
464
    ("Default node parameters",
465
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),
466

    
467
    ("Default instance parameters",
468
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),
469

    
470
    ("Default nic parameters",
471
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),
472

    
473
    ("Default disk parameters",
474
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),
475

    
476
    ("Instance policy - limits for instances",
477
     [
478
       (key,
479
        _FormatGroupedParams(result["ipolicy"][key], roman=opts.roman_integers))
480
       for key in constants.IPOLICY_ISPECS
481
       ] +
482
     [
483
       ("enabled disk templates",
484
        utils.CommaJoin(result["ipolicy"][constants.IPOLICY_DTS])),
485
       ] +
486
     [
487
       (key, result["ipolicy"][key])
488
       for key in constants.IPOLICY_PARAMETERS
489
       ]),
490
    ]
491

    
492
  PrintGenericInfo(info)
493
  return 0
494

    
495

    
496
def ClusterCopyFile(opts, args):
497
  """Copy a file from master to some nodes.
498

499
  @param opts: the command line options selected by the user
500
  @type args: list
501
  @param args: should contain only one element, the path of
502
      the file to be copied
503
  @rtype: int
504
  @return: the desired exit code
505

506
  """
507
  filename = args[0]
508
  if not os.path.exists(filename):
509
    raise errors.OpPrereqError("No such filename '%s'" % filename,
510
                               errors.ECODE_INVAL)
511

    
512
  cl = GetClient()
513

    
514
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
515

    
516
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
517
                           secondary_ips=opts.use_replication_network,
518
                           nodegroup=opts.nodegroup)
519

    
520
  srun = ssh.SshRunner(cluster_name)
521
  for node in results:
522
    if not srun.CopyFileToNode(node, filename):
523
      ToStderr("Copy of file %s to node %s failed", filename, node)
524

    
525
  return 0
526

    
527

    
528
def RunClusterCommand(opts, args):
529
  """Run a command on some nodes.
530

531
  @param opts: the command line options selected by the user
532
  @type args: list
533
  @param args: should contain the command to be run and its arguments
534
  @rtype: int
535
  @return: the desired exit code
536

537
  """
538
  cl = GetClient()
539

    
540
  command = " ".join(args)
541

    
542
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
543

    
544
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
545
                                                    "master_node"])
546

    
547
  srun = ssh.SshRunner(cluster_name=cluster_name)
548

    
549
  # Make sure master node is at list end
550
  if master_node in nodes:
551
    nodes.remove(master_node)
552
    nodes.append(master_node)
553

    
554
  for name in nodes:
555
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
556

    
557
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
558
      # Do not output anything for successful commands
559
      continue
560

    
561
    ToStdout("------------------------------------------------")
562
    if opts.show_machine_names:
563
      for line in result.output.splitlines():
564
        ToStdout("%s: %s", name, line)
565
    else:
566
      ToStdout("node: %s", name)
567
      ToStdout("%s", result.output)
568
    ToStdout("return code = %s", result.exit_code)
569

    
570
  return 0
571

    
572

    
573
def VerifyCluster(opts, args):
574
  """Verify integrity of cluster, performing various test on nodes.
575

576
  @param opts: the command line options selected by the user
577
  @type args: list
578
  @param args: should be an empty list
579
  @rtype: int
580
  @return: the desired exit code
581

582
  """
583
  skip_checks = []
584

    
585
  if opts.skip_nplusone_mem:
586
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
587

    
588
  cl = GetClient()
589

    
590
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
591
                               error_codes=opts.error_codes,
592
                               debug_simulate_errors=opts.simulate_errors,
593
                               skip_checks=skip_checks,
594
                               ignore_errors=opts.ignore_errors,
595
                               group_name=opts.nodegroup)
596
  result = SubmitOpCode(op, cl=cl, opts=opts)
597

    
598
  # Keep track of submitted jobs
599
  jex = JobExecutor(cl=cl, opts=opts)
600

    
601
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
602
    jex.AddJobId(None, status, job_id)
603

    
604
  results = jex.GetResults()
605

    
606
  (bad_jobs, bad_results) = \
607
    map(len,
608
        # Convert iterators to lists
609
        map(list,
610
            # Count errors
611
            map(compat.partial(itertools.ifilterfalse, bool),
612
                # Convert result to booleans in a tuple
613
                zip(*((job_success, len(op_results) == 1 and op_results[0])
614
                      for (job_success, op_results) in results)))))
615

    
616
  if bad_jobs == 0 and bad_results == 0:
617
    rcode = constants.EXIT_SUCCESS
618
  else:
619
    rcode = constants.EXIT_FAILURE
620
    if bad_jobs > 0:
621
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
622

    
623
  return rcode
624

    
625

    
626
def VerifyDisks(opts, args):
627
  """Verify integrity of cluster disks.
628

629
  @param opts: the command line options selected by the user
630
  @type args: list
631
  @param args: should be an empty list
632
  @rtype: int
633
  @return: the desired exit code
634

635
  """
636
  cl = GetClient()
637

    
638
  op = opcodes.OpClusterVerifyDisks()
639

    
640
  result = SubmitOpCode(op, cl=cl, opts=opts)
641

    
642
  # Keep track of submitted jobs
643
  jex = JobExecutor(cl=cl, opts=opts)
644

    
645
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
646
    jex.AddJobId(None, status, job_id)
647

    
648
  retcode = constants.EXIT_SUCCESS
649

    
650
  for (status, result) in jex.GetResults():
651
    if not status:
652
      ToStdout("Job failed: %s", result)
653
      continue
654

    
655
    ((bad_nodes, instances, missing), ) = result
656

    
657
    for node, text in bad_nodes.items():
658
      ToStdout("Error gathering data on node %s: %s",
659
               node, utils.SafeEncode(text[-400:]))
660
      retcode = constants.EXIT_FAILURE
661
      ToStdout("You need to fix these nodes first before fixing instances")
662

    
663
    for iname in instances:
664
      if iname in missing:
665
        continue
666
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
667
      try:
668
        ToStdout("Activating disks for instance '%s'", iname)
669
        SubmitOpCode(op, opts=opts, cl=cl)
670
      except errors.GenericError, err:
671
        nret, msg = FormatError(err)
672
        retcode |= nret
673
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
674

    
675
    if missing:
676
      for iname, ival in missing.iteritems():
677
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
678
        if all_missing:
679
          ToStdout("Instance %s cannot be verified as it lives on"
680
                   " broken nodes", iname)
681
        else:
682
          ToStdout("Instance %s has missing logical volumes:", iname)
683
          ival.sort()
684
          for node, vol in ival:
685
            if node in bad_nodes:
686
              ToStdout("\tbroken node %s /dev/%s", node, vol)
687
            else:
688
              ToStdout("\t%s /dev/%s", node, vol)
689

    
690
      ToStdout("You need to replace or recreate disks for all the above"
691
               " instances if this message persists after fixing broken nodes.")
692
      retcode = constants.EXIT_FAILURE
693
    elif not instances:
694
      ToStdout("No disks need to be activated.")
695

    
696
  return retcode
697

    
698

    
699
def RepairDiskSizes(opts, args):
700
  """Verify sizes of cluster disks.
701

702
  @param opts: the command line options selected by the user
703
  @type args: list
704
  @param args: optional list of instances to restrict check to
705
  @rtype: int
706
  @return: the desired exit code
707

708
  """
709
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
710
  SubmitOpCode(op, opts=opts)
711

    
712

    
713
@UsesRPC
714
def MasterFailover(opts, args):
715
  """Failover the master node.
716

717
  This command, when run on a non-master node, will cause the current
718
  master to cease being master, and the non-master to become new
719
  master.
720

721
  @param opts: the command line options selected by the user
722
  @type args: list
723
  @param args: should be an empty list
724
  @rtype: int
725
  @return: the desired exit code
726

727
  """
728
  if opts.no_voting and not opts.yes_do_it:
729
    usertext = ("This will perform the failover even if most other nodes"
730
                " are down, or if this node is outdated. This is dangerous"
731
                " as it can lead to a non-consistent cluster. Check the"
732
                " gnt-cluster(8) man page before proceeding. Continue?")
733
    if not AskUser(usertext):
734
      return 1
735

    
736
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
737

    
738

    
739
def MasterPing(opts, args):
740
  """Checks if the master is alive.
741

742
  @param opts: the command line options selected by the user
743
  @type args: list
744
  @param args: should be an empty list
745
  @rtype: int
746
  @return: the desired exit code
747

748
  """
749
  try:
750
    cl = GetClient()
751
    cl.QueryClusterInfo()
752
    return 0
753
  except Exception: # pylint: disable=W0703
754
    return 1
755

    
756

    
757
def SearchTags(opts, args):
758
  """Searches the tags on all the cluster.
759

760
  @param opts: the command line options selected by the user
761
  @type args: list
762
  @param args: should contain only one element, the tag pattern
763
  @rtype: int
764
  @return: the desired exit code
765

766
  """
767
  op = opcodes.OpTagsSearch(pattern=args[0])
768
  result = SubmitOpCode(op, opts=opts)
769
  if not result:
770
    return 1
771
  result = list(result)
772
  result.sort()
773
  for path, tag in result:
774
    ToStdout("%s %s", path, tag)
775

    
776

    
777
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
778
  """Reads and verifies an X509 certificate.
779

780
  @type cert_filename: string
781
  @param cert_filename: the path of the file containing the certificate to
782
                        verify encoded in PEM format
783
  @type verify_private_key: bool
784
  @param verify_private_key: whether to verify the private key in addition to
785
                             the public certificate
786
  @rtype: string
787
  @return: a string containing the PEM-encoded certificate.
788

789
  """
790
  try:
791
    pem = utils.ReadFile(cert_filename)
792
  except IOError, err:
793
    raise errors.X509CertError(cert_filename,
794
                               "Unable to read certificate: %s" % str(err))
795

    
796
  try:
797
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
798
  except Exception, err:
799
    raise errors.X509CertError(cert_filename,
800
                               "Unable to load certificate: %s" % str(err))
801

    
802
  if verify_private_key:
803
    try:
804
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
805
    except Exception, err:
806
      raise errors.X509CertError(cert_filename,
807
                                 "Unable to load private key: %s" % str(err))
808

    
809
  return pem
810

    
811

    
812
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
813
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
814
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
815
                 cds_filename, force):
816
  """Renews cluster certificates, keys and secrets.
817

818
  @type new_cluster_cert: bool
819
  @param new_cluster_cert: Whether to generate a new cluster certificate
820
  @type new_rapi_cert: bool
821
  @param new_rapi_cert: Whether to generate a new RAPI certificate
822
  @type rapi_cert_filename: string
823
  @param rapi_cert_filename: Path to file containing new RAPI certificate
824
  @type new_spice_cert: bool
825
  @param new_spice_cert: Whether to generate a new SPICE certificate
826
  @type spice_cert_filename: string
827
  @param spice_cert_filename: Path to file containing new SPICE certificate
828
  @type spice_cacert_filename: string
829
  @param spice_cacert_filename: Path to file containing the certificate of the
830
                                CA that signed the SPICE certificate
831
  @type new_confd_hmac_key: bool
832
  @param new_confd_hmac_key: Whether to generate a new HMAC key
833
  @type new_cds: bool
834
  @param new_cds: Whether to generate a new cluster domain secret
835
  @type cds_filename: string
836
  @param cds_filename: Path to file containing new cluster domain secret
837
  @type force: bool
838
  @param force: Whether to ask user for confirmation
839

840
  """
841
  if new_rapi_cert and rapi_cert_filename:
842
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
843
             " options can be specified at the same time.")
844
    return 1
845

    
846
  if new_cds and cds_filename:
847
    ToStderr("Only one of the --new-cluster-domain-secret and"
848
             " --cluster-domain-secret options can be specified at"
849
             " the same time.")
850
    return 1
851

    
852
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
853
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
854
             " and --spice-ca-certificate must not be used.")
855
    return 1
856

    
857
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
858
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
859
             " specified.")
860
    return 1
861

    
862
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
863
  try:
864
    if rapi_cert_filename:
865
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
866
    if spice_cert_filename:
867
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
868
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
869
  except errors.X509CertError, err:
870
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
871
    return 1
872

    
873
  if cds_filename:
874
    try:
875
      cds = utils.ReadFile(cds_filename)
876
    except Exception, err: # pylint: disable=W0703
877
      ToStderr("Can't load new cluster domain secret from %s: %s" %
878
               (cds_filename, str(err)))
879
      return 1
880
  else:
881
    cds = None
882

    
883
  if not force:
884
    usertext = ("This requires all daemons on all nodes to be restarted and"
885
                " may take some time. Continue?")
886
    if not AskUser(usertext):
887
      return 1
888

    
889
  def _RenewCryptoInner(ctx):
890
    ctx.feedback_fn("Updating certificates and keys")
891
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
892
                                    new_rapi_cert,
893
                                    new_spice_cert,
894
                                    new_confd_hmac_key,
895
                                    new_cds,
896
                                    rapi_cert_pem=rapi_cert_pem,
897
                                    spice_cert_pem=spice_cert_pem,
898
                                    spice_cacert_pem=spice_cacert_pem,
899
                                    cds=cds)
900

    
901
    files_to_copy = []
902

    
903
    if new_cluster_cert:
904
      files_to_copy.append(pathutils.NODED_CERT_FILE)
905

    
906
    if new_rapi_cert or rapi_cert_pem:
907
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
908

    
909
    if new_spice_cert or spice_cert_pem:
910
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
911
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
912

    
913
    if new_confd_hmac_key:
914
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
915

    
916
    if new_cds or cds:
917
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
918

    
919
    if files_to_copy:
920
      for node_name in ctx.nonmaster_nodes:
921
        ctx.feedback_fn("Copying %s to %s" %
922
                        (", ".join(files_to_copy), node_name))
923
        for file_name in files_to_copy:
924
          ctx.ssh.CopyFileToNode(node_name, file_name)
925

    
926
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
927

    
928
  ToStdout("All requested certificates and keys have been replaced."
929
           " Running \"gnt-cluster verify\" now is recommended.")
930

    
931
  return 0
932

    
933

    
934
def RenewCrypto(opts, args):
935
  """Renews cluster certificates, keys and secrets.
936

937
  """
938
  return _RenewCrypto(opts.new_cluster_cert,
939
                      opts.new_rapi_cert,
940
                      opts.rapi_cert,
941
                      opts.new_spice_cert,
942
                      opts.spice_cert,
943
                      opts.spice_cacert,
944
                      opts.new_confd_hmac_key,
945
                      opts.new_cluster_domain_secret,
946
                      opts.cluster_domain_secret,
947
                      opts.force)
948

    
949

    
950
def SetClusterParams(opts, args):
951
  """Modify the cluster.
952

953
  @param opts: the command line options selected by the user
954
  @type args: list
955
  @param args: should be an empty list
956
  @rtype: int
957
  @return: the desired exit code
958

959
  """
960
  if not (not opts.lvm_storage or opts.vg_name or
961
          not opts.drbd_storage or opts.drbd_helper or
962
          opts.enabled_hypervisors or opts.hvparams or
963
          opts.beparams or opts.nicparams or
964
          opts.ndparams or opts.diskparams or
965
          opts.candidate_pool_size is not None or
966
          opts.uid_pool is not None or
967
          opts.maintain_node_health is not None or
968
          opts.add_uids is not None or
969
          opts.remove_uids is not None or
970
          opts.default_iallocator is not None or
971
          opts.reserved_lvs is not None or
972
          opts.master_netdev is not None or
973
          opts.master_netmask is not None or
974
          opts.use_external_mip_script is not None or
975
          opts.prealloc_wipe_disks is not None or
976
          opts.hv_state or
977
          opts.enabled_storage_types or
978
          opts.disk_state or
979
          opts.ispecs_mem_size or
980
          opts.ispecs_cpu_count or
981
          opts.ispecs_disk_count or
982
          opts.ispecs_disk_size or
983
          opts.ispecs_nic_count or
984
          opts.ipolicy_disk_templates is not None or
985
          opts.ipolicy_vcpu_ratio is not None or
986
          opts.ipolicy_spindle_ratio is not None):
987
    ToStderr("Please give at least one of the parameters.")
988
    return 1
989

    
990
  vg_name = opts.vg_name
991
  if not opts.lvm_storage and opts.vg_name:
992
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
993
    return 1
994

    
995
  if not opts.lvm_storage:
996
    vg_name = ""
997

    
998
  drbd_helper = opts.drbd_helper
999
  if not opts.drbd_storage and opts.drbd_helper:
1000
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
1001
    return 1
1002

    
1003
  if not opts.drbd_storage:
1004
    drbd_helper = ""
1005

    
1006
  hvlist = opts.enabled_hypervisors
1007
  if hvlist is not None:
1008
    hvlist = hvlist.split(",")
1009

    
1010
  enabled_storage_types = opts.enabled_storage_types
1011
  if enabled_storage_types is not None:
1012
    enabled_storage_types = enabled_storage_types.split(",")
1013

    
1014
  # a list of (name, dict) we can pass directly to dict() (or [])
1015
  hvparams = dict(opts.hvparams)
1016
  for hv_params in hvparams.values():
1017
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1018

    
1019
  diskparams = dict(opts.diskparams)
1020

    
1021
  for dt_params in diskparams.values():
1022
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1023

    
1024
  beparams = opts.beparams
1025
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1026

    
1027
  nicparams = opts.nicparams
1028
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1029

    
1030
  ndparams = opts.ndparams
1031
  if ndparams is not None:
1032
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1033

    
1034
  ipolicy = CreateIPolicyFromOpts(
1035
    ispecs_mem_size=opts.ispecs_mem_size,
1036
    ispecs_cpu_count=opts.ispecs_cpu_count,
1037
    ispecs_disk_count=opts.ispecs_disk_count,
1038
    ispecs_disk_size=opts.ispecs_disk_size,
1039
    ispecs_nic_count=opts.ispecs_nic_count,
1040
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1041
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1042
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1043
    )
1044

    
1045
  mnh = opts.maintain_node_health
1046

    
1047
  uid_pool = opts.uid_pool
1048
  if uid_pool is not None:
1049
    uid_pool = uidpool.ParseUidPool(uid_pool)
1050

    
1051
  add_uids = opts.add_uids
1052
  if add_uids is not None:
1053
    add_uids = uidpool.ParseUidPool(add_uids)
1054

    
1055
  remove_uids = opts.remove_uids
1056
  if remove_uids is not None:
1057
    remove_uids = uidpool.ParseUidPool(remove_uids)
1058

    
1059
  if opts.reserved_lvs is not None:
1060
    if opts.reserved_lvs == "":
1061
      opts.reserved_lvs = []
1062
    else:
1063
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1064

    
1065
  if opts.master_netmask is not None:
1066
    try:
1067
      opts.master_netmask = int(opts.master_netmask)
1068
    except ValueError:
1069
      ToStderr("The --master-netmask option expects an int parameter.")
1070
      return 1
1071

    
1072
  ext_ip_script = opts.use_external_mip_script
1073

    
1074
  if opts.disk_state:
1075
    disk_state = utils.FlatToDict(opts.disk_state)
1076
  else:
1077
    disk_state = {}
1078

    
1079
  hv_state = dict(opts.hv_state)
1080

    
1081
  op = opcodes.OpClusterSetParams(
1082
    vg_name=vg_name,
1083
    drbd_helper=drbd_helper,
1084
    enabled_hypervisors=hvlist,
1085
    hvparams=hvparams,
1086
    os_hvp=None,
1087
    beparams=beparams,
1088
    nicparams=nicparams,
1089
    ndparams=ndparams,
1090
    diskparams=diskparams,
1091
    ipolicy=ipolicy,
1092
    candidate_pool_size=opts.candidate_pool_size,
1093
    maintain_node_health=mnh,
1094
    uid_pool=uid_pool,
1095
    add_uids=add_uids,
1096
    remove_uids=remove_uids,
1097
    default_iallocator=opts.default_iallocator,
1098
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1099
    master_netdev=opts.master_netdev,
1100
    master_netmask=opts.master_netmask,
1101
    reserved_lvs=opts.reserved_lvs,
1102
    use_external_mip_script=ext_ip_script,
1103
    hv_state=hv_state,
1104
    disk_state=disk_state,
1105
    enabled_storage_types=enabled_storage_types,
1106
    )
1107
  SubmitOrSend(op, opts)
1108
  return 0
1109

    
1110

    
1111
def QueueOps(opts, args):
1112
  """Queue operations.
1113

1114
  @param opts: the command line options selected by the user
1115
  @type args: list
1116
  @param args: should contain only one element, the subcommand
1117
  @rtype: int
1118
  @return: the desired exit code
1119

1120
  """
1121
  command = args[0]
1122
  client = GetClient()
1123
  if command in ("drain", "undrain"):
1124
    drain_flag = command == "drain"
1125
    client.SetQueueDrainFlag(drain_flag)
1126
  elif command == "info":
1127
    result = client.QueryConfigValues(["drain_flag"])
1128
    if result[0]:
1129
      val = "set"
1130
    else:
1131
      val = "unset"
1132
    ToStdout("The drain flag is %s" % val)
1133
  else:
1134
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1135
                               errors.ECODE_INVAL)
1136

    
1137
  return 0
1138

    
1139

    
1140
def _ShowWatcherPause(until):
1141
  if until is None or until < time.time():
1142
    ToStdout("The watcher is not paused.")
1143
  else:
1144
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1145

    
1146

    
1147
def WatcherOps(opts, args):
1148
  """Watcher operations.
1149

1150
  @param opts: the command line options selected by the user
1151
  @type args: list
1152
  @param args: should contain only one element, the subcommand
1153
  @rtype: int
1154
  @return: the desired exit code
1155

1156
  """
1157
  command = args[0]
1158
  client = GetClient()
1159

    
1160
  if command == "continue":
1161
    client.SetWatcherPause(None)
1162
    ToStdout("The watcher is no longer paused.")
1163

    
1164
  elif command == "pause":
1165
    if len(args) < 2:
1166
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1167

    
1168
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1169
    _ShowWatcherPause(result)
1170

    
1171
  elif command == "info":
1172
    result = client.QueryConfigValues(["watcher_pause"])
1173
    _ShowWatcherPause(result[0])
1174

    
1175
  else:
1176
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1177
                               errors.ECODE_INVAL)
1178

    
1179
  return 0
1180

    
1181

    
1182
def _OobPower(opts, node_list, power):
1183
  """Puts the node in the list to desired power state.
1184

1185
  @param opts: The command line options selected by the user
1186
  @param node_list: The list of nodes to operate on
1187
  @param power: True if they should be powered on, False otherwise
1188
  @return: The success of the operation (none failed)
1189

1190
  """
1191
  if power:
1192
    command = constants.OOB_POWER_ON
1193
  else:
1194
    command = constants.OOB_POWER_OFF
1195

    
1196
  op = opcodes.OpOobCommand(node_names=node_list,
1197
                            command=command,
1198
                            ignore_status=True,
1199
                            timeout=opts.oob_timeout,
1200
                            power_delay=opts.power_delay)
1201
  result = SubmitOpCode(op, opts=opts)
1202
  errs = 0
1203
  for node_result in result:
1204
    (node_tuple, data_tuple) = node_result
1205
    (_, node_name) = node_tuple
1206
    (data_status, _) = data_tuple
1207
    if data_status != constants.RS_NORMAL:
1208
      assert data_status != constants.RS_UNAVAIL
1209
      errs += 1
1210
      ToStderr("There was a problem changing power for %s, please investigate",
1211
               node_name)
1212

    
1213
  if errs > 0:
1214
    return False
1215

    
1216
  return True
1217

    
1218

    
1219
def _InstanceStart(opts, inst_list, start, no_remember=False):
1220
  """Puts the instances in the list to desired state.
1221

1222
  @param opts: The command line options selected by the user
1223
  @param inst_list: The list of instances to operate on
1224
  @param start: True if they should be started, False for shutdown
1225
  @param no_remember: If the instance state should be remembered
1226
  @return: The success of the operation (none failed)
1227

1228
  """
1229
  if start:
1230
    opcls = opcodes.OpInstanceStartup
1231
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1232
  else:
1233
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1234
                           timeout=opts.shutdown_timeout,
1235
                           no_remember=no_remember)
1236
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1237

    
1238
  jex = JobExecutor(opts=opts)
1239

    
1240
  for inst in inst_list:
1241
    ToStdout("Submit %s of instance %s", text_submit, inst)
1242
    op = opcls(instance_name=inst)
1243
    jex.QueueJob(inst, op)
1244

    
1245
  results = jex.GetResults()
1246
  bad_cnt = len([1 for (success, _) in results if not success])
1247

    
1248
  if bad_cnt == 0:
1249
    ToStdout("All instances have been %s successfully", text_success)
1250
  else:
1251
    ToStderr("There were errors while %s instances:\n"
1252
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1253
             len(results))
1254
    return False
1255

    
1256
  return True
1257

    
1258

    
1259
class _RunWhenNodesReachableHelper:
1260
  """Helper class to make shared internal state sharing easier.
1261

1262
  @ivar success: Indicates if all action_cb calls were successful
1263

1264
  """
1265
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1266
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1267
    """Init the object.
1268

1269
    @param node_list: The list of nodes to be reachable
1270
    @param action_cb: Callback called when a new host is reachable
1271
    @type node2ip: dict
1272
    @param node2ip: Node to ip mapping
1273
    @param port: The port to use for the TCP ping
1274
    @param feedback_fn: The function used for feedback
1275
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1276
    @param _sleep_fn: Function to sleep (for unittest use only)
1277

1278
    """
1279
    self.down = set(node_list)
1280
    self.up = set()
1281
    self.node2ip = node2ip
1282
    self.success = True
1283
    self.action_cb = action_cb
1284
    self.port = port
1285
    self.feedback_fn = feedback_fn
1286
    self._ping_fn = _ping_fn
1287
    self._sleep_fn = _sleep_fn
1288

    
1289
  def __call__(self):
1290
    """When called we run action_cb.
1291

1292
    @raises utils.RetryAgain: When there are still down nodes
1293

1294
    """
1295
    if not self.action_cb(self.up):
1296
      self.success = False
1297

    
1298
    if self.down:
1299
      raise utils.RetryAgain()
1300
    else:
1301
      return self.success
1302

    
1303
  def Wait(self, secs):
1304
    """Checks if a host is up or waits remaining seconds.
1305

1306
    @param secs: The secs remaining
1307

1308
    """
1309
    start = time.time()
1310
    for node in self.down:
1311
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1312
                       live_port_needed=True):
1313
        self.feedback_fn("Node %s became available" % node)
1314
        self.up.add(node)
1315
        self.down -= self.up
1316
        # If we have a node available there is the possibility to run the
1317
        # action callback successfully, therefore we don't wait and return
1318
        return
1319

    
1320
    self._sleep_fn(max(0.0, start + secs - time.time()))
1321

    
1322

    
1323
def _RunWhenNodesReachable(node_list, action_cb, interval):
1324
  """Run action_cb when nodes become reachable.
1325

1326
  @param node_list: The list of nodes to be reachable
1327
  @param action_cb: Callback called when a new host is reachable
1328
  @param interval: The earliest time to retry
1329

1330
  """
1331
  client = GetClient()
1332
  cluster_info = client.QueryClusterInfo()
1333
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1334
    family = netutils.IPAddress.family
1335
  else:
1336
    family = netutils.IP6Address.family
1337

    
1338
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1339
                 for node in node_list)
1340

    
1341
  port = netutils.GetDaemonPort(constants.NODED)
1342
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1343
                                        ToStdout)
1344

    
1345
  try:
1346
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1347
                       wait_fn=helper.Wait)
1348
  except utils.RetryTimeout:
1349
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1350
             " again:\n  - %s", "  - ".join(helper.down))
1351
    return False
1352

    
1353

    
1354
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1355
                          _instance_start_fn=_InstanceStart):
1356
  """Start the instances conditional based on node_states.
1357

1358
  @param opts: The command line options selected by the user
1359
  @param inst_map: A dict of inst -> nodes mapping
1360
  @param nodes_online: A list of nodes online
1361
  @param _instance_start_fn: Callback to start instances (unittest use only)
1362
  @return: Success of the operation on all instances
1363

1364
  """
1365
  start_inst_list = []
1366
  for (inst, nodes) in inst_map.items():
1367
    if not (nodes - nodes_online):
1368
      # All nodes the instance lives on are back online
1369
      start_inst_list.append(inst)
1370

    
1371
  for inst in start_inst_list:
1372
    del inst_map[inst]
1373

    
1374
  if start_inst_list:
1375
    return _instance_start_fn(opts, start_inst_list, True)
1376

    
1377
  return True
1378

    
1379

    
1380
def _EpoOn(opts, full_node_list, node_list, inst_map):
1381
  """Does the actual power on.
1382

1383
  @param opts: The command line options selected by the user
1384
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1385
                         OOB)
1386
  @param node_list: The list of nodes to operate on (all need to support OOB)
1387
  @param inst_map: A dict of inst -> nodes mapping
1388
  @return: The desired exit status
1389

1390
  """
1391
  if node_list and not _OobPower(opts, node_list, False):
1392
    ToStderr("Not all nodes seem to get back up, investigate and start"
1393
             " manually if needed")
1394

    
1395
  # Wait for the nodes to be back up
1396
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1397

    
1398
  ToStdout("Waiting until all nodes are available again")
1399
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1400
    ToStderr("Please investigate and start stopped instances manually")
1401
    return constants.EXIT_FAILURE
1402

    
1403
  return constants.EXIT_SUCCESS
1404

    
1405

    
1406
def _EpoOff(opts, node_list, inst_map):
1407
  """Does the actual power off.
1408

1409
  @param opts: The command line options selected by the user
1410
  @param node_list: The list of nodes to operate on (all need to support OOB)
1411
  @param inst_map: A dict of inst -> nodes mapping
1412
  @return: The desired exit status
1413

1414
  """
1415
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1416
    ToStderr("Please investigate and stop instances manually before continuing")
1417
    return constants.EXIT_FAILURE
1418

    
1419
  if not node_list:
1420
    return constants.EXIT_SUCCESS
1421

    
1422
  if _OobPower(opts, node_list, False):
1423
    return constants.EXIT_SUCCESS
1424
  else:
1425
    return constants.EXIT_FAILURE
1426

    
1427

    
1428
def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
1429
        _confirm_fn=ConfirmOperation,
1430
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
1431
  """EPO operations.
1432

1433
  @param opts: the command line options selected by the user
1434
  @type args: list
1435
  @param args: should contain only one element, the subcommand
1436
  @rtype: int
1437
  @return: the desired exit code
1438

1439
  """
1440
  if opts.groups and opts.show_all:
1441
    _stderr_fn("Only one of --groups or --all are allowed")
1442
    return constants.EXIT_FAILURE
1443
  elif args and opts.show_all:
1444
    _stderr_fn("Arguments in combination with --all are not allowed")
1445
    return constants.EXIT_FAILURE
1446

    
1447
  if cl is None:
1448
    cl = GetClient()
1449

    
1450
  if opts.groups:
1451
    node_query_list = \
1452
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
1453
  else:
1454
    node_query_list = args
1455

    
1456
  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
1457
                                           "sinst_list", "powered", "offline"],
1458
                         False)
1459

    
1460
  all_nodes = map(compat.fst, result)
1461
  node_list = []
1462
  inst_map = {}
1463
  for (node, master, pinsts, sinsts, powered, offline) in result:
1464
    if not offline:
1465
      for inst in (pinsts + sinsts):
1466
        if inst in inst_map:
1467
          if not master:
1468
            inst_map[inst].add(node)
1469
        elif master:
1470
          inst_map[inst] = set()
1471
        else:
1472
          inst_map[inst] = set([node])
1473

    
1474
    if master and opts.on:
1475
      # We ignore the master for turning on the machines, in fact we are
1476
      # already operating on the master at this point :)
1477
      continue
1478
    elif master and not opts.show_all:
1479
      _stderr_fn("%s is the master node, please do a master-failover to another"
1480
                 " node not affected by the EPO or use --all if you intend to"
1481
                 " shutdown the whole cluster", node)
1482
      return constants.EXIT_FAILURE
1483
    elif powered is None:
1484
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
1485
                 " handled in a fully automated manner", node)
1486
    elif powered == opts.on:
1487
      _stdout_fn("Node %s is already in desired power state, skipping", node)
1488
    elif not offline or (offline and powered):
1489
      node_list.append(node)
1490

    
1491
  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
1492
    return constants.EXIT_FAILURE
1493

    
1494
  if opts.on:
1495
    return _on_fn(opts, all_nodes, node_list, inst_map)
1496
  else:
1497
    return _off_fn(opts, node_list, inst_map)
1498

    
1499

    
1500
commands = {
1501
  "init": (
1502
    InitCluster, [ArgHost(min=1, max=1)],
1503
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
1504
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
1505
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
1506
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
1507
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
1508
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
1509
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
1510
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT, ENABLED_STORAGE_TYPES_OPT]
1511
     + INSTANCE_POLICY_OPTS,
1512
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
1513
  "destroy": (
1514
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
1515
    "", "Destroy cluster"),
1516
  "rename": (
1517
    RenameCluster, [ArgHost(min=1, max=1)],
1518
    [FORCE_OPT, DRY_RUN_OPT],
1519
    "<new_name>",
1520
    "Renames the cluster"),
1521
  "redist-conf": (
1522
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
1523
    "", "Forces a push of the configuration file and ssconf files"
1524
    " to the nodes in the cluster"),
1525
  "verify": (
1526
    VerifyCluster, ARGS_NONE,
1527
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
1528
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
1529
    "", "Does a check on the cluster configuration"),
1530
  "verify-disks": (
1531
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
1532
    "", "Does a check on the cluster disk status"),
1533
  "repair-disk-sizes": (
1534
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
1535
    "[instance...]", "Updates mismatches in recorded disk sizes"),
1536
  "master-failover": (
1537
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
1538
    "", "Makes the current node the master"),
1539
  "master-ping": (
1540
    MasterPing, ARGS_NONE, [],
1541
    "", "Checks if the master is alive"),
1542
  "version": (
1543
    ShowClusterVersion, ARGS_NONE, [],
1544
    "", "Shows the cluster version"),
1545
  "getmaster": (
1546
    ShowClusterMaster, ARGS_NONE, [],
1547
    "", "Shows the cluster master"),
1548
  "copyfile": (
1549
    ClusterCopyFile, [ArgFile(min=1, max=1)],
1550
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
1551
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
1552
  "command": (
1553
    RunClusterCommand, [ArgCommand(min=1)],
1554
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
1555
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
1556
  "info": (
1557
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
1558
    "[--roman]", "Show cluster configuration"),
1559
  "list-tags": (
1560
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
1561
  "add-tags": (
1562
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1563
    "tag...", "Add tags to the cluster"),
1564
  "remove-tags": (
1565
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1566
    "tag...", "Remove tags from the cluster"),
1567
  "search-tags": (
1568
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
1569
    "Searches the tags on all objects on"
1570
    " the cluster for a given pattern (regex)"),
1571
  "queue": (
1572
    QueueOps,
1573
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
1574
    [], "drain|undrain|info", "Change queue properties"),
1575
  "watcher": (
1576
    WatcherOps,
1577
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
1578
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
1579
    [],
1580
    "{pause <timespec>|continue|info}", "Change watcher properties"),
1581
  "modify": (
1582
    SetClusterParams, ARGS_NONE,
1583
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
1584
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
1585
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
1586
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
1587
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
1588
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
1589
     DISK_STATE_OPT, SUBMIT_OPT, ENABLED_STORAGE_TYPES_OPT] +
1590
    INSTANCE_POLICY_OPTS,
1591
    "[opts...]",
1592
    "Alters the parameters of the cluster"),
1593
  "renew-crypto": (
1594
    RenewCrypto, ARGS_NONE,
1595
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
1596
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
1597
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
1598
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
1599
    "[opts...]",
1600
    "Renews cluster certificates, keys and secrets"),
1601
  "epo": (
1602
    Epo, [ArgUnknown()],
1603
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
1604
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
1605
    "[opts...] [args]",
1606
    "Performs an emergency power-off on given args"),
1607
  "activate-master-ip": (
1608
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
1609
  "deactivate-master-ip": (
1610
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
1611
    "Deactivates the master IP"),
1612
  }
1613

    
1614

    
1615
#: dictionary with aliases for commands
1616
aliases = {
1617
  "masterfailover": "master-failover",
1618
  "show": "info",
1619
}
1620

    
1621

    
1622
def Main():
1623
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
1624
                     aliases=aliases)