Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ 966e1580

History | View | Annotate | Download (51.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
import os.path
30
import time
31
import OpenSSL
32
import itertools
33

    
34
from ganeti.cli import *
35
from ganeti import opcodes
36
from ganeti import constants
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import bootstrap
40
from ganeti import ssh
41
from ganeti import objects
42
from ganeti import uidpool
43
from ganeti import compat
44
from ganeti import netutils
45
from ganeti import pathutils
46

    
47

    
48
ON_OPT = cli_option("--on", default=False,
49
                    action="store_true", dest="on",
50
                    help="Recover from an EPO")
51

    
52
GROUPS_OPT = cli_option("--groups", default=False,
53
                        action="store_true", dest="groups",
54
                        help="Arguments are node groups instead of nodes")
55

    
56
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
57
                            help="Override interactive check for --no-voting",
58
                            default=False, action="store_true")
59

    
60
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
61
_EPO_PING_TIMEOUT = 1 # 1 second
62
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
63

    
64

    
65
@UsesRPC
66
def InitCluster(opts, args):
67
  """Initialize the cluster.
68

69
  @param opts: the command line options selected by the user
70
  @type args: list
71
  @param args: should contain only one element, the desired
72
      cluster name
73
  @rtype: int
74
  @return: the desired exit code
75

76
  """
77
  if not opts.lvm_storage and opts.vg_name:
78
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
79
    return 1
80

    
81
  vg_name = opts.vg_name
82
  if opts.lvm_storage and not opts.vg_name:
83
    vg_name = constants.DEFAULT_VG
84

    
85
  if not opts.drbd_storage and opts.drbd_helper:
86
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
87
    return 1
88

    
89
  drbd_helper = opts.drbd_helper
90
  if opts.drbd_storage and not opts.drbd_helper:
91
    drbd_helper = constants.DEFAULT_DRBD_HELPER
92

    
93
  master_netdev = opts.master_netdev
94
  if master_netdev is None:
95
    master_netdev = constants.DEFAULT_BRIDGE
96

    
97
  hvlist = opts.enabled_hypervisors
98
  if hvlist is None:
99
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
100
  hvlist = hvlist.split(",")
101

    
102
  hvparams = dict(opts.hvparams)
103
  beparams = opts.beparams
104
  nicparams = opts.nicparams
105

    
106
  diskparams = dict(opts.diskparams)
107

    
108
  # check the disk template types here, as we cannot rely on the type check done
109
  # by the opcode parameter types
110
  diskparams_keys = set(diskparams.keys())
111
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
112
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
113
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
114
    return 1
115

    
116
  # prepare beparams dict
117
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
118
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
119

    
120
  # prepare nicparams dict
121
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
122
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
123

    
124
  # prepare ndparams dict
125
  if opts.ndparams is None:
126
    ndparams = dict(constants.NDC_DEFAULTS)
127
  else:
128
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
129
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
130

    
131
  # prepare hvparams dict
132
  for hv in constants.HYPER_TYPES:
133
    if hv not in hvparams:
134
      hvparams[hv] = {}
135
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
136
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
137

    
138
  # prepare diskparams dict
139
  for templ in constants.DISK_TEMPLATES:
140
    if templ not in diskparams:
141
      diskparams[templ] = {}
142
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
143
                                         diskparams[templ])
144
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
145

    
146
  # prepare ipolicy dict
147
  ipolicy = CreateIPolicyFromOpts(
148
    ispecs_mem_size=opts.ispecs_mem_size,
149
    ispecs_cpu_count=opts.ispecs_cpu_count,
150
    ispecs_disk_count=opts.ispecs_disk_count,
151
    ispecs_disk_size=opts.ispecs_disk_size,
152
    ispecs_nic_count=opts.ispecs_nic_count,
153
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
154
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
155
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
156
    fill_all=True)
157

    
158
  if opts.candidate_pool_size is None:
159
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
160

    
161
  if opts.mac_prefix is None:
162
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
163

    
164
  uid_pool = opts.uid_pool
165
  if uid_pool is not None:
166
    uid_pool = uidpool.ParseUidPool(uid_pool)
167

    
168
  if opts.prealloc_wipe_disks is None:
169
    opts.prealloc_wipe_disks = False
170

    
171
  external_ip_setup_script = opts.use_external_mip_script
172
  if external_ip_setup_script is None:
173
    external_ip_setup_script = False
174

    
175
  try:
176
    primary_ip_version = int(opts.primary_ip_version)
177
  except (ValueError, TypeError), err:
178
    ToStderr("Invalid primary ip version value: %s" % str(err))
179
    return 1
180

    
181
  master_netmask = opts.master_netmask
182
  try:
183
    if master_netmask is not None:
184
      master_netmask = int(master_netmask)
185
  except (ValueError, TypeError), err:
186
    ToStderr("Invalid master netmask value: %s" % str(err))
187
    return 1
188

    
189
  if opts.disk_state:
190
    disk_state = utils.FlatToDict(opts.disk_state)
191
  else:
192
    disk_state = {}
193

    
194
  hv_state = dict(opts.hv_state)
195

    
196
  # FIXME: remove enabled_storage_types when enabled_disk_templates are
197
  # fully implemented.
198
  enabled_storage_types = opts.enabled_storage_types
199
  if enabled_storage_types is not None:
200
    enabled_storage_types = enabled_storage_types.split(",")
201
  else:
202
    enabled_storage_types = list(constants.DEFAULT_ENABLED_STORAGE_TYPES)
203

    
204
  enabled_disk_templates = opts.enabled_disk_templates
205
  if enabled_disk_templates:
206
    enabled_disk_templates = enabled_disk_templates.split(",")
207
  else:
208
    enabled_disk_templates = list(constants.DEFAULT_ENABLED_DISK_TEMPLATES)
209

    
210
  bootstrap.InitCluster(cluster_name=args[0],
211
                        secondary_ip=opts.secondary_ip,
212
                        vg_name=vg_name,
213
                        mac_prefix=opts.mac_prefix,
214
                        master_netmask=master_netmask,
215
                        master_netdev=master_netdev,
216
                        file_storage_dir=opts.file_storage_dir,
217
                        shared_file_storage_dir=opts.shared_file_storage_dir,
218
                        enabled_hypervisors=hvlist,
219
                        hvparams=hvparams,
220
                        beparams=beparams,
221
                        nicparams=nicparams,
222
                        ndparams=ndparams,
223
                        diskparams=diskparams,
224
                        ipolicy=ipolicy,
225
                        candidate_pool_size=opts.candidate_pool_size,
226
                        modify_etc_hosts=opts.modify_etc_hosts,
227
                        modify_ssh_setup=opts.modify_ssh_setup,
228
                        maintain_node_health=opts.maintain_node_health,
229
                        drbd_helper=drbd_helper,
230
                        uid_pool=uid_pool,
231
                        default_iallocator=opts.default_iallocator,
232
                        primary_ip_version=primary_ip_version,
233
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
234
                        use_external_mip_script=external_ip_setup_script,
235
                        hv_state=hv_state,
236
                        disk_state=disk_state,
237
                        enabled_storage_types=enabled_storage_types,
238
                        )
239
  op = opcodes.OpClusterPostInit()
240
  SubmitOpCode(op, opts=opts)
241
  return 0
242

    
243

    
244
@UsesRPC
245
def DestroyCluster(opts, args):
246
  """Destroy the cluster.
247

248
  @param opts: the command line options selected by the user
249
  @type args: list
250
  @param args: should be an empty list
251
  @rtype: int
252
  @return: the desired exit code
253

254
  """
255
  if not opts.yes_do_it:
256
    ToStderr("Destroying a cluster is irreversible. If you really want"
257
             " destroy this cluster, supply the --yes-do-it option.")
258
    return 1
259

    
260
  op = opcodes.OpClusterDestroy()
261
  master = SubmitOpCode(op, opts=opts)
262
  # if we reached this, the opcode didn't fail; we can proceed to
263
  # shutdown all the daemons
264
  bootstrap.FinalizeClusterDestroy(master)
265
  return 0
266

    
267

    
268
def RenameCluster(opts, args):
269
  """Rename the cluster.
270

271
  @param opts: the command line options selected by the user
272
  @type args: list
273
  @param args: should contain only one element, the new cluster name
274
  @rtype: int
275
  @return: the desired exit code
276

277
  """
278
  cl = GetClient()
279

    
280
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
281

    
282
  new_name = args[0]
283
  if not opts.force:
284
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
285
                " connected over the network to the cluster name, the"
286
                " operation is very dangerous as the IP address will be"
287
                " removed from the node and the change may not go through."
288
                " Continue?") % (cluster_name, new_name)
289
    if not AskUser(usertext):
290
      return 1
291

    
292
  op = opcodes.OpClusterRename(name=new_name)
293
  result = SubmitOpCode(op, opts=opts, cl=cl)
294

    
295
  if result:
296
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
297

    
298
  return 0
299

    
300

    
301
def ActivateMasterIp(opts, args):
302
  """Activates the master IP.
303

304
  """
305
  op = opcodes.OpClusterActivateMasterIp()
306
  SubmitOpCode(op)
307
  return 0
308

    
309

    
310
def DeactivateMasterIp(opts, args):
311
  """Deactivates the master IP.
312

313
  """
314
  if not opts.confirm:
315
    usertext = ("This will disable the master IP. All the open connections to"
316
                " the master IP will be closed. To reach the master you will"
317
                " need to use its node IP."
318
                " Continue?")
319
    if not AskUser(usertext):
320
      return 1
321

    
322
  op = opcodes.OpClusterDeactivateMasterIp()
323
  SubmitOpCode(op)
324
  return 0
325

    
326

    
327
def RedistributeConfig(opts, args):
328
  """Forces push of the cluster configuration.
329

330
  @param opts: the command line options selected by the user
331
  @type args: list
332
  @param args: empty list
333
  @rtype: int
334
  @return: the desired exit code
335

336
  """
337
  op = opcodes.OpClusterRedistConf()
338
  SubmitOrSend(op, opts)
339
  return 0
340

    
341

    
342
def ShowClusterVersion(opts, args):
343
  """Write version of ganeti software to the standard output.
344

345
  @param opts: the command line options selected by the user
346
  @type args: list
347
  @param args: should be an empty list
348
  @rtype: int
349
  @return: the desired exit code
350

351
  """
352
  cl = GetClient(query=True)
353
  result = cl.QueryClusterInfo()
354
  ToStdout("Software version: %s", result["software_version"])
355
  ToStdout("Internode protocol: %s", result["protocol_version"])
356
  ToStdout("Configuration format: %s", result["config_version"])
357
  ToStdout("OS api version: %s", result["os_api_version"])
358
  ToStdout("Export interface: %s", result["export_version"])
359
  return 0
360

    
361

    
362
def ShowClusterMaster(opts, args):
363
  """Write name of master node to the standard output.
364

365
  @param opts: the command line options selected by the user
366
  @type args: list
367
  @param args: should be an empty list
368
  @rtype: int
369
  @return: the desired exit code
370

371
  """
372
  master = bootstrap.GetMaster()
373
  ToStdout(master)
374
  return 0
375

    
376

    
377
def _FormatGroupedParams(paramsdict, roman=False):
378
  """Format Grouped parameters (be, nic, disk) by group.
379

380
  @type paramsdict: dict of dicts
381
  @param paramsdict: {group: {param: value, ...}, ...}
382
  @rtype: dict of dicts
383
  @return: copy of the input dictionaries with strings as values
384

385
  """
386
  ret = {}
387
  for (item, val) in paramsdict.items():
388
    if isinstance(val, dict):
389
      ret[item] = _FormatGroupedParams(val, roman=roman)
390
    elif roman and isinstance(val, int):
391
      ret[item] = compat.TryToRoman(val)
392
    else:
393
      ret[item] = str(val)
394
  return ret
395

    
396

    
397
def ShowClusterConfig(opts, args):
398
  """Shows cluster information.
399

400
  @param opts: the command line options selected by the user
401
  @type args: list
402
  @param args: should be an empty list
403
  @rtype: int
404
  @return: the desired exit code
405

406
  """
407
  cl = GetClient(query=True)
408
  result = cl.QueryClusterInfo()
409

    
410
  if result["tags"]:
411
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
412
  else:
413
    tags = "(none)"
414
  if result["reserved_lvs"]:
415
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
416
  else:
417
    reserved_lvs = "(none)"
418

    
419
  info = [
420
    ("Cluster name", result["name"]),
421
    ("Cluster UUID", result["uuid"]),
422

    
423
    ("Creation time", utils.FormatTime(result["ctime"])),
424
    ("Modification time", utils.FormatTime(result["mtime"])),
425

    
426
    ("Master node", result["master"]),
427

    
428
    ("Architecture (this node)",
429
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),
430

    
431
    ("Tags", tags),
432

    
433
    ("Default hypervisor", result["default_hypervisor"]),
434
    ("Enabled hypervisors",
435
     utils.CommaJoin(result["enabled_hypervisors"])),
436

    
437
    ("Hypervisor parameters", _FormatGroupedParams(result["hvparams"])),
438

    
439
    ("OS-specific hypervisor parameters",
440
     _FormatGroupedParams(result["os_hvp"])),
441

    
442
    ("OS parameters", _FormatGroupedParams(result["osparams"])),
443

    
444
    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
445
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),
446

    
447
    ("Cluster parameters", [
448
      ("candidate pool size",
449
       compat.TryToRoman(result["candidate_pool_size"],
450
                         convert=opts.roman_integers)),
451
      ("master netdev", result["master_netdev"]),
452
      ("master netmask", result["master_netmask"]),
453
      ("use external master IP address setup script",
454
       result["use_external_mip_script"]),
455
      ("lvm volume group", result["volume_group_name"]),
456
      ("lvm reserved volumes", reserved_lvs),
457
      ("drbd usermode helper", result["drbd_usermode_helper"]),
458
      ("file storage path", result["file_storage_dir"]),
459
      ("shared file storage path", result["shared_file_storage_dir"]),
460
      ("maintenance of node health", result["maintain_node_health"]),
461
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
462
      ("default instance allocator", result["default_iallocator"]),
463
      ("primary ip version", result["primary_ip_version"]),
464
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
465
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
466
      ("ExtStorage Providers search path",
467
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
468
      ("enabled storage types",
469
       utils.CommaJoin(result["enabled_storage_types"])),
470
      ("enabled disk templates",
471
       utils.CommaJoin(result["enabled_disk_templates"])),
472
      ]),
473

    
474
    ("Default node parameters",
475
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),
476

    
477
    ("Default instance parameters",
478
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),
479

    
480
    ("Default nic parameters",
481
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),
482

    
483
    ("Default disk parameters",
484
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),
485

    
486
    ("Instance policy - limits for instances",
487
     FormatPolicyInfo(result["ipolicy"], None, True)),
488
    ]
489

    
490
  PrintGenericInfo(info)
491
  return 0
492

    
493

    
494
def ClusterCopyFile(opts, args):
495
  """Copy a file from master to some nodes.
496

497
  @param opts: the command line options selected by the user
498
  @type args: list
499
  @param args: should contain only one element, the path of
500
      the file to be copied
501
  @rtype: int
502
  @return: the desired exit code
503

504
  """
505
  filename = args[0]
506
  if not os.path.exists(filename):
507
    raise errors.OpPrereqError("No such filename '%s'" % filename,
508
                               errors.ECODE_INVAL)
509

    
510
  cl = GetClient()
511

    
512
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
513

    
514
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
515
                           secondary_ips=opts.use_replication_network,
516
                           nodegroup=opts.nodegroup)
517

    
518
  srun = ssh.SshRunner(cluster_name)
519
  for node in results:
520
    if not srun.CopyFileToNode(node, filename):
521
      ToStderr("Copy of file %s to node %s failed", filename, node)
522

    
523
  return 0
524

    
525

    
526
def RunClusterCommand(opts, args):
527
  """Run a command on some nodes.
528

529
  @param opts: the command line options selected by the user
530
  @type args: list
531
  @param args: should contain the command to be run and its arguments
532
  @rtype: int
533
  @return: the desired exit code
534

535
  """
536
  cl = GetClient()
537

    
538
  command = " ".join(args)
539

    
540
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
541

    
542
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
543
                                                    "master_node"])
544

    
545
  srun = ssh.SshRunner(cluster_name=cluster_name)
546

    
547
  # Make sure master node is at list end
548
  if master_node in nodes:
549
    nodes.remove(master_node)
550
    nodes.append(master_node)
551

    
552
  for name in nodes:
553
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
554

    
555
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
556
      # Do not output anything for successful commands
557
      continue
558

    
559
    ToStdout("------------------------------------------------")
560
    if opts.show_machine_names:
561
      for line in result.output.splitlines():
562
        ToStdout("%s: %s", name, line)
563
    else:
564
      ToStdout("node: %s", name)
565
      ToStdout("%s", result.output)
566
    ToStdout("return code = %s", result.exit_code)
567

    
568
  return 0
569

    
570

    
571
def VerifyCluster(opts, args):
572
  """Verify integrity of cluster, performing various test on nodes.
573

574
  @param opts: the command line options selected by the user
575
  @type args: list
576
  @param args: should be an empty list
577
  @rtype: int
578
  @return: the desired exit code
579

580
  """
581
  skip_checks = []
582

    
583
  if opts.skip_nplusone_mem:
584
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
585

    
586
  cl = GetClient()
587

    
588
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
589
                               error_codes=opts.error_codes,
590
                               debug_simulate_errors=opts.simulate_errors,
591
                               skip_checks=skip_checks,
592
                               ignore_errors=opts.ignore_errors,
593
                               group_name=opts.nodegroup)
594
  result = SubmitOpCode(op, cl=cl, opts=opts)
595

    
596
  # Keep track of submitted jobs
597
  jex = JobExecutor(cl=cl, opts=opts)
598

    
599
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
600
    jex.AddJobId(None, status, job_id)
601

    
602
  results = jex.GetResults()
603

    
604
  (bad_jobs, bad_results) = \
605
    map(len,
606
        # Convert iterators to lists
607
        map(list,
608
            # Count errors
609
            map(compat.partial(itertools.ifilterfalse, bool),
610
                # Convert result to booleans in a tuple
611
                zip(*((job_success, len(op_results) == 1 and op_results[0])
612
                      for (job_success, op_results) in results)))))
613

    
614
  if bad_jobs == 0 and bad_results == 0:
615
    rcode = constants.EXIT_SUCCESS
616
  else:
617
    rcode = constants.EXIT_FAILURE
618
    if bad_jobs > 0:
619
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
620

    
621
  return rcode
622

    
623

    
624
def VerifyDisks(opts, args):
625
  """Verify integrity of cluster disks.
626

627
  @param opts: the command line options selected by the user
628
  @type args: list
629
  @param args: should be an empty list
630
  @rtype: int
631
  @return: the desired exit code
632

633
  """
634
  cl = GetClient()
635

    
636
  op = opcodes.OpClusterVerifyDisks()
637

    
638
  result = SubmitOpCode(op, cl=cl, opts=opts)
639

    
640
  # Keep track of submitted jobs
641
  jex = JobExecutor(cl=cl, opts=opts)
642

    
643
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
644
    jex.AddJobId(None, status, job_id)
645

    
646
  retcode = constants.EXIT_SUCCESS
647

    
648
  for (status, result) in jex.GetResults():
649
    if not status:
650
      ToStdout("Job failed: %s", result)
651
      continue
652

    
653
    ((bad_nodes, instances, missing), ) = result
654

    
655
    for node, text in bad_nodes.items():
656
      ToStdout("Error gathering data on node %s: %s",
657
               node, utils.SafeEncode(text[-400:]))
658
      retcode = constants.EXIT_FAILURE
659
      ToStdout("You need to fix these nodes first before fixing instances")
660

    
661
    for iname in instances:
662
      if iname in missing:
663
        continue
664
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
665
      try:
666
        ToStdout("Activating disks for instance '%s'", iname)
667
        SubmitOpCode(op, opts=opts, cl=cl)
668
      except errors.GenericError, err:
669
        nret, msg = FormatError(err)
670
        retcode |= nret
671
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
672

    
673
    if missing:
674
      for iname, ival in missing.iteritems():
675
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
676
        if all_missing:
677
          ToStdout("Instance %s cannot be verified as it lives on"
678
                   " broken nodes", iname)
679
        else:
680
          ToStdout("Instance %s has missing logical volumes:", iname)
681
          ival.sort()
682
          for node, vol in ival:
683
            if node in bad_nodes:
684
              ToStdout("\tbroken node %s /dev/%s", node, vol)
685
            else:
686
              ToStdout("\t%s /dev/%s", node, vol)
687

    
688
      ToStdout("You need to replace or recreate disks for all the above"
689
               " instances if this message persists after fixing broken nodes.")
690
      retcode = constants.EXIT_FAILURE
691
    elif not instances:
692
      ToStdout("No disks need to be activated.")
693

    
694
  return retcode
695

    
696

    
697
def RepairDiskSizes(opts, args):
698
  """Verify sizes of cluster disks.
699

700
  @param opts: the command line options selected by the user
701
  @type args: list
702
  @param args: optional list of instances to restrict check to
703
  @rtype: int
704
  @return: the desired exit code
705

706
  """
707
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
708
  SubmitOpCode(op, opts=opts)
709

    
710

    
711
@UsesRPC
712
def MasterFailover(opts, args):
713
  """Failover the master node.
714

715
  This command, when run on a non-master node, will cause the current
716
  master to cease being master, and the non-master to become new
717
  master.
718

719
  @param opts: the command line options selected by the user
720
  @type args: list
721
  @param args: should be an empty list
722
  @rtype: int
723
  @return: the desired exit code
724

725
  """
726
  if opts.no_voting and not opts.yes_do_it:
727
    usertext = ("This will perform the failover even if most other nodes"
728
                " are down, or if this node is outdated. This is dangerous"
729
                " as it can lead to a non-consistent cluster. Check the"
730
                " gnt-cluster(8) man page before proceeding. Continue?")
731
    if not AskUser(usertext):
732
      return 1
733

    
734
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
735

    
736

    
737
def MasterPing(opts, args):
738
  """Checks if the master is alive.
739

740
  @param opts: the command line options selected by the user
741
  @type args: list
742
  @param args: should be an empty list
743
  @rtype: int
744
  @return: the desired exit code
745

746
  """
747
  try:
748
    cl = GetClient()
749
    cl.QueryClusterInfo()
750
    return 0
751
  except Exception: # pylint: disable=W0703
752
    return 1
753

    
754

    
755
def SearchTags(opts, args):
756
  """Searches the tags on all the cluster.
757

758
  @param opts: the command line options selected by the user
759
  @type args: list
760
  @param args: should contain only one element, the tag pattern
761
  @rtype: int
762
  @return: the desired exit code
763

764
  """
765
  op = opcodes.OpTagsSearch(pattern=args[0])
766
  result = SubmitOpCode(op, opts=opts)
767
  if not result:
768
    return 1
769
  result = list(result)
770
  result.sort()
771
  for path, tag in result:
772
    ToStdout("%s %s", path, tag)
773

    
774

    
775
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
776
  """Reads and verifies an X509 certificate.
777

778
  @type cert_filename: string
779
  @param cert_filename: the path of the file containing the certificate to
780
                        verify encoded in PEM format
781
  @type verify_private_key: bool
782
  @param verify_private_key: whether to verify the private key in addition to
783
                             the public certificate
784
  @rtype: string
785
  @return: a string containing the PEM-encoded certificate.
786

787
  """
788
  try:
789
    pem = utils.ReadFile(cert_filename)
790
  except IOError, err:
791
    raise errors.X509CertError(cert_filename,
792
                               "Unable to read certificate: %s" % str(err))
793

    
794
  try:
795
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
796
  except Exception, err:
797
    raise errors.X509CertError(cert_filename,
798
                               "Unable to load certificate: %s" % str(err))
799

    
800
  if verify_private_key:
801
    try:
802
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
803
    except Exception, err:
804
      raise errors.X509CertError(cert_filename,
805
                                 "Unable to load private key: %s" % str(err))
806

    
807
  return pem
808

    
809

    
810
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
811
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
812
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
813
                 cds_filename, force):
814
  """Renews cluster certificates, keys and secrets.
815

816
  @type new_cluster_cert: bool
817
  @param new_cluster_cert: Whether to generate a new cluster certificate
818
  @type new_rapi_cert: bool
819
  @param new_rapi_cert: Whether to generate a new RAPI certificate
820
  @type rapi_cert_filename: string
821
  @param rapi_cert_filename: Path to file containing new RAPI certificate
822
  @type new_spice_cert: bool
823
  @param new_spice_cert: Whether to generate a new SPICE certificate
824
  @type spice_cert_filename: string
825
  @param spice_cert_filename: Path to file containing new SPICE certificate
826
  @type spice_cacert_filename: string
827
  @param spice_cacert_filename: Path to file containing the certificate of the
828
                                CA that signed the SPICE certificate
829
  @type new_confd_hmac_key: bool
830
  @param new_confd_hmac_key: Whether to generate a new HMAC key
831
  @type new_cds: bool
832
  @param new_cds: Whether to generate a new cluster domain secret
833
  @type cds_filename: string
834
  @param cds_filename: Path to file containing new cluster domain secret
835
  @type force: bool
836
  @param force: Whether to ask user for confirmation
837

838
  """
839
  if new_rapi_cert and rapi_cert_filename:
840
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
841
             " options can be specified at the same time.")
842
    return 1
843

    
844
  if new_cds and cds_filename:
845
    ToStderr("Only one of the --new-cluster-domain-secret and"
846
             " --cluster-domain-secret options can be specified at"
847
             " the same time.")
848
    return 1
849

    
850
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
851
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
852
             " and --spice-ca-certificate must not be used.")
853
    return 1
854

    
855
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
856
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
857
             " specified.")
858
    return 1
859

    
860
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
861
  try:
862
    if rapi_cert_filename:
863
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
864
    if spice_cert_filename:
865
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
866
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
867
  except errors.X509CertError, err:
868
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
869
    return 1
870

    
871
  if cds_filename:
872
    try:
873
      cds = utils.ReadFile(cds_filename)
874
    except Exception, err: # pylint: disable=W0703
875
      ToStderr("Can't load new cluster domain secret from %s: %s" %
876
               (cds_filename, str(err)))
877
      return 1
878
  else:
879
    cds = None
880

    
881
  if not force:
882
    usertext = ("This requires all daemons on all nodes to be restarted and"
883
                " may take some time. Continue?")
884
    if not AskUser(usertext):
885
      return 1
886

    
887
  def _RenewCryptoInner(ctx):
888
    ctx.feedback_fn("Updating certificates and keys")
889
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
890
                                    new_rapi_cert,
891
                                    new_spice_cert,
892
                                    new_confd_hmac_key,
893
                                    new_cds,
894
                                    rapi_cert_pem=rapi_cert_pem,
895
                                    spice_cert_pem=spice_cert_pem,
896
                                    spice_cacert_pem=spice_cacert_pem,
897
                                    cds=cds)
898

    
899
    files_to_copy = []
900

    
901
    if new_cluster_cert:
902
      files_to_copy.append(pathutils.NODED_CERT_FILE)
903

    
904
    if new_rapi_cert or rapi_cert_pem:
905
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
906

    
907
    if new_spice_cert or spice_cert_pem:
908
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
909
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
910

    
911
    if new_confd_hmac_key:
912
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
913

    
914
    if new_cds or cds:
915
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
916

    
917
    if files_to_copy:
918
      for node_name in ctx.nonmaster_nodes:
919
        ctx.feedback_fn("Copying %s to %s" %
920
                        (", ".join(files_to_copy), node_name))
921
        for file_name in files_to_copy:
922
          ctx.ssh.CopyFileToNode(node_name, file_name)
923

    
924
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
925

    
926
  ToStdout("All requested certificates and keys have been replaced."
927
           " Running \"gnt-cluster verify\" now is recommended.")
928

    
929
  return 0
930

    
931

    
932
def RenewCrypto(opts, args):
933
  """Renews cluster certificates, keys and secrets.
934

935
  """
936
  return _RenewCrypto(opts.new_cluster_cert,
937
                      opts.new_rapi_cert,
938
                      opts.rapi_cert,
939
                      opts.new_spice_cert,
940
                      opts.spice_cert,
941
                      opts.spice_cacert,
942
                      opts.new_confd_hmac_key,
943
                      opts.new_cluster_domain_secret,
944
                      opts.cluster_domain_secret,
945
                      opts.force)
946

    
947

    
948
def SetClusterParams(opts, args):
949
  """Modify the cluster.
950

951
  @param opts: the command line options selected by the user
952
  @type args: list
953
  @param args: should be an empty list
954
  @rtype: int
955
  @return: the desired exit code
956

957
  """
958
  if not (not opts.lvm_storage or opts.vg_name or
959
          not opts.drbd_storage or opts.drbd_helper or
960
          opts.enabled_hypervisors or opts.hvparams or
961
          opts.beparams or opts.nicparams or
962
          opts.ndparams or opts.diskparams or
963
          opts.candidate_pool_size is not None or
964
          opts.uid_pool is not None or
965
          opts.maintain_node_health is not None or
966
          opts.add_uids is not None or
967
          opts.remove_uids is not None or
968
          opts.default_iallocator is not None or
969
          opts.reserved_lvs is not None or
970
          opts.master_netdev is not None or
971
          opts.master_netmask is not None or
972
          opts.use_external_mip_script is not None or
973
          opts.prealloc_wipe_disks is not None or
974
          opts.hv_state or
975
          opts.enabled_storage_types or
976
          opts.disk_state or
977
          opts.ispecs_mem_size or
978
          opts.ispecs_cpu_count or
979
          opts.ispecs_disk_count or
980
          opts.ispecs_disk_size or
981
          opts.ispecs_nic_count or
982
          opts.ipolicy_disk_templates is not None or
983
          opts.ipolicy_vcpu_ratio is not None or
984
          opts.ipolicy_spindle_ratio is not None):
985
    ToStderr("Please give at least one of the parameters.")
986
    return 1
987

    
988
  vg_name = opts.vg_name
989
  if not opts.lvm_storage and opts.vg_name:
990
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
991
    return 1
992

    
993
  if not opts.lvm_storage:
994
    vg_name = ""
995

    
996
  drbd_helper = opts.drbd_helper
997
  if not opts.drbd_storage and opts.drbd_helper:
998
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
999
    return 1
1000

    
1001
  if not opts.drbd_storage:
1002
    drbd_helper = ""
1003

    
1004
  hvlist = opts.enabled_hypervisors
1005
  if hvlist is not None:
1006
    hvlist = hvlist.split(",")
1007

    
1008
  enabled_storage_types = opts.enabled_storage_types
1009
  if enabled_storage_types is not None:
1010
    enabled_storage_types = enabled_storage_types.split(",")
1011

    
1012
  # a list of (name, dict) we can pass directly to dict() (or [])
1013
  hvparams = dict(opts.hvparams)
1014
  for hv_params in hvparams.values():
1015
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1016

    
1017
  diskparams = dict(opts.diskparams)
1018

    
1019
  for dt_params in diskparams.values():
1020
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1021

    
1022
  beparams = opts.beparams
1023
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1024

    
1025
  nicparams = opts.nicparams
1026
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1027

    
1028
  ndparams = opts.ndparams
1029
  if ndparams is not None:
1030
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1031

    
1032
  ipolicy = CreateIPolicyFromOpts(
1033
    ispecs_mem_size=opts.ispecs_mem_size,
1034
    ispecs_cpu_count=opts.ispecs_cpu_count,
1035
    ispecs_disk_count=opts.ispecs_disk_count,
1036
    ispecs_disk_size=opts.ispecs_disk_size,
1037
    ispecs_nic_count=opts.ispecs_nic_count,
1038
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1039
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1040
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1041
    )
1042

    
1043
  mnh = opts.maintain_node_health
1044

    
1045
  uid_pool = opts.uid_pool
1046
  if uid_pool is not None:
1047
    uid_pool = uidpool.ParseUidPool(uid_pool)
1048

    
1049
  add_uids = opts.add_uids
1050
  if add_uids is not None:
1051
    add_uids = uidpool.ParseUidPool(add_uids)
1052

    
1053
  remove_uids = opts.remove_uids
1054
  if remove_uids is not None:
1055
    remove_uids = uidpool.ParseUidPool(remove_uids)
1056

    
1057
  if opts.reserved_lvs is not None:
1058
    if opts.reserved_lvs == "":
1059
      opts.reserved_lvs = []
1060
    else:
1061
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1062

    
1063
  if opts.master_netmask is not None:
1064
    try:
1065
      opts.master_netmask = int(opts.master_netmask)
1066
    except ValueError:
1067
      ToStderr("The --master-netmask option expects an int parameter.")
1068
      return 1
1069

    
1070
  ext_ip_script = opts.use_external_mip_script
1071

    
1072
  if opts.disk_state:
1073
    disk_state = utils.FlatToDict(opts.disk_state)
1074
  else:
1075
    disk_state = {}
1076

    
1077
  hv_state = dict(opts.hv_state)
1078

    
1079
  op = opcodes.OpClusterSetParams(
1080
    vg_name=vg_name,
1081
    drbd_helper=drbd_helper,
1082
    enabled_hypervisors=hvlist,
1083
    hvparams=hvparams,
1084
    os_hvp=None,
1085
    beparams=beparams,
1086
    nicparams=nicparams,
1087
    ndparams=ndparams,
1088
    diskparams=diskparams,
1089
    ipolicy=ipolicy,
1090
    candidate_pool_size=opts.candidate_pool_size,
1091
    maintain_node_health=mnh,
1092
    uid_pool=uid_pool,
1093
    add_uids=add_uids,
1094
    remove_uids=remove_uids,
1095
    default_iallocator=opts.default_iallocator,
1096
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1097
    master_netdev=opts.master_netdev,
1098
    master_netmask=opts.master_netmask,
1099
    reserved_lvs=opts.reserved_lvs,
1100
    use_external_mip_script=ext_ip_script,
1101
    hv_state=hv_state,
1102
    disk_state=disk_state,
1103
    enabled_storage_types=enabled_storage_types,
1104
    )
1105
  SubmitOrSend(op, opts)
1106
  return 0
1107

    
1108

    
1109
def QueueOps(opts, args):
1110
  """Queue operations.
1111

1112
  @param opts: the command line options selected by the user
1113
  @type args: list
1114
  @param args: should contain only one element, the subcommand
1115
  @rtype: int
1116
  @return: the desired exit code
1117

1118
  """
1119
  command = args[0]
1120
  client = GetClient()
1121
  if command in ("drain", "undrain"):
1122
    drain_flag = command == "drain"
1123
    client.SetQueueDrainFlag(drain_flag)
1124
  elif command == "info":
1125
    result = client.QueryConfigValues(["drain_flag"])
1126
    if result[0]:
1127
      val = "set"
1128
    else:
1129
      val = "unset"
1130
    ToStdout("The drain flag is %s" % val)
1131
  else:
1132
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1133
                               errors.ECODE_INVAL)
1134

    
1135
  return 0
1136

    
1137

    
1138
def _ShowWatcherPause(until):
1139
  if until is None or until < time.time():
1140
    ToStdout("The watcher is not paused.")
1141
  else:
1142
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1143

    
1144

    
1145
def WatcherOps(opts, args):
1146
  """Watcher operations.
1147

1148
  @param opts: the command line options selected by the user
1149
  @type args: list
1150
  @param args: should contain only one element, the subcommand
1151
  @rtype: int
1152
  @return: the desired exit code
1153

1154
  """
1155
  command = args[0]
1156
  client = GetClient()
1157

    
1158
  if command == "continue":
1159
    client.SetWatcherPause(None)
1160
    ToStdout("The watcher is no longer paused.")
1161

    
1162
  elif command == "pause":
1163
    if len(args) < 2:
1164
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1165

    
1166
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1167
    _ShowWatcherPause(result)
1168

    
1169
  elif command == "info":
1170
    result = client.QueryConfigValues(["watcher_pause"])
1171
    _ShowWatcherPause(result[0])
1172

    
1173
  else:
1174
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1175
                               errors.ECODE_INVAL)
1176

    
1177
  return 0
1178

    
1179

    
1180
def _OobPower(opts, node_list, power):
1181
  """Puts the node in the list to desired power state.
1182

1183
  @param opts: The command line options selected by the user
1184
  @param node_list: The list of nodes to operate on
1185
  @param power: True if they should be powered on, False otherwise
1186
  @return: The success of the operation (none failed)
1187

1188
  """
1189
  if power:
1190
    command = constants.OOB_POWER_ON
1191
  else:
1192
    command = constants.OOB_POWER_OFF
1193

    
1194
  op = opcodes.OpOobCommand(node_names=node_list,
1195
                            command=command,
1196
                            ignore_status=True,
1197
                            timeout=opts.oob_timeout,
1198
                            power_delay=opts.power_delay)
1199
  result = SubmitOpCode(op, opts=opts)
1200
  errs = 0
1201
  for node_result in result:
1202
    (node_tuple, data_tuple) = node_result
1203
    (_, node_name) = node_tuple
1204
    (data_status, _) = data_tuple
1205
    if data_status != constants.RS_NORMAL:
1206
      assert data_status != constants.RS_UNAVAIL
1207
      errs += 1
1208
      ToStderr("There was a problem changing power for %s, please investigate",
1209
               node_name)
1210

    
1211
  if errs > 0:
1212
    return False
1213

    
1214
  return True
1215

    
1216

    
1217
def _InstanceStart(opts, inst_list, start, no_remember=False):
1218
  """Puts the instances in the list to desired state.
1219

1220
  @param opts: The command line options selected by the user
1221
  @param inst_list: The list of instances to operate on
1222
  @param start: True if they should be started, False for shutdown
1223
  @param no_remember: If the instance state should be remembered
1224
  @return: The success of the operation (none failed)
1225

1226
  """
1227
  if start:
1228
    opcls = opcodes.OpInstanceStartup
1229
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1230
  else:
1231
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1232
                           timeout=opts.shutdown_timeout,
1233
                           no_remember=no_remember)
1234
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1235

    
1236
  jex = JobExecutor(opts=opts)
1237

    
1238
  for inst in inst_list:
1239
    ToStdout("Submit %s of instance %s", text_submit, inst)
1240
    op = opcls(instance_name=inst)
1241
    jex.QueueJob(inst, op)
1242

    
1243
  results = jex.GetResults()
1244
  bad_cnt = len([1 for (success, _) in results if not success])
1245

    
1246
  if bad_cnt == 0:
1247
    ToStdout("All instances have been %s successfully", text_success)
1248
  else:
1249
    ToStderr("There were errors while %s instances:\n"
1250
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1251
             len(results))
1252
    return False
1253

    
1254
  return True
1255

    
1256

    
1257
class _RunWhenNodesReachableHelper:
1258
  """Helper class to make shared internal state sharing easier.
1259

1260
  @ivar success: Indicates if all action_cb calls were successful
1261

1262
  """
1263
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1264
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1265
    """Init the object.
1266

1267
    @param node_list: The list of nodes to be reachable
1268
    @param action_cb: Callback called when a new host is reachable
1269
    @type node2ip: dict
1270
    @param node2ip: Node to ip mapping
1271
    @param port: The port to use for the TCP ping
1272
    @param feedback_fn: The function used for feedback
1273
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1274
    @param _sleep_fn: Function to sleep (for unittest use only)
1275

1276
    """
1277
    self.down = set(node_list)
1278
    self.up = set()
1279
    self.node2ip = node2ip
1280
    self.success = True
1281
    self.action_cb = action_cb
1282
    self.port = port
1283
    self.feedback_fn = feedback_fn
1284
    self._ping_fn = _ping_fn
1285
    self._sleep_fn = _sleep_fn
1286

    
1287
  def __call__(self):
1288
    """When called we run action_cb.
1289

1290
    @raises utils.RetryAgain: When there are still down nodes
1291

1292
    """
1293
    if not self.action_cb(self.up):
1294
      self.success = False
1295

    
1296
    if self.down:
1297
      raise utils.RetryAgain()
1298
    else:
1299
      return self.success
1300

    
1301
  def Wait(self, secs):
1302
    """Checks if a host is up or waits remaining seconds.
1303

1304
    @param secs: The secs remaining
1305

1306
    """
1307
    start = time.time()
1308
    for node in self.down:
1309
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1310
                       live_port_needed=True):
1311
        self.feedback_fn("Node %s became available" % node)
1312
        self.up.add(node)
1313
        self.down -= self.up
1314
        # If we have a node available there is the possibility to run the
1315
        # action callback successfully, therefore we don't wait and return
1316
        return
1317

    
1318
    self._sleep_fn(max(0.0, start + secs - time.time()))
1319

    
1320

    
1321
def _RunWhenNodesReachable(node_list, action_cb, interval):
1322
  """Run action_cb when nodes become reachable.
1323

1324
  @param node_list: The list of nodes to be reachable
1325
  @param action_cb: Callback called when a new host is reachable
1326
  @param interval: The earliest time to retry
1327

1328
  """
1329
  client = GetClient()
1330
  cluster_info = client.QueryClusterInfo()
1331
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1332
    family = netutils.IPAddress.family
1333
  else:
1334
    family = netutils.IP6Address.family
1335

    
1336
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1337
                 for node in node_list)
1338

    
1339
  port = netutils.GetDaemonPort(constants.NODED)
1340
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1341
                                        ToStdout)
1342

    
1343
  try:
1344
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1345
                       wait_fn=helper.Wait)
1346
  except utils.RetryTimeout:
1347
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1348
             " again:\n  - %s", "  - ".join(helper.down))
1349
    return False
1350

    
1351

    
1352
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1353
                          _instance_start_fn=_InstanceStart):
1354
  """Start the instances conditional based on node_states.
1355

1356
  @param opts: The command line options selected by the user
1357
  @param inst_map: A dict of inst -> nodes mapping
1358
  @param nodes_online: A list of nodes online
1359
  @param _instance_start_fn: Callback to start instances (unittest use only)
1360
  @return: Success of the operation on all instances
1361

1362
  """
1363
  start_inst_list = []
1364
  for (inst, nodes) in inst_map.items():
1365
    if not (nodes - nodes_online):
1366
      # All nodes the instance lives on are back online
1367
      start_inst_list.append(inst)
1368

    
1369
  for inst in start_inst_list:
1370
    del inst_map[inst]
1371

    
1372
  if start_inst_list:
1373
    return _instance_start_fn(opts, start_inst_list, True)
1374

    
1375
  return True
1376

    
1377

    
1378
def _EpoOn(opts, full_node_list, node_list, inst_map):
1379
  """Does the actual power on.
1380

1381
  @param opts: The command line options selected by the user
1382
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1383
                         OOB)
1384
  @param node_list: The list of nodes to operate on (all need to support OOB)
1385
  @param inst_map: A dict of inst -> nodes mapping
1386
  @return: The desired exit status
1387

1388
  """
1389
  if node_list and not _OobPower(opts, node_list, False):
1390
    ToStderr("Not all nodes seem to get back up, investigate and start"
1391
             " manually if needed")
1392

    
1393
  # Wait for the nodes to be back up
1394
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1395

    
1396
  ToStdout("Waiting until all nodes are available again")
1397
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1398
    ToStderr("Please investigate and start stopped instances manually")
1399
    return constants.EXIT_FAILURE
1400

    
1401
  return constants.EXIT_SUCCESS
1402

    
1403

    
1404
def _EpoOff(opts, node_list, inst_map):
1405
  """Does the actual power off.
1406

1407
  @param opts: The command line options selected by the user
1408
  @param node_list: The list of nodes to operate on (all need to support OOB)
1409
  @param inst_map: A dict of inst -> nodes mapping
1410
  @return: The desired exit status
1411

1412
  """
1413
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1414
    ToStderr("Please investigate and stop instances manually before continuing")
1415
    return constants.EXIT_FAILURE
1416

    
1417
  if not node_list:
1418
    return constants.EXIT_SUCCESS
1419

    
1420
  if _OobPower(opts, node_list, False):
1421
    return constants.EXIT_SUCCESS
1422
  else:
1423
    return constants.EXIT_FAILURE
1424

    
1425

    
1426
def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
1427
        _confirm_fn=ConfirmOperation,
1428
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
1429
  """EPO operations.
1430

1431
  @param opts: the command line options selected by the user
1432
  @type args: list
1433
  @param args: should contain only one element, the subcommand
1434
  @rtype: int
1435
  @return: the desired exit code
1436

1437
  """
1438
  if opts.groups and opts.show_all:
1439
    _stderr_fn("Only one of --groups or --all are allowed")
1440
    return constants.EXIT_FAILURE
1441
  elif args and opts.show_all:
1442
    _stderr_fn("Arguments in combination with --all are not allowed")
1443
    return constants.EXIT_FAILURE
1444

    
1445
  if cl is None:
1446
    cl = GetClient()
1447

    
1448
  if opts.groups:
1449
    node_query_list = \
1450
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
1451
  else:
1452
    node_query_list = args
1453

    
1454
  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
1455
                                           "sinst_list", "powered", "offline"],
1456
                         False)
1457

    
1458
  all_nodes = map(compat.fst, result)
1459
  node_list = []
1460
  inst_map = {}
1461
  for (node, master, pinsts, sinsts, powered, offline) in result:
1462
    if not offline:
1463
      for inst in (pinsts + sinsts):
1464
        if inst in inst_map:
1465
          if not master:
1466
            inst_map[inst].add(node)
1467
        elif master:
1468
          inst_map[inst] = set()
1469
        else:
1470
          inst_map[inst] = set([node])
1471

    
1472
    if master and opts.on:
1473
      # We ignore the master for turning on the machines, in fact we are
1474
      # already operating on the master at this point :)
1475
      continue
1476
    elif master and not opts.show_all:
1477
      _stderr_fn("%s is the master node, please do a master-failover to another"
1478
                 " node not affected by the EPO or use --all if you intend to"
1479
                 " shutdown the whole cluster", node)
1480
      return constants.EXIT_FAILURE
1481
    elif powered is None:
1482
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
1483
                 " handled in a fully automated manner", node)
1484
    elif powered == opts.on:
1485
      _stdout_fn("Node %s is already in desired power state, skipping", node)
1486
    elif not offline or (offline and powered):
1487
      node_list.append(node)
1488

    
1489
  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
1490
    return constants.EXIT_FAILURE
1491

    
1492
  if opts.on:
1493
    return _on_fn(opts, all_nodes, node_list, inst_map)
1494
  else:
1495
    return _off_fn(opts, node_list, inst_map)
1496

    
1497

    
1498
commands = {
1499
  "init": (
1500
    InitCluster, [ArgHost(min=1, max=1)],
1501
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
1502
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
1503
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
1504
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
1505
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
1506
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
1507
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
1508
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT, ENABLED_STORAGE_TYPES_OPT]
1509
     + INSTANCE_POLICY_OPTS,
1510
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
1511
  "destroy": (
1512
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
1513
    "", "Destroy cluster"),
1514
  "rename": (
1515
    RenameCluster, [ArgHost(min=1, max=1)],
1516
    [FORCE_OPT, DRY_RUN_OPT],
1517
    "<new_name>",
1518
    "Renames the cluster"),
1519
  "redist-conf": (
1520
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
1521
    "", "Forces a push of the configuration file and ssconf files"
1522
    " to the nodes in the cluster"),
1523
  "verify": (
1524
    VerifyCluster, ARGS_NONE,
1525
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
1526
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
1527
    "", "Does a check on the cluster configuration"),
1528
  "verify-disks": (
1529
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
1530
    "", "Does a check on the cluster disk status"),
1531
  "repair-disk-sizes": (
1532
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
1533
    "[instance...]", "Updates mismatches in recorded disk sizes"),
1534
  "master-failover": (
1535
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
1536
    "", "Makes the current node the master"),
1537
  "master-ping": (
1538
    MasterPing, ARGS_NONE, [],
1539
    "", "Checks if the master is alive"),
1540
  "version": (
1541
    ShowClusterVersion, ARGS_NONE, [],
1542
    "", "Shows the cluster version"),
1543
  "getmaster": (
1544
    ShowClusterMaster, ARGS_NONE, [],
1545
    "", "Shows the cluster master"),
1546
  "copyfile": (
1547
    ClusterCopyFile, [ArgFile(min=1, max=1)],
1548
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
1549
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
1550
  "command": (
1551
    RunClusterCommand, [ArgCommand(min=1)],
1552
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
1553
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
1554
  "info": (
1555
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
1556
    "[--roman]", "Show cluster configuration"),
1557
  "list-tags": (
1558
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
1559
  "add-tags": (
1560
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1561
    "tag...", "Add tags to the cluster"),
1562
  "remove-tags": (
1563
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1564
    "tag...", "Remove tags from the cluster"),
1565
  "search-tags": (
1566
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
1567
    "Searches the tags on all objects on"
1568
    " the cluster for a given pattern (regex)"),
1569
  "queue": (
1570
    QueueOps,
1571
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
1572
    [], "drain|undrain|info", "Change queue properties"),
1573
  "watcher": (
1574
    WatcherOps,
1575
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
1576
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
1577
    [],
1578
    "{pause <timespec>|continue|info}", "Change watcher properties"),
1579
  "modify": (
1580
    SetClusterParams, ARGS_NONE,
1581
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
1582
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
1583
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
1584
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
1585
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
1586
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
1587
     DISK_STATE_OPT, SUBMIT_OPT, ENABLED_STORAGE_TYPES_OPT] +
1588
    INSTANCE_POLICY_OPTS,
1589
    "[opts...]",
1590
    "Alters the parameters of the cluster"),
1591
  "renew-crypto": (
1592
    RenewCrypto, ARGS_NONE,
1593
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
1594
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
1595
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
1596
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
1597
    "[opts...]",
1598
    "Renews cluster certificates, keys and secrets"),
1599
  "epo": (
1600
    Epo, [ArgUnknown()],
1601
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
1602
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
1603
    "[opts...] [args]",
1604
    "Performs an emergency power-off on given args"),
1605
  "activate-master-ip": (
1606
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
1607
  "deactivate-master-ip": (
1608
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
1609
    "Deactivates the master IP"),
1610
  }
1611

    
1612

    
1613
#: dictionary with aliases for commands
1614
aliases = {
1615
  "masterfailover": "master-failover",
1616
  "show": "info",
1617
}
1618

    
1619

    
1620
def Main():
1621
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
1622
                     aliases=aliases)